前言
本文结合了网上一些资料 结合自身经验和实战整理出来的
SpringBoot 集成
添加依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version> </dependency>
创建一个 NettyService
以下代码直接复制就行 这里不是重点
重点在 NettyServerHandlerInitializer 这个类里面
@Service @Slf4j public class NettyService { @Value("${netty.port}") private Integer port; /** * boss 线程组用于处理连接工作 */ private EventLoopGroup boss = new NioEventLoopGroup(); /** * work 线程组用于数据处理 */ private EventLoopGroup work = new NioEventLoopGroup(); @Autowired private NettyServerHandlerInitializer nettyServerHandlerInitializer; /** * SpringBoot 启动的时候 调用 * @throws InterruptedException */ @PostConstruct public void init() throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, work) // 指定Channel .channel(NioServerSocketChannel.class) //使用指定的端口设置套接字地址 .localAddress(new InetSocketAddress(port)) //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数 .option(ChannelOption.SO_BACKLOG, 1024) //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 .childOption(ChannelOption.SO_KEEPALIVE, true) //将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输 .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(nettyServerHandlerInitializer); ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) { log.info("启动 Netty Server"); } } /** * SpringBoot 销毁的时候 调用 * @throws InterruptedException */ @PreDestroy public void destory() throws InterruptedException { boss.shutdownGracefully().sync(); work.shutdownGracefully().sync(); log.info("关闭Netty"); } }
配置 Netty
Netty 的服务 初始化类
@Component public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> { /** * 换行解码器 最大解码长度 */ private final int messageMaxLength = 1024; /** * 初始化通道 * @param channel * @throws Exception */ @Override protected void initChannel(Channel channel) throws Exception { channel.pipeline() //添加编码器 .addLast(new NettyMessageEncode()) //添加Netty 自带的 换行解码器(用来解决 沾包,拆包) 详细见 https://juejin.im/post/5b67902f6fb9a04fc67c1a24 .addLast(new LineBasedFrameDecoder(messageMaxLength)) //添加自定义的 解码器 .addLast( new NettyMessageDecode()) //添加 接收消息的 处理器 .addLast(new ServiceMessageReceiveHandler()); } }
Netty 编码器
这个环节很重要
第一点,TCP 通讯常见问题存在沾包,拆包 这个 Netty 自带解决方案(例:LineBasedFrameDecoder)
第二点, 利用编码器,我们可以直接用 channel write(object),然后由编码转换为字节
第三点,利用解码器,我们可以由解码器,解码字节,并转换为 JavaBean,然后直接在 Handler 处理
第四点,如果添加了 LineBasedFrameDecoder,那么在给服务端发送消息的时候,结尾要发送换行符
自定义编码器
拦截相应 ImMessageEntity 类型的数据,然后使用 ByteBufUtil 转换为 ByteBuf
@Slf4j public class NettyMessageEncode extends MessageToMessageEncoder<ImMessageEntity> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, ImMessageEntity imMessageEntity, List<Object> list) throws Exception { // 转换为JSON String json = GsonPlugin.toJson(imMessageEntity) + "\n"; log.info(json); list.add(ByteBufUtil.encodeString(channelHandlerContext.alloc(), CharBuffer.wrap(json), Charset.defaultCharset())); } }
自定义解码器
拦截 ByteBuf 数据,编码为 String,并且解析 JSON 为 JavaBean
@Slf4j public class NettyMessageDecode extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> list) throws Exception { String message = msg.toString(Charset.defaultCharset()); log.info(message); ImMessageEntity imMessageEntity = GsonPlugin.fromJson(message, ImMessageEntity.class); list.add(imMessageEntity); } }
消息处理器
剩下的看这个类的注释就行。
public class ServiceMessageReceiveHandler extends SimpleChannelInboundHandler<ImMessageEntity> { private static final AttributeKey<Long> userId = AttributeKey.newInstance("userId"); @Override protected void channelRead0(ChannelHandlerContext ctx, ImMessageEntity msg) throws Exception { Channel channel = ctx.channel(); Attribute<Long> attr = channel.attr(userId); System.out.println("消息来自:" + attr.get()); } /** * 通道进入 * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); //假设 这个是用户的ID Long uMid = 1238192381284123L; System.out.println("用户进入:" + uMid); //我们在该通道 放入用户id数据 当然也可以放入其他数据 可以用来绑定用户之类的操作 Channel channel = ctx.channel(); channel.attr(userId).set(uMid); } /** * 通道退出 * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); Channel channel = ctx.channel(); Long uMid = channel.attr(userId).get(); System.out.println("用户退出:" + uMid); } /** * 接收到的事件 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; System.out.println("event:" + event.state()); } } /** * 通道异常 应该在此关闭通道 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } }
调试工具
Mac OS 的话 可以直接在 App Store 搜索 网络调试工具 那个就可以调试。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于