项目目标
基于 junx-netty,结合 Spring Boot,开发一个支持长连接的服务端和客户端。长连接支持客户端与服务端的双向心跳检测,以及网络拆包、粘包处理,并能够正常发送和接收消息。
首先在项目中引入 junx-netty:
<dependency>
<groupId>io.github.junxworks</groupId>
<artifactId>junx-netty</artifactId>
<version>1.0.6</version>
</dependency>
服务器端实现
1、springboot 的启动类:
/**
 * Spring Boot entry point of the long-connection server demo.
 */
@SpringBootApplication
public class ServerApplication {

    /**
     * Hands control to Spring Boot, using this class as the primary configuration source.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        Class<?> primarySource = ServerApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
2、bean 配置文件 Config:
import java.io.IOException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import io.github.junxworks.junx.core.lifecycle.Service;
import io.github.junxworks.junx.netty.NettyServer;
import io.github.junxworks.junx.netty.ServerConfig;
import io.github.junxworks.junx.netty.heartbeat.CommonServerHeartbeatHandlerFactory;
import io.github.junxworks.junx.netty.initializer.CommonChannelInitializer;
/**
 * Spring bean configuration that builds the Netty server used by the demo.
 */
@Configuration
public class Config {

    /**
     * Creates the Netty server bean. Spring calls {@code start} after construction and
     * {@code stop} on shutdown, as declared by the {@code @Bean} attributes.
     *
     * @param handler business handler injected by Spring (see ServerEventHandler)
     * @return the configured server, named "TestServer" and bound to port 8080
     * @throws IOException declared by the original signature
     */
    @Bean(name = "nettyServer", initMethod = "start", destroyMethod = "stop")
    public Service nettyServer(ServerEventHandler handler) throws IOException {
        // The server settings could instead come from Spring Boot custom configuration properties.
        ServerConfig serverConfig = new ServerConfig();
        serverConfig.setPort(8080);

        // Standard channel initializer; per the original author it bundles length-frame based
        // handling of packet sticking/splitting together with heartbeat detection.
        CommonChannelInitializer channelInitializer = new CommonChannelInitializer(handler);

        // The three idle settings can be tuned case by case. The original author notes that
        // AllIdle alone is usually enough and that, because each one is added to a scheduled
        // task, it is better not to set all of them.
        // NOTE(review): all three are nevertheless copied from the config here - confirm intent.
        channelInitializer.setAllIdle(serverConfig.getAllIdle());     // read-or-write idle timeout
        channelInitializer.setReadIdle(serverConfig.getReadIdle());   // read idle timeout
        channelInitializer.setWriteIdle(serverConfig.getWriteIdle()); // write idle timeout

        // Server-side heartbeat detection.
        channelInitializer.setHeartbeatHandlerFactory(new CommonServerHeartbeatHandlerFactory());

        NettyServer testServer = new NettyServer(serverConfig, channelInitializer);
        testServer.setName("TestServer");
        return testServer;
    }
}
3、服务器端业务逻辑实现类 ServerEventHandler:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import io.github.junxworks.junx.core.util.ExceptionUtils;
import io.github.junxworks.junx.core.util.StringUtils;
import io.github.junxworks.junx.netty.message.IoRequest;
import io.github.junxworks.junx.netty.message.IoResponse;
import io.github.junxworks.junx.netty.message.MessageConstants;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@Component //自动注入
@Sharable //handler一定是无状态的,支持多线程共享
public class ServerEventHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(ServerEventHandler.class);
private static final byte[] data = "I`m Server!!!".getBytes();
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
IoRequest req = new IoRequest().readFromBytes((byte[]) msg); //IoRequest对象是封装的标准序列化请求对象,支持属性扩展
System.out.println(new String(req.getData()));
IoResponse res = new IoResponse();//IoRequest对象是封装的标准序列化应答对象,支持属性扩展
res.setRequestId(req.getId());
res.setServerAddr(ctx.channel().localAddress().toString());
res.setData(data);
ctx.writeAndFlush(res.toBytes());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (ctx.channel().isActive()) {
IoResponse response = new IoResponse();
response.setRequestId("unknown");
response.setStatusCode(MessageConstants.STATUS_CODE_SERVER_INTERNAL_ERROR);
String errorMsg = ExceptionUtils.getCauseMessage(cause);
response.setData(StringUtils.isNull(errorMsg) ? "Server internal error.".getBytes() : errorMsg.getBytes());
response.setServerAddr(ctx.channel().localAddress().toString());
ctx.writeAndFlush(response.toBytes());
}
logger.error("Netty handler catch exption.", cause);
}
}
4、服务器端测试类编写,基于 Junit 的测试类:
/**
 * JUnit launcher that boots the Spring context of {@code ServerApplication} and then keeps
 * the test thread parked.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ServerApplication.class)
public class ServerTest {

    /**
     * Blocks the calling thread indefinitely: the latch starts at 1 and nothing in this
     * class ever counts it down.
     *
     * @throws Exception if the waiting thread is interrupted
     */
    @Test
    public void serverTest() throws Exception {
        CountDownLatch neverReleased = new CountDownLatch(1);
        neverReleased.await(); // park the main thread
    }
}
执行 ServerTest,将会看到如下提示:
服务器端是以非守护线程执行的,启动时不会 block 住主线程,因此测试方法中使用 CountDownLatch 阻塞主线程,避免 JUnit 执行结束后进程直接退出。
客户端实现
客户端支持长连接,连接通过连接池获取,调用方式支持同步和异步:同步调用会 block 住当前线程,直到服务器返回;异步调用的回调方法由 netty 的 worker 线程执行,因此回调中不能执行耗时太长的操作。客户端的代码如下:
1、ClientTest 类,其中 syncRequestTest 为同步请求服务器端,asyncRequestTest 为异步请求服务器端。
import java.net.InetSocketAddress;
import org.junit.Test;
import io.github.junxworks.junx.netty.call.CallFuture;
import io.github.junxworks.junx.netty.call.CallUtils;
import io.github.junxworks.junx.netty.heartbeat.CommonClientHeartbeatHandlerFactory;
import io.github.junxworks.junx.netty.initializer.CommonChannelInitializer;
import io.github.junxworks.junx.netty.message.IoRequest;
import io.github.junxworks.junx.netty.message.IoResponse;
import io.github.junxworks.junx.netty.pool.NettyChannelPool;
import io.github.junxworks.junx.netty.pool.NettyChannelPoolManager;
import io.netty.channel.Channel;
/**
 * Client-side demo tests: one synchronous and one asynchronous request against the server
 * listening on {@code localhost:8080}.
 *
 * <p>Both tests stop the pool manager in a {@code finally} block, so it is shut down even
 * when acquiring the channel or performing the call fails.
 */
public class ClientTest {
    private final String host = "localhost";
    private final int port = 8080;
    // Explicit charset so the request bytes do not depend on the platform default.
    private final byte[] data = "michael".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    /**
     * Synchronous call test: sends one request and waits on the returned future for the response.
     *
     * @throws Exception if the pool, the channel or the call fails
     */
    @Test
    public void syncRequestTest() throws Exception {
        NettyChannelPoolManager poolManager = new NettyChannelPoolManager();
        poolManager.start();
        try {
            NettyChannelPool pool = resolvePool(poolManager);
            Channel ch = pool.acquire(1000); // presumably a timeout in milliseconds - confirm
            try {
                IoRequest req = newRequest();
                CallFuture<IoResponse> syn = CallUtils.call(ch, req);
                IoResponse res = syn.get(); // the calling thread waits here for the response
                System.out.println("[Sync] Handler request id:" + res.getRequestId() + " result:"
                        + new String(res.getData(), java.nio.charset.StandardCharsets.UTF_8));
            } finally {
                pool.release(ch);
            }
        } finally {
            poolManager.stop();
        }
    }

    /**
     * Asynchronous call test: sends one request with a callback, then sleeps to give the
     * callback time to run.
     *
     * @throws Exception if the pool, the channel or the call fails
     */
    @Test
    public void asyncRequestTest() throws Exception {
        NettyChannelPoolManager poolManager = new NettyChannelPoolManager();
        poolManager.start();
        try {
            NettyChannelPool pool = resolvePool(poolManager);
            Channel ch = pool.acquire(1000); // presumably a timeout in milliseconds - confirm
            try {
                IoRequest req = newRequest();
                CallUtils.call(ch, req, new AsyncCallback(req)); // asynchronous callback
            } finally {
                pool.release(ch);
            }
            // Keep the main thread alive while the callback is pending.
            // NOTE(review): the sleep equals the request timeout, so a slow response can race
            // with shutdown - consider waiting on a latch instead.
            Thread.sleep(1000);
        } finally {
            poolManager.stop();
        }
    }

    /** Returns the pool for the demo server, registering one with a client initializer if absent. */
    private NettyChannelPool resolvePool(NettyChannelPoolManager poolManager) throws Exception {
        InetSocketAddress serverAddr = InetSocketAddress.createUnresolved(host, port);
        NettyChannelPool pool = poolManager.getPool(serverAddr);
        if (pool == null) {
            // The common channel initializer is used the same way as on the server side.
            CommonChannelInitializer initializer = new CommonChannelInitializer(new ClientChannelHandler());
            // Long connection: enable client-side heartbeat detection.
            initializer.setHeartbeatHandlerFactory(new CommonClientHeartbeatHandlerFactory());
            pool = poolManager.getPool(serverAddr, initializer);
        }
        return pool;
    }

    /** Builds the demo request: request timeout of 1000 and the fixed payload. */
    private IoRequest newRequest() {
        IoRequest req = new IoRequest();
        req.setRequestTimeout(1000);
        req.setData(data);
        return req;
    }
}
2、客户端业务处理类 ClientChannelHandler:
import io.github.junxworks.junx.netty.call.ReferenceManager;
import io.github.junxworks.junx.netty.message.IoResponse;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client-side inbound handler: decodes each frame into an {@link IoResponse} and passes it
 * to {@link ReferenceManager}.
 */
@Sharable // per the original author, the business handler must be stateless
public class ClientChannelHandler extends ChannelInboundHandlerAdapter {

    /**
     * Decodes the inbound frame and forwards it to {@code ReferenceManager.future}, keyed by
     * the response UUID (presumably completing the pending call for that id - confirm
     * against the library).
     *
     * @param ctx channel context, forwarded to the reference manager
     * @param msg inbound frame; cast to {@code byte[]} before decoding
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        byte[] frame = (byte[]) msg;
        IoResponse response = (IoResponse) new IoResponse().readFromBytes(frame);
        String callId = response.getUUID();
        ReferenceManager.future(callId, ctx, response);
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     *
     * @param ctx   channel context to close
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
3、客户端异步调用的回调函数实现类 AsyncCallback:
import io.github.junxworks.junx.core.util.StringUtils;
import io.github.junxworks.junx.netty.call.Callback;
import io.github.junxworks.junx.netty.message.IoRequest;
import io.github.junxworks.junx.netty.message.IoResponse;
import io.netty.channel.ChannelHandlerContext;
/**
 * Callback for asynchronous client calls: prints the response when it arrives, or a notice
 * when the request is reported dead.
 */
public class AsyncCallback implements Callback<IoResponse> {

    /** The request this callback belongs to; only its id is read, in {@link #dead()}. */
    private final IoRequest req;

    /**
     * @param req the request whose outcome this callback reports
     */
    public AsyncCallback(IoRequest req) {
        this.req = req;
    }

    /**
     * Prints the request id and the payload of the response.
     *
     * @param ctx channel context the response arrived on (unused)
     * @param t   the decoded response
     */
    @Override
    public void callback(ChannelHandlerContext ctx, IoResponse t) {
        // Decode with an explicit charset so the output does not depend on the platform default.
        String result = new String(t.getData(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("[async] Handler request id:" + t.getRequestId() + " result:" + result);
    }

    /**
     * Prints a notice that the request is dead (presumably invoked by the library when no
     * response arrives in time - confirm).
     */
    @Override
    public void dead() {
        System.out.println(StringUtils.format("Request \"%s\" is dead.", req.getId()));
    }
}
demo 的源码可以参考:https://github.com/junxworks/junx/tree/master/junx-sample/src/main/java/io/github/junxworks/junx/test/netty
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于