Spring Boot 版本 2.7.0
Netty 依赖:
<!-- Netty all-in-one artifact, pinned to 4.1.90.Final. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.90.Final</version>
</dependency>
<!-- Spring Boot web starter. No <version> is declared here; presumably it is
     managed by the Spring Boot parent/BOM of the project (confirm in the full pom). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
先看启动类实现 CommandLineRunner:
@SpringBootApplication
@EnableCaching
@EnableAsync
@EnableScheduling
@EnableTransactionManagement
@Slf4j
public class JilijiliImApplication implements CommandLineRunner {
@Autowired
private ServerBootstrap serverBootstrap;
@Qualifier("mainGrpNioEventLoopGroup")
@Autowired
private NioEventLoopGroup mainNioEventLoopGroup;
@Autowired
private NioEventLoopGroup subNioEventLoopGroup;
@Value("${server.port}")
private Integer port;
public static void main(String[] args) {
SpringApplication.run(JilijiliImApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
try {
//为空则调用start方法
serverBootstrap.channel(NioServerSocketChannel.class)
.group(mainNioEventLoopGroup, subNioEventLoopGroup)
.childHandler(new WebSocketServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(port + 1).sync();
channelFuture.addListener((future -> {
if (future.isSuccess()) {
System.out.println("" +
" ████ ██ ██ ██ \n" +
"░██░██ ░██ ░██ ░██ ██ ██\n" +
"░██░░██ ░██ █████ ██████ ██████ ░░██ ██ \n" +
"░██ ░░██ ░██ ██░░░██░░░██░ ░░░██░ ░░███ \n" +
"░██ ░░██░██░███████ ░██ ░██ ░██ \n" +
"░██ ░░████░██░░░░ ░██ ░██ ██ \n" +
"░██ ░░███░░██████ ░░██ ░░██ ██ \n" +
"░░ ░░░ ░░░░░░ ░░ ░░ ░░ \n" +
"连接:ws://localhost:" + (port + 1) + "/ws" + "\n");
} else {
log.error("Netty启动失败!!!");
}
}));
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage());
} finally {
mainNioEventLoopGroup.shutdownGracefully();
subNioEventLoopGroup.shutdownGracefully();
}
}
}
自定义 Netty WebSocket 服务器配置类 NettyWebSocketConfig:
/**
 * Spring configuration that exposes the Netty {@code ServerBootstrap} and two
 * {@code NioEventLoopGroup} instances as beans. Each factory method logs a
 * readiness message before creating its object.
 */
@Slf4j
@Configuration
public class NettyWebSocketConfig {

    /**
     * Creates the server bootstrap bean.
     *
     * @return a new, unconfigured {@code ServerBootstrap}
     */
    @Bean
    public ServerBootstrap serverBootstrap() {
        log.info("Netty服务器准备就绪");
        final ServerBootstrap bootstrap = new ServerBootstrap();
        return bootstrap;
    }

    /**
     * Creates the event loop group registered under the bean name
     * {@code mainGrpNioEventLoopGroup}.
     *
     * @return a new {@code NioEventLoopGroup} built with its default constructor
     */
    @Bean("mainGrpNioEventLoopGroup")
    public NioEventLoopGroup mainGrpNioEventLoopGroup() {
        log.info("mainGrpNioEventLoopGroup 准备就绪");
        final NioEventLoopGroup mainGroup = new NioEventLoopGroup();
        return mainGroup;
    }

    /**
     * Creates the event loop group registered under the bean name
     * {@code subNioEventLoopGroup}.
     *
     * @return a new {@code NioEventLoopGroup} built with its default constructor
     */
    @Bean("subNioEventLoopGroup")
    public NioEventLoopGroup subNioEventLoopGroup() {
        log.info("subNioEventLoopGroup 准备就绪");
        final NioEventLoopGroup subGroup = new NioEventLoopGroup();
        return subGroup;
    }
}
自定义 WebSocketServerInitializer 做协议操作
/**
 * Channel initializer that assembles the handler pipeline for each accepted
 * socket: HTTP codec, HTTP message aggregation, chunked writes, the WebSocket
 * protocol handler on {@code /ws}, and finally the application's
 * {@code WebSocketHandler}.
 */
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    /** Size limit handed to the HTTP object aggregator (1024 * 60). */
    private static final int MAX_HTTP_CONTENT_LENGTH = 1024 * 60;

    /** Path given to the WebSocket protocol handler. */
    private static final String WEBSOCKET_PATH = "/ws";

    /**
     * Appends the handlers to the channel's pipeline in processing order.
     *
     * @param ch the newly registered socket channel
     * @throws Exception declared by {@code ChannelInitializer#initChannel}
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // HTTP request decoding / response encoding.
                .addLast(new HttpServerCodec())
                // Aggregates HTTP message parts into full messages.
                .addLast(new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH))
                // Support for writing large data streams in chunks.
                .addLast(new ChunkedWriteHandler())
                // WebSocket protocol handling on the configured path.
                .addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH))
                // Application-level WebSocket handler.
                .addLast(new WebSocketHandler(new WebSocketConnectionManager()));
    }
}
自定义 WebSocket 连接管理器: WebSocketConnectionManager
/**
 * Registry of active channels keyed by user ID.
 *
 * <p>The backing map is a static {@link ConcurrentHashMap}, so every instance
 * of this class reads and writes the same set of connections.
 */
public class WebSocketConnectionManager {

    /** Shared user-ID-to-channel map for all manager instances. */
    private static final Map<String, Channel> CHANNELS_BY_USER_ID = new ConcurrentHashMap<>();

    /**
     * Marks a user as online by storing the channel under the user ID,
     * replacing any channel previously stored for that ID.
     *
     * @param userId  key to register the channel under
     * @param channel channel to associate with the user
     */
    public void addConnection(String userId, Channel channel) {
        CHANNELS_BY_USER_ID.put(userId, channel);
    }

    /**
     * Marks a user as offline by dropping the entry stored for the user ID.
     *
     * @param userId key whose entry is removed
     */
    public void removeConnection(String userId) {
        CHANNELS_BY_USER_ID.remove(userId);
    }

    /**
     * Looks up the channel currently stored for a user ID.
     *
     * @param userId key to look up
     * @return the stored channel, or {@code null} when the map has no entry
     */
    public Channel getConnection(String userId) {
        // TODO: query the database.
        return CHANNELS_BY_USER_ID.get(userId);
    }
}
WebSocketHandler 做 websocket 事件监听
/**
 * Inbound handler for text WebSocket frames. It records connections in a
 * {@code WebSocketConnectionManager}, echoes every received text message back
 * to the sender, and closes the channel on errors.
 *
 * <p>NOTE(review): connections are registered under one fixed placeholder ID,
 * so a later connection replaces the stored channel of an earlier one, and any
 * disconnect removes the shared entry. Substitute the real user ID before
 * relying on the manager's lookups.
 */
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /** Placeholder key used until a real user ID is available. */
    private static final String PLACEHOLDER_USER_ID = "user123";

    private final WebSocketConnectionManager connectionManager;

    /**
     * @param connectionManager registry that tracks the channels this handler sees
     */
    public WebSocketHandler(WebSocketConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }

    /** Called when a new connection becomes active: registers the channel and prints its ID. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        connectionManager.addConnection(PLACEHOLDER_USER_ID, ctx.channel());
        System.out.println("新连接建立:" + ctx.channel().id());
    }

    /** Called when the connection is closed: unregisters the placeholder ID and prints the channel ID. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        connectionManager.removeConnection(PLACEHOLDER_USER_ID);
        System.out.println("连接关闭:" + ctx.channel().id());
    }

    /**
     * Handles one received text frame: prints its text and writes a reply frame
     * carrying the same text back through the channel.
     *
     * @param ctx handler context of the receiving channel
     * @param msg the text frame that was received
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        final String incomingText = msg.text();
        System.out.println("收到消息:" + incomingText);

        // Application-specific processing would go here.

        // Example: reply to the client with the received text.
        final TextWebSocketFrame reply = new TextWebSocketFrame("服务器回复:" + incomingText);
        ctx.channel().writeAndFlush(reply);
    }

    /** Prints the stack trace of the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于