Netty 处理消息的粘包和拆包

本贴最后更新于 2092 天前,其中的信息可能已经天翻地覆

Netty 通过固定的消息头处理消息的粘包和拆包

对于消息的拆包和粘包,通用的做法就是用一个消息头部来描述消息内容的长度。Netty 提供了一个 LengthFieldBasedFrameDecoder Decoder,可以非常方便的去处理各种带有长度头的私有协议。

LengthFieldBasedFrameDecoder

构造函数

public LengthFieldBasedFrameDecoder(int maxFrameLength,int lengthFieldOffset, int lengthFieldLength)
public LengthFieldBasedFrameDecoder(int maxFrameLength,int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip)
public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip, boolean failFast)
public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip, boolean failFast)
  • byteOrder

    • 网络字节序
  • maxFrameLength

    • 单个包最大的长度,这个值根据实际场景而定
  • lengthFieldOffset

    • 表示数据长度字段开始的偏移量
    • 第几个字节开始,是表示数据长度的(0)
  • lengthFieldLength

    • 数据长度字段的所占的字节数
  • lengthAdjustment

    • 添加到长度字段的补偿值(0)
    • lengthAdjustment + 数据长度取值 = 数据长度字段之后剩下包的字节数
    • 对于某些协议,长度字段还包含了消息头的长度,在这种应用场景中,往往需要使用 lengthAdjustment 进行修正,由于整个消息(包含消息头)的长度往往大于消息体的长度,所以它要设置为负数(数据长度字段的长度取负)
  • initialBytesToStrip

    • 表示从整个包第一个字节开始,向后忽略的字节数(0)
    • 可以设置该参数,来忽略掉包头信息,仅仅留下数据包体,给下一个 Handler 处理

各种不同的协议,与 LengthFieldBasedFrameDecoder 的实现

与LengthFieldBasedFrameDecoder 的官方 doc 中就有各种协议及其实现的详细描述

# 协议1
lengthFieldOffset   = 0
lengthFieldLength   = 2		// 通用的前面2字节表示数据长度
lengthAdjustment    = 0
initialBytesToStrip = 0 

BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
+--------+----------------+      +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
+--------+----------------+      +--------+----------------+

# 协议2
lengthFieldOffset   = 0
lengthFieldLength   = 2
lengthAdjustment    = 0
initialBytesToStrip = 2	// 丢弃前面2字节的包头

BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
+--------+----------------+      +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
+--------+----------------+      +----------------+

# 协议3
lengthFieldOffset   =  0
lengthFieldLength   =  2
lengthAdjustment    = -2 // 长度头表示的长度,包含了自身头部的长度
initialBytesToStrip =  0

BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
+--------+----------------+      +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
+--------+----------------+      +--------+----------------+

# 协议4
lengthFieldOffset   = 2	// 表示消息长度的头,不在首部
lengthFieldLength   = 3
lengthAdjustment    = 0
initialBytesToStrip = 0

BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
+----------+----------+----------------+      +----------+----------+----------------+
| Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
|  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+      +----------+----------+----------------+

# 协议5
lengthFieldOffset   = 0
lengthFieldLength   = 3
lengthAdjustment    = 2	// 整个消息体的长度,还要包含一个头部的长度,因为后面还有一个头部
initialBytesToStrip = 0

BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
+----------+----------+----------------+      +----------+----------+----------------+
|  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
| 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
+----------+----------+----------------+      +----------+----------+----------------+

# 协议6
lengthFieldOffset   = 1 (= the length of HDR1)			// 第2个字节表示数据长度
lengthFieldLength   = 2								
lengthAdjustment    = 1 (= the length of HDR2)			// 除此之外,还有1个字节的消息头
initialBytesToStrip = 3 (= the length of HDR1 + LEN)	// 移除前面3个字节的数据

BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
+------+--------+------+----------------+      +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+      +------+----------------+

# 协议7
lengthFieldOffset   =  1
lengthFieldLength   =  2								
lengthAdjustment    = -3 (= the length of HDR1 + LEN, negative)	//长度头表示的是整个消息体的长度
initialBytesToStrip =  3

BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
+------+--------+------+----------------+      +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+      +------+----------------+

Demo

服务端

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.security.cert.CertificateException;

import javax.net.ssl.SSLException;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class Server {

	public static void main(String[] args) throws CertificateException, SSLException, InterruptedException {

		EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
		EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap serverBootstrap = new ServerBootstrap();
			serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup);
			serverBootstrap.channel(NioServerSocketChannel.class);
			serverBootstrap.option(ChannelOption.SO_REUSEADDR, true);

			serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
			serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					ChannelPipeline channelPipeline = ch.pipeline();
					// 头偏移:0,头长度:4,长度补充:0,丢弃:4
					channelPipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
					channelPipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
						@Override
						protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
							// msg里面只剩下消息体
							System.out.println(msg.toString(StandardCharsets.UTF_8));
						}
					});
				}
			});

			ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(1024)).sync();
			channelFuture.channel().closeFuture().sync();
		} finally {
			bossEventLoopGroup.shutdownGracefully();
			workerEventLoopGroup.shutdownGracefully();
		}
	}
}

客户端

import socket
import struct

connector = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
connector.connect(('127.0.0.1', 1024))

# 对消息编码,添加4个字节长消息头
def encode(data):
    data = bytes(data,'UTF_8')
    length = len(data)
    return struct.pack('>I%ds'%(length),length,data)

# 可以对消息进行解码,分离消息头和消息
def decode(data):
    length = len(data) - 4
    return struct.unpack('>I%ds'%(length),data)

while True:
    connector.send(encode(input('输入消息内容\n')))

成功解决了消息的粘包和拆包

imagepng

imagepng

Netty 提供的其他一些解决方案

LineBasedFrameDecoder

根据换行符来分割消息

DelimiterBasedFrameDecoder

根据指定分隔符来分割消息

FixedLengthFrameDecoder

固定消息的长度

  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 19 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...