Netty 进阶

本贴最后更新于 532 天前,其中的信息可能已经斗转星移

Netty 进阶

一、黏包与半包

1.1 黏包现象

/** * 服务端 */ @Slf4j public class HelloWorldServer{ void start(){ NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); // 将接收缓冲区调小些观察半包现象 serverBootstrap.option(ChannelOption.SO_RCVBUF, 10); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch) throws Exception{ ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } catch(InterruptedException e){ log.error("server error", e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } } /** * 客户端 */ public class HelloWorldClient{ static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class); public static void main(String[] args){ NioEventLoopGroup worker = new NioEventLoopGroup(); try{ Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch){ ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ // 会在连接channel建立成功以后,会触发channelActive事件 @Override public void channelActive(ChannelHandlerContext ctx){ for(int i = 0; i < 10; i++){ ByteBuf buf = ctx.alloc().buffer(16); buf.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}); ctx.writeAndFlush(buf); } } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch(InterruptedException e){ log.error("client error", e); } finally { worker.shutdownGracefully(); } } }

滑动窗口

  • TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差。
  • 为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值
  • 窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
    • 图中深色的部分即要发送的数据,高亮的部分即窗口
    • 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
    • 如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动
    • 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收

Netty 中黏包半包现象分析

黏包

  • 现象,发送 abc def,接收 abcdef
  • 原因
    • 应用层:接收方 ByteBuf 设置太大(Netty 默认 1024)
    • 滑动窗口:假设发送方 256byte 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会黏包。 【TCP】
    • Nagle 算法:会造成黏包 【TCP】

半包

  • 现象,发送 abcdef,接收 abc def
  • 原因
    • 应用层:接收方 ByteBuf 小于实际发送数据量
    • 滑动窗口:假设接收方的窗口只剩了 128bytes,发送方的报文大小是 256bytes,这时放不下了,只能先发送前 128bytes,等待 ack 后才能发送剩余部分,这就造成了半包【TCP】
    • MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包
本质是因为TCP是流式协议,消息无边界

黏包半包解决:

  • 短连接:可以解决黏包问题,但无法解决半包问题
  • 定长解码器:可以解决黏包半包问题,但是占用的字节数比较多,造成浪费
  • 行解码器:(采用分割符确定消息边界)
  • LTC 解码器:

二、协议设计与解析

2.1 redis

@Slf4j public class TestRedis{ /** set name zhangsan *3 $3 set $4 name $8 zhangsan */ public static void main(String[] args){ final byte[] LINE = {13, 10}; // 回车 换行 NioEventLoopGroup worker = new NioEventLoopGroup(); try{ Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch){ ch.pipeline().addLast(new LoggingHandler()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelActive(ChannelHandlerContext ctx){ ByteBuf buf = ctx.alloc().buffer(); buf.writeBytes("*3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("set".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$4".getBytes()); buf.writeBytes(LINE); buf.writeBytes("name".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$8".getBytes()); buf.writeBytes(LINE); buf.writeBytes("zhangsan".getBytes()); buf.writeBytes(LINE); ctx.writeAndFlush(buf); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ ByteBuf buf = (ByteBuf) msg; sout(buf.toString(Charset.defaultCharset())); } }); } }); ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync(); channelFuture.channel().closeFuture().sync(); } catch(InterruptedException e){ log.error("client error", e); } finally { worker.shutdownGracefully(); } } }

2.2 http

@Slf4j public class TestHttp{ public static void main(String[] args){ NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch) throws Exception{ ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new HttpServerCodec()); // 上一个handler传入下一个handler特定的类型,下例只有符合HttpRequest类型的消息会被处理 ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>(){ @Override protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception{ // 获取请求 log.debug(msg.uri()); log.debug(msg.headers()); // 返回响应 DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK); byte[] bytes = "<h1>hell world</h1>".getBytes(); // 如果不设置长度浏览器会一直等待获取资源 response.headers().setInt(CONTENT_LENGTH, bytes.length); response.content().writeBytes(bytes); // 写回响应 ctx.writeAndFlush(response); } }); /** ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ log.debug("{}", msg.getClass()); if(msg instanceof HttpRequest){ // 请求行 请求头 } else if(msg instanceof HttpContent){ // 请求体 } } }); */ } }); ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } catch(InterruptedException e){ log.error("server error", e); } finally{ boss.shutdownGracefully(); worker.shutdownGracefully(); } } }

2.3 自定义协议要素

  • 魔数:一般发送的头几个都是数字都是魔数,例如 Java 的二进制字节码的起始 8 个字节就是魔数,用来第一时间判定是否是无效数据包。(例如 cafebabe)
  • 版本号:可以支持协议升级
  • 序列化算法:消息正文采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf(google)、hessian、jdk(不能跨平台、性能一般)
  • 指令类型:是登录、注册、单聊、群聊...跟业务相关
  • 请求序号:为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

2.4 自定义消息编码解码

@Data public abstract Message implements Serializable{ public static Class<?> getMessageClass(int messageType){ return messageClasses.get(messageType); } private int sequenceId; private int messageType; public abstract int getMessageType(); public static final int LoginRequestMessage = 0; // 登录请求消息 public static final int LoginResponseMessage = 1; // 登录响应消息 public static final int ChatRequestMessage = 2; // 发送一条聊天信息 public static final int ChatResponseMessage = 3; // 获取一条聊天信息 public static final int GroupCreateRequestMessage = 4; // 创建聊天室的请求 public static final int GroupCreateResponseMessage = 5; // 创建聊天室的响应 public static final int GroupJoinRequestMessage = 6; public static final int GroupJoinResponseMessage = 7; public static final int GroupQuitRequestMessage = 8; public static final int GroupQuitResponseMessage = 9; public static final int GroupChatRequestMessage = 10; public static final int GroupChatResponseMessage = 11; public static final int GroupMemberRequestMessage = 12; public static final int GroupMemberResponseMessage = 13; private static final Map<Integer, Class<?>> messageClassses = new HashMap<>(); static{ } }
/** * 登录请求消息 */ @Data @ToString(callSuper = true) public class LoginRequestMessage extends Message{ private String username; private String password; private String nickname; public LoginRequestMessage(){} public LoginRequestMessage(String username, String password, String nickname){ this.username = username; this.password = password; this.nickname = nickname; } @Override public int getMessageType(){ return LoginRequestMessage; } }
/** * 编码解码器 */ @Slf4j public class MessageCodec extends ByteToMessageCodec<Message>{ @Override protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception{ // 1、 4字节的魔数 out.writeBytes(new byte[]{1, 2, 3, 4}); // 2、 1字节的版本 out.writeByte(1); // 3、 1字节的序列化方式 jdk 0, json 1 out.writeByte(0); // 4、 1字节的指令类型 out.writeByte(msg.getMessageType()); // 5、 4个字节 out.writeInt(msg.getSequenceId()); // 无意义,对其填充 out.writeByte(0xff); // 6、 获取内容的字节数组 ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(msg); byte[] bytes = bos.toByteArray(); // 7、 长度 out.writeInt(bytes.length); // 8、 写入内容 out.writeBytes(bytes); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception{ int magicNum = in.readInt(); byte version = in.readByte(); byte serializerType = in.readByte(); byte messageType = in.readByte(); int sequenceId = in.readInt(); in.readByte(); int length = in.readInt(); byte[] bytes = new byte[length]; in.readBytes(bytes, 0, length); if(serializerType == 0){ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); Message message = (Message) ois.readObject(); } log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length); log.debug("{}", message); out.add(message); // 为了给下一个handler使用 } }
/** * 测试 */ public class TestMessageCodec { public static void main(String[] args) throws Exception{ EmbeddedChannel channel = new EmbeddedChannel( new LoggingHandler(), // 解决decode中的黏包半包问题 new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0), new MessageCodec() ); // encode LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三"); channel.writeOutbound(message); // decode ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); new MessageCodec().encode(null, message, buf); // 入站 channel.writeInbound(buf); } }
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 38 关注
  • IO
    8 引用 • 20 回帖
  • 网络
    142 引用 • 184 回帖 • 4 关注

相关帖子

1 回帖

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • amethystfob
    作者

    1、居然有外链图不能推送社区 2、居然只有推到社区的文章才能评论。。。很无语的设定。。