Netty 进阶
一、黏包与半包
1.1 黏包现象
/**
* 服务端
*/
@Slf4j
public class HelloWorldServer{
void start(){
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
// 将接收缓冲区调小些观察半包现象
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch(InterruptedException e){
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
/**
* 客户端
*/
public class HelloWorldClient{
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args){
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch){
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
// 会在连接channel建立成功以后,会触发channelActive事件
@Override
public void channelActive(ChannelHandlerContext ctx){
for(int i = 0; i < 10; i++){
ByteBuf buf = ctx.alloc().buffer(16);
buf.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buf);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch(InterruptedException e){
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
滑动窗口
- TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差。
- 为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值
- 窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
- 图中深色的部分即要发送的数据,高亮的部分即窗口
- 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
- 如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动
- 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收
Netty 中黏包半包现象分析
黏包
- 现象,发送 abc def,接收 abcdef
- 原因
- 应用层:接收方 ByteBuf 设置太大(Netty 默认 1024)
- 滑动窗口:假设发送方 256byte 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会黏包。 【TCP】
- Nagle 算法:会造成黏包 【TCP】
半包
- 现象,发送 abcdef,接收 abc def
- 原因
- 应用层:接收方 ByteBuf 小于实际发送数据量
- 滑动窗口:假设接收方的窗口只剩了 128bytes,发送方的报文大小是 256bytes,这时放不下了,只能先发送前 128bytes,等待 ack 后才能发送剩余部分,这就造成了半包【TCP】
- MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包
本质是因为TCP是流式协议,消息无边界
黏包半包解决:
- 短连接:可以解决黏包问题,但无法解决半包问题
- 定长解码器:可以解决黏包半包问题,但是占用的字节数比较多,造成浪费
- 行解码器:(采用分割符确定消息边界)
- LTC 解码器:
二、协议设计与解析
2.1 redis
@Slf4j
public class TestRedis{
/**
set name zhangsan
*3
$3
set
$4
name
$8
zhangsan
*/
public static void main(String[] args){
final byte[] LINE = {13, 10}; // 回车 换行
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch){
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx){
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("set".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$4".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("name".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$8".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("zhangsan".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
ByteBuf buf = (ByteBuf) msg;
sout(buf.toString(Charset.defaultCharset()));
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
channelFuture.channel().closeFuture().sync();
} catch(InterruptedException e){
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
2.2 http
@Slf4j
public class TestHttp{
public static void main(String[] args){
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new HttpServerCodec());
// 上一个handler传入下一个handler特定的类型,下例只有符合HttpRequest类型的消息会被处理
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>(){
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception{
// 获取请求
log.debug(msg.uri());
log.debug(msg.headers());
// 返回响应
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
byte[] bytes = "<h1>hell world</h1>".getBytes();
// 如果不设置长度浏览器会一直等待获取资源
response.headers().setInt(CONTENT_LENGTH, bytes.length);
response.content().writeBytes(bytes);
// 写回响应
ctx.writeAndFlush(response);
}
});
/**
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
log.debug("{}", msg.getClass());
if(msg instanceof HttpRequest){ // 请求行 请求头
} else if(msg instanceof HttpContent){ // 请求体
}
}
});
*/
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch(InterruptedException e){
log.error("server error", e);
} finally{
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
2.3 自定义协议要素
- 魔数:一般发送的头几个都是数字都是魔数,例如 Java 的二进制字节码的起始 8 个字节就是魔数,用来第一时间判定是否是无效数据包。(例如 cafebabe)
- 版本号:可以支持协议升级
- 序列化算法:消息正文采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf(google)、hessian、jdk(不能跨平台、性能一般)
- 指令类型:是登录、注册、单聊、群聊...跟业务相关
- 请求序号:为了双工通信,提供异步能力
- 正文长度
- 消息正文
2.4 自定义消息编码解码
@Data
public abstract Message implements Serializable{
public static Class<?> getMessageClass(int messageType){
return messageClasses.get(messageType);
}
private int sequenceId;
private int messageType;
public abstract int getMessageType();
public static final int LoginRequestMessage = 0; // 登录请求消息
public static final int LoginResponseMessage = 1; // 登录响应消息
public static final int ChatRequestMessage = 2; // 发送一条聊天信息
public static final int ChatResponseMessage = 3; // 获取一条聊天信息
public static final int GroupCreateRequestMessage = 4; // 创建聊天室的请求
public static final int GroupCreateResponseMessage = 5; // 创建聊天室的响应
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMemberRequestMessage = 12;
public static final int GroupMemberResponseMessage = 13;
private static final Map<Integer, Class<?>> messageClassses = new HashMap<>();
static{
}
}
/**
* 登录请求消息
*/
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message{
private String username;
private String password;
private String nickname;
public LoginRequestMessage(){}
public LoginRequestMessage(String username, String password, String nickname){
this.username = username;
this.password = password;
this.nickname = nickname;
}
@Override
public int getMessageType(){
return LoginRequestMessage;
}
}
/**
* 编码解码器
*/
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message>{
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception{
// 1、 4字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2、 1字节的版本
out.writeByte(1);
// 3、 1字节的序列化方式 jdk 0, json 1
out.writeByte(0);
// 4、 1字节的指令类型
out.writeByte(msg.getMessageType());
// 5、 4个字节
out.writeInt(msg.getSequenceId());
// 无意义,对其填充
out.writeByte(0xff);
// 6、 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7、 长度
out.writeInt(bytes.length);
// 8、 写入内容
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception{
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
if(serializerType == 0){
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
}
log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
out.add(message); // 为了给下一个handler使用
}
}
/**
* 测试
*/
public class TestMessageCodec {
public static void main(String[] args) throws Exception{
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(),
// 解决decode中的黏包半包问题
new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三");
channel.writeOutbound(message);
// decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);
// 入站
channel.writeInbound(buf);
}
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于