后端 | 基于对象传输的 Netty 初体验

本贴最后更新于 1271 天前,其中的信息可能已经天翻地覆

一.前言

这是一个基于 Netty 进行对象传输的极简教程,笔者使用到的技术及版本如下:
Netty-All 4.1.70.Final
Log4J 2.14.1

笔者从知乎中的 netty 的教程学完后做的笔记,好记性不如烂笔头,记起来偶尔翻一番,总会有新收获。

二.项目目录结构

image.png

  1. common:服务端与客户端共同使用的配置文件及 User 对象
  2. server:服务端负责接收客户端传来的 Object 数据并响应
  3. client:客户端负责主动向服务器发送 Objcet 数据并接收服务端反馈的数据

三.pom 文件

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>xyz.hcworld</groupId> <artifactId>netty-one</artifactId> <packaging>pom</packaging> <version>0.0.1</version> <modules> <module>server</module> <module>client</module> <module>common</module> </modules> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <lombok.version>1.18.20</lombok.version> <netty.version>4.1.70.Final</netty.version> <log4j.version>2.14.1</log4j.version> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>${netty.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${log4j.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>

四.log4J 配置文件

1.Server 端 log4J 配置文件

<?xml version="1.0" encoding="UTF-8"?> <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL --> <!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出--> <!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数--> <configuration status="WARN" monitorInterval="30"> <!--先定义所有的appender--> <appenders> <!--这个输出控制台的配置--> <console name="Console" target="SYSTEM_OUT"> <!--输出日志的格式--> <PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/> </console> <!-- 这个会打印出所有的info及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档--> <RollingFile name="RollingFileInfo" fileName="./logs/server/info.log" filePattern="./logs/server/$${date:yyyy-MM}/info-%d{yyyy-MM-dd}-%i.log"> <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)--> <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/> <Policies> <TimeBasedTriggeringPolicy/> <SizeBasedTriggeringPolicy size="100 MB"/> </Policies> </RollingFile> <RollingFile name="RollingFileWarn" fileName="./logs/server/warn.log" filePattern="./logs/server/$${date:yyyy-MM}/warn-%d{yyyy-MM-dd}-%i.log"> <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/> <Policies> <TimeBasedTriggeringPolicy/> <SizeBasedTriggeringPolicy size="100 MB"/> </Policies> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件,这里设置了20 --> <DefaultRolloverStrategy max="20"/> </RollingFile> <RollingFile name="RollingFileError" fileName="./logs/server/error.log" filePattern="./logs/server/$${date:yyyy-MM}/error-%d{yyyy-MM-dd}-%i.log"> <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/> <Policies> <TimeBasedTriggeringPolicy/> <SizeBasedTriggeringPolicy size="100 MB"/> </Policies> </RollingFile> </appenders> <!--然后定义logger,只有定义了logger并引入的appender,appender才会生效--> <loggers> <!--过滤掉spring的一些无用的DEBUG信息--> <logger name="org.springframework" level="INFO"></logger> <root level="all"> <appender-ref ref="Console"/> <appender-ref ref="RollingFileInfo"/> <appender-ref ref="RollingFileWarn"/> <appender-ref ref="RollingFileError"/> </root> </loggers> </configuration>

2.Client 端 log4J 配置文件

<?xml version="1.0" encoding="UTF-8"?> <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL --> <!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出--> <!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数--> <configuration status="WARN" monitorInterval="30"> <!--先定义所有的appender--> <appenders> <!--这个输出控制台的配置--> <console name="Console" target="SYSTEM_OUT"> <!--输出日志的格式--> <PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/> </console> <!-- 这个会打印出所有的info及以下级别的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档--> <RollingFile name="RollingFileInfo" fileName="./logs/client/info.log" filePattern="./logs/client/$${date:yyyy-MM}/info-%d{yyyy-MM-dd}-%i.log"> <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)--> <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/> <Policies> <TimeBasedTriggeringPolicy/> <SizeBasedTriggeringPolicy size="100 MB"/> </Policies> </RollingFile> <RollingFile name="RollingFileWarn" fileName="./logs/client/warn.log" filePattern="./logs/client/$${date:yyyy-MM}/warn-%d{yyyy-MM-dd}-%i.log"> <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/> <Policies> <TimeBasedTriggeringPolicy/> <SizeBasedTriggeringPolicy size="100 MB"/> </Policies> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件,这里设置了20 --> <DefaultRolloverStrategy max="20"/> </RollingFile> <RollingFile name="RollingFileError" fileName="./logs/client/error.log" filePattern="./logs/client/$${date:yyyy-MM}/error-%d{yyyy-MM-dd}-%i.log"> <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="[%d{HH:mm:ss:SSS}] [%p] - %l - %m%n"/> <Policies> <TimeBasedTriggeringPolicy/> <SizeBasedTriggeringPolicy size="100 MB"/> </Policies> </RollingFile> </appenders> <!--然后定义logger,只有定义了logger并引入的appender,appender才会生效--> <loggers> <!--过滤掉spring的一些无用的DEBUG信息--> <logger name="org.springframework" level="INFO"></logger> <root level="all"> <appender-ref ref="Console"/> <appender-ref ref="RollingFileInfo"/> <appender-ref ref="RollingFileWarn"/> <appender-ref ref="RollingFileError"/> </root> </loggers> </configuration>

注: log4j 配置文件来源于互联网,有兴趣自行了解,此处不多做篇幅描述

五.服务端源码

1.PojoServerHandler 对象消息处理器

package xyz.hcworld.one.server; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import lombok.extern.log4j.Log4j2; import xyz.hcworld.one.common.User; import io.netty.channel.ChannelHandler.Sharable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE; /** * 使用对象进行数据传输(服务器)的消息处理器 * @ClassName: PojoServerHandler * @Author: 张冠诚 * @Date: 2021/9/3 11:13 * @Version: 1.0 */ @Sharable @Log4j2 public class PojoServerHandler extends ChannelInboundHandlerAdapter { /** * 解码成功的消息处理 * @param ctx 上下文 * @param msg 解析后的数据 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof String) { log.info("server收到String对象:{}", msg); } else { User user = (User) msg; log.info("server收到User对象:{}", user.toString()); } ctx.write("收到了!"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 无论是否有完整的消息被解码成功,只要读到消息,都会触发channelReadComplete * @param ctx 上下文 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } /** * 异常处理 * @param ctx 上下文 * @param cause 异常消息 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("出现异常", cause); ctx.close(); } }

2.OneServerBoot 服务端启动器

package xyz.hcworld.one.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import xyz.hcworld.one.common.Config; /** * 服务器端启动类 * @ClassName: OneServerBoot * @Author: 张冠诚 * @Date: 2021/8/31 9:38 * @Version: 1.0 */ public class OneServerBoot { public static void main(String[] args) { //建立两个EventloopGroup用来处理连接和消息 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(Config.PORT) // tcp/ip协议listen函数中的backlog参数(队列),等待连接池的大小 .option(ChannelOption.SO_BACKLOG, 100) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast( // 添加Object的序列化及反序列化encoder和decoder(用的java自动的(反)序列化,性能较差) new ObjectEncoder(), new ObjectDecoder(ClassResolvers.cacheDisabled(null)), // 自定义消息处理器 new PojoServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); // 绑定端口并开始接收连接 ChannelFuture f = b.bind().sync(); // 等待所有的socket都关闭。 f.channel().closeFuture().sync(); }catch (Exception e){ System.out.println(e.getMessage()); } } }

六.客户端源码

1.PojoClientHandler 消息处理器

package xyz.hcworld.one.client; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.log4j.Log4j2; import xyz.hcworld.one.common.Config; import xyz.hcworld.one.common.User; import java.util.concurrent.TimeUnit; import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE; /** * 使用对象进行数据传输(客户端) * * @ClassName: PojoClientHandler * @Author: 张冠诚 * @Date: 2021/9/3 11:17 * @Version: 1.0 */ @Sharable @Log4j2 public class PojoClientHandler extends ChannelInboundHandlerAdapter { int i = 0; long startTime = -1; @Override public void channelActive(ChannelHandlerContext ctx) { if (startTime < 0) { startTime = System.currentTimeMillis(); } println("连接到: " + ctx.channel().remoteAddress()); // 在channel active的时候发送消息 ChannelFuture future = ctx.writeAndFlush("对象传输"); // 将ChannelFuture中的Throwable转发到ChannelPipeline中。 future.addListener(FIRE_EXCEPTION_ON_FAILURE); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 将消息写回channel log.info("客户端收到对象:{}", msg); User user = new User(); user.setUsername("测试账号:" + i); user.setPassword("测试密码:" + i); i++; if (i<=10){ ctx.writeAndFlush(user); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 异常处理 log.error("出现异常", cause); ctx.close(); } @Override public void channelInactive(final ChannelHandlerContext ctx) { println("连接断开:" + ctx.channel().remoteAddress()); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (!(evt instanceof IdleStateEvent)) { return; } IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == IdleState.READER_IDLE) { // 在Idle状态 println("Idle状态,关闭连接"); ctx.close(); } } @Override public void channelUnregistered(final ChannelHandlerContext ctx) throws Exception { println("sleep:" + OneClientBoot.RECONNECT_DELAY + 's'); i=0; ctx.channel().eventLoop().schedule(() -> { println("重连接: " + Config.HOST + ':' + Config.PORT); OneClientBoot.connect(); }, OneClientBoot.RECONNECT_DELAY, TimeUnit.SECONDS); } void println(String msg) { if (startTime < 0) { log.error("服务下线:{}", msg); } else { log.error("服务运行时间:{},{}", (System.currentTimeMillis() - startTime) / 1000, msg); } } }

2.OneClientBoot 客户端启动器

package xyz.hcworld.one.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.timeout.IdleStateHandler; import lombok.extern.log4j.Log4j2; import xyz.hcworld.one.common.Config; import java.util.concurrent.*; /** * @ClassName: OneClientBoot * @Author: 张红尘 * @Date: 2021/8/31 9:47 * @Version: 1.0 */ @Log4j2 public class OneClientBoot extends Thread { /** * 核心线程池大小 */ public static final int CORE_POOL_SIZE = 5; /** * 最大线程池大小 */ public static final int MAX_POOL_SIZE = 10; /** * 阻塞任务队列大小 */ public static final int QUEUE_CAPACITY = 100; /** * 空闲线程存活时间 */ public static final Long KEEP_ALIVE_TIME = 1L; /** * 在reconnect之前 Sleep 5 秒钟 */ static final int RECONNECT_DELAY = Integer.parseInt(System.getProperty("reconnectDelay", "5")); /** * 如果在10秒中之内没有任何相应则重连 */ private static final int READ_TIMEOUT = Integer.parseInt(System.getProperty("readTimeout", "10")); private static final PojoClientHandler POJO_CLIENT_HANDLER = new PojoClientHandler(); private static final Bootstrap BS = new Bootstrap(); @Override public void run() { EventLoopGroup group = new NioEventLoopGroup(); try { BS.group(group) .channel(NioSocketChannel.class) .remoteAddress(Config.HOST, Config.PORT) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast( // 添加encoder和decoder new ObjectEncoder(), new ObjectDecoder(ClassResolvers.cacheDisabled(null)), //心跳机制 new IdleStateHandler(READ_TIMEOUT, 0, 0), POJO_CLIENT_HANDLER); } }); // 连接服务器 ChannelFuture f = BS.connect().sync(); } catch (Exception e) { log.error("服务下线:{}", e.getMessage()); } } static void connect() { BS.connect().addListener(future -> { if (future.cause() != null) { POJO_CLIENT_HANDLER.startTime = -1; POJO_CLIENT_HANDLER.println("建立连接失败: " + future.cause()); } }); } public static void main(String[] args) { //通过ThreadPoolExecutor构造函数自定义参数创建 ThreadPoolExecutor executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy()); // 多线程测试 for (int i = 0; i < 1; i++) { executor.execute(new OneClientBoot()); try { Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } } } }

疑难杂症

问题一 使用 clean 清除编译文件时报 Process terminated

image.png

image.png

解决方案:setting.xml 配置的路径错误,修改了 setting.xml 文件的路径。切 setting.xml 的缩进不规范。使用 VS Code 格式化后完成清除

项目下载

Github: https://github.com/z875479694h/netty-one

  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 38 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...