前言
本文结合了网上一些资料 结合自身经验和实战整理出来的
SpringBoot 集成
添加依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
创建一个 NettyService
以下代码直接复制就行 这里不是重点
重点在 NettyServerHandlerInitializer 这个类里面
@Service
@Slf4j
public class NettyService {
@Value("${netty.port}")
private Integer port;
/**
* boss 线程组用于处理连接工作
*/
private EventLoopGroup boss = new NioEventLoopGroup();
/**
* work 线程组用于数据处理
*/
private EventLoopGroup work = new NioEventLoopGroup();
@Autowired
private NettyServerHandlerInitializer nettyServerHandlerInitializer;
/**
* SpringBoot 启动的时候 调用
* @throws InterruptedException
*/
@PostConstruct
public void init() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work)
// 指定Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口设置套接字地址
.localAddress(new InetSocketAddress(port))
//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
.option(ChannelOption.SO_BACKLOG, 1024)
//设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.childOption(ChannelOption.SO_KEEPALIVE, true)
//将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(nettyServerHandlerInitializer);
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
log.info("启动 Netty Server");
}
}
/**
* SpringBoot 销毁的时候 调用
* @throws InterruptedException
*/
@PreDestroy
public void destory() throws InterruptedException {
boss.shutdownGracefully().sync();
work.shutdownGracefully().sync();
log.info("关闭Netty");
}
}
配置 Netty
Netty 的服务 初始化类
@Component
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {
/**
* 换行解码器 最大解码长度
*/
private final int messageMaxLength = 1024;
/**
* 初始化通道
* @param channel
* @throws Exception
*/
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline()
//添加编码器
.addLast(new NettyMessageEncode())
//添加Netty 自带的 换行解码器(用来解决 沾包,拆包) 详细见 https://juejin.im/post/5b67902f6fb9a04fc67c1a24
.addLast(new LineBasedFrameDecoder(messageMaxLength))
//添加自定义的 解码器
.addLast( new NettyMessageDecode())
//添加 接收消息的 处理器
.addLast(new ServiceMessageReceiveHandler());
}
}
Netty 编码器
这个环节很重要
第一点,TCP 通讯常见问题存在沾包,拆包 这个 Netty 自带解决方案(例:LineBasedFrameDecoder)
第二点, 利用编码器,我们可以直接用 channel write(object),然后由编码转换为字节
第三点,利用解码器,我们可以由解码器,解码字节,并转换为 JavaBean,然后直接在 Handler 处理
第四点,如果添加了 LineBasedFrameDecoder,那么在给服务端发送消息的时候,结尾要发送换行符
自定义编码器
拦截相应 ImMessageEntity 类型的数据,然后使用 ByteBufUtil 转换为 ByteBuf
@Slf4j
public class NettyMessageEncode extends MessageToMessageEncoder<ImMessageEntity> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ImMessageEntity imMessageEntity, List<Object> list) throws Exception {
// 转换为JSON
String json = GsonPlugin.toJson(imMessageEntity) + "\n";
log.info(json);
list.add(ByteBufUtil.encodeString(channelHandlerContext.alloc(), CharBuffer.wrap(json), Charset.defaultCharset()));
}
}
自定义解码器
拦截 ByteBuf 数据,编码为 String,并且解析 JSON 为 JavaBean
@Slf4j
public class NettyMessageDecode extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> list) throws Exception {
String message = msg.toString(Charset.defaultCharset());
log.info(message);
ImMessageEntity imMessageEntity = GsonPlugin.fromJson(message, ImMessageEntity.class);
list.add(imMessageEntity);
}
}
消息处理器
剩下的看这个类的注释就行。
public class ServiceMessageReceiveHandler extends SimpleChannelInboundHandler<ImMessageEntity> {
private static final AttributeKey<Long> userId = AttributeKey.newInstance("userId");
@Override
protected void channelRead0(ChannelHandlerContext ctx, ImMessageEntity msg) throws Exception {
Channel channel = ctx.channel();
Attribute<Long> attr = channel.attr(userId);
System.out.println("消息来自:" + attr.get());
}
/**
* 通道进入
* @param ctx
* @throws Exception
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
//假设 这个是用户的ID
Long uMid = 1238192381284123L;
System.out.println("用户进入:" + uMid);
//我们在该通道 放入用户id数据 当然也可以放入其他数据 可以用来绑定用户之类的操作
Channel channel = ctx.channel();
channel.attr(userId).set(uMid);
}
/**
* 通道退出
* @param ctx
* @throws Exception
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
Channel channel = ctx.channel();
Long uMid = channel.attr(userId).get();
System.out.println("用户退出:" + uMid);
}
/**
* 接收到的事件
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
System.out.println("event:" + event.state());
}
}
/**
* 通道异常 应该在此关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
调试工具
Mac OS 的话 可以直接在 App Store 搜索 网络调试工具 那个就可以调试。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于