junx-netty 结合 Springboot 快速开发长连接服务端 + 客户端

本贴最后更新于 2294 天前,其中的信息可能已经物是人非

项目目标

  基于 junx-netty,结合 springboot,开发一个支持长连接的服务端和客户端,长连接支持客户端服务端双向心跳检测、网络拆包粘包处理,能够正常发送和接收消息。
首先在项目中引入 junx-netty:

<dependency> <groupId>io.github.junxworks</groupId> <artifactId>junx-netty</artifactId> <version>1.0.6</version> </dependency>

服务器端实现

  1、springboot 的启动类:

@SpringBootApplication public class ServerApplication { public static void main(String[] args) { SpringApplication.run(ServerApplication.class, args); } }

  2、bean 配置文件 Config:

import java.io.IOException; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import io.github.junxworks.junx.core.lifecycle.Service; import io.github.junxworks.junx.netty.NettyServer; import io.github.junxworks.junx.netty.ServerConfig; import io.github.junxworks.junx.netty.heartbeat.CommonServerHeartbeatHandlerFactory; import io.github.junxworks.junx.netty.initializer.CommonChannelInitializer; @Configuration public class Config { @Bean(name = "nettyServer", initMethod = "start", destroyMethod = "stop") public Service nettyServer( ServerEventHandler handler) throws IOException { ServerConfig config = new ServerConfig(); //服务器配置可以通过spring自定义配置来实现,百度很多例子,搜springboot自定义配置 config.setPort(8080); //标准的通道初始化器,内部封装了基于长度帧的粘包机制、心跳检测 CommonChannelInitializer initializer = new CommonChannelInitializer(handler); //handler是通过外部注入的,实现代码见ServerEventHandler //下面三个可以根据具体情况配置,一般常用AllIdle即可,因为会加入到定时任务中,所以不宜全部设置。 initializer.setAllIdle(config.getAllIdle()); //设置读、写超时 initializer.setReadIdle(config.getReadIdle()); //设置读超时 initializer.setWriteIdle(config.getWriteIdle()); //设置写超时 initializer.setHeartbeatHandlerFactory(new CommonServerHeartbeatHandlerFactory()); //设置服务器端心跳检测 NettyServer server = new NettyServer(config, initializer); server.setName("TestServer"); return server; } }

  3、服务器端业务逻辑实现类 ServerEventHandler:

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import io.github.junxworks.junx.core.util.ExceptionUtils; import io.github.junxworks.junx.core.util.StringUtils; import io.github.junxworks.junx.netty.message.IoRequest; import io.github.junxworks.junx.netty.message.IoResponse; import io.github.junxworks.junx.netty.message.MessageConstants; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @Component //自动注入 @Sharable //handler一定是无状态的,支持多线程共享 public class ServerEventHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(ServerEventHandler.class); private static final byte[] data = "I`m Server!!!".getBytes(); public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { IoRequest req = new IoRequest().readFromBytes((byte[]) msg); //IoRequest对象是封装的标准序列化请求对象,支持属性扩展 System.out.println(new String(req.getData())); IoResponse res = new IoResponse();//IoRequest对象是封装的标准序列化应答对象,支持属性扩展 res.setRequestId(req.getId()); res.setServerAddr(ctx.channel().localAddress().toString()); res.setData(data); ctx.writeAndFlush(res.toBytes()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (ctx.channel().isActive()) { IoResponse response = new IoResponse(); response.setRequestId("unknown"); response.setStatusCode(MessageConstants.STATUS_CODE_SERVER_INTERNAL_ERROR); String errorMsg = ExceptionUtils.getCauseMessage(cause); response.setData(StringUtils.isNull(errorMsg) ? "Server internal error.".getBytes() : errorMsg.getBytes()); response.setServerAddr(ctx.channel().localAddress().toString()); ctx.writeAndFlush(response.toBytes()); } logger.error("Netty handler catch exption.", cause); } }

  4、服务器端测试类编写,基于 Junit 的测试类:

@RunWith(SpringRunner.class) @SpringBootTest(classes = ServerApplication.class) public class ServerTest { @Test public void serverTest() throws Exception { new CountDownLatch(1).await(); //block住主线程 } }

  执行 ServerTest,将会看到如下提示:
imagepng
服务器端是以非守护线程执行的,不会 block 住主线程。

客户端实现

  客户端支持长连接,客户端的连接获取是基于连接池的方式获取,连接的调用支持同步和异步调用,同步调用会 block 住当前线程,直到服务器返回。异步调用的回调方法,是由 netty 的 worker 线程调用,因此不能执行耗时太长的操作。客户端的代码如下:
  1、ClientTest 类,其中 syncRequestTest 为同步请求服务器端,asyncRequestTest 为异步请求服务器端。

import java.net.InetSocketAddress; import org.junit.Test; import io.github.junxworks.junx.netty.call.CallFuture; import io.github.junxworks.junx.netty.call.CallUtils; import io.github.junxworks.junx.netty.heartbeat.CommonClientHeartbeatHandlerFactory; import io.github.junxworks.junx.netty.initializer.CommonChannelInitializer; import io.github.junxworks.junx.netty.message.IoRequest; import io.github.junxworks.junx.netty.message.IoResponse; import io.github.junxworks.junx.netty.pool.NettyChannelPool; import io.github.junxworks.junx.netty.pool.NettyChannelPoolManager; import io.netty.channel.Channel; public class ClientTest { private String host = "localhost"; private int port = 8080; private byte[] data = "michael".getBytes(); /** * 同步调用测试 */ @Test public void syncRequestTest() throws Exception { NettyChannelPoolManager poolManager = new NettyChannelPoolManager(); poolManager.start(); InetSocketAddress serverAddr = InetSocketAddress.createUnresolved(host, port); NettyChannelPool pool = poolManager.getPool(serverAddr); if (pool == null) { CommonChannelInitializer initializer = new CommonChannelInitializer(new ClientChannelHandler()); //通用通道初始化器跟server的类似 initializer.setHeartbeatHandlerFactory(new CommonClientHeartbeatHandlerFactory());//长连接,客户端心跳检测 pool = poolManager.getPool(serverAddr, initializer); } Channel ch = pool.acquire(1000); try { IoRequest req = new IoRequest(); req.setRequestTimeout(1000); req.setData(data); CallFuture<IoResponse> syn = CallUtils.call(ch, req); IoResponse res = syn.get(); //线程同步等待应答 System.out.println("[Sync] Handler request id:" + res.getRequestId() + " result:" + new String(res.getData())); } finally { pool.release(ch); } poolManager.stop(); } /** * 异步调用测试 * @throws Exception */ @Test public void asyncRequestTest() throws Exception { NettyChannelPoolManager poolManager = new NettyChannelPoolManager(); poolManager.start(); InetSocketAddress serverAddr = InetSocketAddress.createUnresolved(host, port); NettyChannelPool pool = poolManager.getPool(serverAddr); if (pool == null) { CommonChannelInitializer initializer = new CommonChannelInitializer(new ClientChannelHandler()); //通用通道初始化器跟server的类似 initializer.setHeartbeatHandlerFactory(new CommonClientHeartbeatHandlerFactory());//长连接,客户端心跳检测 pool = poolManager.getPool(serverAddr, initializer); } Channel ch = pool.acquire(1000); try { IoRequest req = new IoRequest(); req.setRequestTimeout(1000); req.setData(data); CallUtils.call(ch, req, new AsyncCallback(req)); //异步回调 } finally { pool.release(ch); } Thread.sleep(1000); //block主线程 poolManager.stop(); } }

  2、客户端业务处理类 ClientChannelHandler:

import io.github.junxworks.junx.netty.call.ReferenceManager; import io.github.junxworks.junx.netty.message.IoResponse; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @Sharable //业务处理类必须无状态 public class ClientChannelHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { IoResponse res = (IoResponse) new IoResponse().readFromBytes((byte[]) msg); ReferenceManager.future(res.getUUID(), ctx, res); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

  3、客户端异步调用的回调函数实现类 AsyncCallback:

import io.github.junxworks.junx.core.util.StringUtils; import io.github.junxworks.junx.netty.call.Callback; import io.github.junxworks.junx.netty.message.IoRequest; import io.github.junxworks.junx.netty.message.IoResponse; import io.netty.channel.ChannelHandlerContext; public class AsyncCallback implements Callback<IoResponse> { private IoRequest req; public AsyncCallback(IoRequest req) { this.req = req; } @Override public void callback(ChannelHandlerContext ctx, IoResponse t) { System.out.println("[async] Handler request id:" + t.getRequestId() + " result:" + new String(t.getData())); } @Override public void dead() { System.out.println(StringUtils.format("Request \"%s\" is dead.", req.getId())); } }

  demo 的源码可以参考:https://github.com/junxworks/junx/tree/master/junx-sample/src/main/java/io/github/junxworks/junx/test/netty

  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 35 关注
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3200 引用 • 8215 回帖 • 2 关注
  • Junx
    2 引用

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...