Netty 进阶

Netty 进阶

一、黏包与半包

1.1 黏包现象

/**
 * 服务端
 */
@Slf4j
public class HelloWorldServer{
    void start(){
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            // 将接收缓冲区调小些观察半包现象
            serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception{
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch(InterruptedException e){
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

/**
 * 客户端
 */
public class HelloWorldClient{
    static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
  
    public static void main(String[] args){
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch){
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        // 会在连接channel建立成功以后,会触发channelActive事件
                        @Override
                        public void channelActive(ChannelHandlerContext ctx){
                            for(int i = 0; i < 10; i++){
                                ByteBuf buf = ctx.alloc().buffer(16);
                                buf.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
                                ctx.writeAndFlush(buf);
                            }
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch(InterruptedException e){
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

滑动窗口

  • TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差。
  • 为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值
  • 窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
    • 图中深色的部分即要发送的数据,高亮的部分即窗口
    • 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
    • 如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动
    • 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收

Netty 中黏包半包现象分析

黏包

  • 现象,发送 abc def,接收 abcdef
  • 原因
    • 应用层:接收方 ByteBuf 设置太大(Netty 默认 1024)
    • 滑动窗口:假设发送方 256byte 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会黏包。 【TCP】
    • Nagle 算法:会造成黏包 【TCP】

半包

  • 现象,发送 abcdef,接收 abc def
  • 原因
    • 应用层:接收方 ByteBuf 小于实际发送数据量
    • 滑动窗口:假设接收方的窗口只剩了 128bytes,发送方的报文大小是 256bytes,这时放不下了,只能先发送前 128bytes,等待 ack 后才能发送剩余部分,这就造成了半包【TCP】
    • MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包
本质是因为TCP是流式协议,消息无边界

黏包半包解决:

  • 短连接:可以解决黏包问题,但无法解决半包问题
  • 定长解码器:可以解决黏包半包问题,但是占用的字节数比较多,造成浪费
  • 行解码器:(采用分割符确定消息边界)
  • LTC 解码器:

二、协议设计与解析

2.1 redis

@Slf4j
public class TestRedis{
    /**
     set name zhangsan
     *3
     $3
     set
     $4
     name
     $8
     zhangsan
     */
    public static void main(String[] args){
        final byte[] LINE = {13, 10};	// 回车 换行
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch){
                    ch.pipeline().addLast(new LoggingHandler());
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelActive(ChannelHandlerContext ctx){
                            ByteBuf buf = ctx.alloc().buffer();
                            buf.writeBytes("*3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$3".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("set".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$4".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("name".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("$8".getBytes());
                            buf.writeBytes(LINE);
                            buf.writeBytes("zhangsan".getBytes());
                            buf.writeBytes(LINE);
                            ctx.writeAndFlush(buf);
                        }
                      
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
                            ByteBuf buf = (ByteBuf) msg;
                            sout(buf.toString(Charset.defaultCharset()));
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
            channelFuture.channel().closeFuture().sync();
        } catch(InterruptedException e){
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

2.2 http

@Slf4j
public class TestHttp{
    public static void main(String[] args){
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>(){
                @Override
                protected void initChannel(SocketChannel ch) throws Exception{
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new HttpServerCodec());
                    // 上一个handler传入下一个handler特定的类型,下例只有符合HttpRequest类型的消息会被处理
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>(){
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception{
                            // 获取请求
                            log.debug(msg.uri());
                            log.debug(msg.headers());
                            // 返回响应
                            DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                            byte[] bytes = "<h1>hell world</h1>".getBytes();
                            // 如果不设置长度浏览器会一直等待获取资源
                            response.headers().setInt(CONTENT_LENGTH, bytes.length);
                            response.content().writeBytes(bytes);
                            // 写回响应
                            ctx.writeAndFlush(response);
                        }
                    });
                    /**
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
                            log.debug("{}", msg.getClass());
                          
                            if(msg instanceof HttpRequest){	// 请求行 请求头
                              
                            } else if(msg instanceof HttpContent){	// 请求体
                              
                            }
                        }
                    });
                    */
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch(InterruptedException e){
            log.error("server error", e);
        } finally{
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

2.3 自定义协议要素

  • 魔数:一般发送的头几个都是数字都是魔数,例如 Java 的二进制字节码的起始 8 个字节就是魔数,用来第一时间判定是否是无效数据包。(例如 cafebabe)
  • 版本号:可以支持协议升级
  • 序列化算法:消息正文采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf(google)、hessian、jdk(不能跨平台、性能一般)
  • 指令类型:是登录、注册、单聊、群聊...跟业务相关
  • 请求序号:为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

2.4 自定义消息编码解码

@Data
public abstract Message implements Serializable{
  
    public static Class<?> getMessageClass(int messageType){
        return messageClasses.get(messageType);
    }
  
    private int sequenceId;
  
    private int messageType;
  
    public abstract int getMessageType();
  
    public static final int LoginRequestMessage = 0;		// 登录请求消息
    public static final int LoginResponseMessage = 1;		// 登录响应消息
    public static final int ChatRequestMessage = 2;			// 发送一条聊天信息
    public static final int ChatResponseMessage = 3;		// 获取一条聊天信息
    public static final int GroupCreateRequestMessage = 4;	// 创建聊天室的请求
    public static final int GroupCreateResponseMessage = 5;	// 创建聊天室的响应
    public static final int GroupJoinRequestMessage = 6;
    public static final int GroupJoinResponseMessage = 7;
    public static final int GroupQuitRequestMessage = 8;
    public static final int GroupQuitResponseMessage = 9;
    public static final int GroupChatRequestMessage = 10;
    public static final int GroupChatResponseMessage = 11;
    public static final int GroupMemberRequestMessage = 12;
    public static final int GroupMemberResponseMessage = 13;
    private static final Map<Integer, Class<?>> messageClassses = new HashMap<>();
  
    static{
      
    }
}
/**
 * 登录请求消息
 */
 @Data
 @ToString(callSuper = true)
 public class LoginRequestMessage extends Message{
 	 private String username;
     private String password;
     private String nickname;
   
     public LoginRequestMessage(){}
   
     public LoginRequestMessage(String username, String password, String nickname){
         this.username = username;
         this.password = password;
         this.nickname = nickname;
     }
   
     @Override
     public int getMessageType(){
         return LoginRequestMessage;
     }
 }
/**
 * 编码解码器
 */
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message>{
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception{
        // 1、 4字节的魔数
		out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2、 1字节的版本
        out.writeByte(1);
        // 3、 1字节的序列化方式 jdk 0, json 1
        out.writeByte(0);
        // 4、 1字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5、 4个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对其填充
        out.writeByte(0xff);
        // 6、 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7、 长度
        out.writeInt(bytes.length);
        // 8、 写入内容
        out.writeBytes(bytes);
    }
  
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception{
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        if(serializerType == 0){
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            Message message = (Message) ois.readObject();
        }
        log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);	// 为了给下一个handler使用
    }
}
/**
 * 测试
 */
public class TestMessageCodec {
    public static void main(String[] args) throws Exception{
        EmbeddedChannel channel = new EmbeddedChannel(
            new LoggingHandler(),
            // 解决decode中的黏包半包问题
            new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
            new MessageCodec()
        );
      
        // encode
        LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三");
        channel.writeOutbound(message);
      
        // decode
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
        new MessageCodec().encode(null, message, buf);
      
        // 入站
        channel.writeInbound(buf);
    }
}
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 23 关注
  • IO
    8 引用 • 20 回帖
  • 网络
    128 引用 • 177 回帖 • 3 关注

相关帖子

1 回帖

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • amethystfob
    作者

    1、居然有外链图不能推送社区 2、居然只有推到社区的文章才能评论。。。很无语的设定。。