Springboot+Netty+WebSocket 配置

本贴最后更新于 496 天前,其中的信息可能已经时过境迁

Spring 版本 2.7.0

Netty 依赖:

<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.90.Final</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>

先看启动类实现 CommandLineRuner:

@SpringBootApplication @EnableCaching @EnableAsync @EnableScheduling @EnableTransactionManagement @Slf4j public class JilijiliImApplication implements CommandLineRunner { @Autowired private ServerBootstrap serverBootstrap; @Qualifier("mainGrpNioEventLoopGroup") @Autowired private NioEventLoopGroup mainNioEventLoopGroup; @Autowired private NioEventLoopGroup subNioEventLoopGroup; @Value("${server.port}") private Integer port; public static void main(String[] args) { SpringApplication.run(JilijiliImApplication.class, args); } @Override public void run(String... args) throws Exception { try { //为空则调用start方法 serverBootstrap.channel(NioServerSocketChannel.class) .group(mainNioEventLoopGroup, subNioEventLoopGroup) .childHandler(new WebSocketServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(port + 1).sync(); channelFuture.addListener((future -> { if (future.isSuccess()) { System.out.println("" + " ████ ██ ██ ██ \n" + "░██░██ ░██ ░██ ░██ ██ ██\n" + "░██░░██ ░██ █████ ██████ ██████ ░░██ ██ \n" + "░██ ░░██ ░██ ██░░░██░░░██░ ░░░██░ ░░███ \n" + "░██ ░░██░██░███████ ░██ ░██ ░██ \n" + "░██ ░░████░██░░░░ ░██ ░██ ██ \n" + "░██ ░░███░░██████ ░░██ ░░██ ██ \n" + "░░ ░░░ ░░░░░░ ░░ ░░ ░░ \n" + "连接:ws://localhost:" + (port + 1) + "/ws" + "\n"); } else { log.error("Netty启动失败!!!"); } })); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); log.error(e.getMessage()); } finally { mainNioEventLoopGroup.shutdownGracefully(); subNioEventLoopGroup.shutdownGracefully(); } } }

自定义 NettyWebSocketServer 或者配置

@Configuration @Slf4j public class NettyWebSocketConfig { @Bean public ServerBootstrap serverBootstrap() { log.info("Netty服务器准备就绪"); return new ServerBootstrap(); } @Bean(value = "mainGrpNioEventLoopGroup") public NioEventLoopGroup mainGrpNioEventLoopGroup() { log.info("mainGrpNioEventLoopGroup 准备就绪"); return new NioEventLoopGroup(); } @Bean(value = "subNioEventLoopGroup") public NioEventLoopGroup subNioEventLoopGroup() { log.info("subNioEventLoopGroup 准备就绪"); return new NioEventLoopGroup(); } }

自定义 WebSocketServerInitializer 做协议操作

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 添加 HTTP 相关的处理器 pipeline.addLast(new HttpServerCodec()); //添加聚合器 pipeline.addLast(new HttpObjectAggregator(1024 * 60)); //添加对大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //设置websocket连接前缀前缀 pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // 添加自定义的 WebSocket 处理器 pipeline.addLast(new WebSocketHandler(new WebSocketConnectionManager())); } }

自定义 WebSocket 连接管理器: WebSocketConnectionManager

public class WebSocketConnectionManager { private static final Map<String, Channel> activeConnections = new ConcurrentHashMap<>(); public void addConnection(String userId, Channel channel) { // 设置上线 activeConnections.put(userId, channel); } public void removeConnection(String userId) { // 设置离线 activeConnections.remove(userId); } public Channel getConnection(String userId) { // todo 连接数据库查询 return activeConnections.get(userId); } }

WebSocketHandler 做 websocket 事件监听

public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private final WebSocketConnectionManager connectionManager; public WebSocketHandler(WebSocketConnectionManager connectionManager) { this.connectionManager = connectionManager; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 新连接建立时调用 String userId = "user123"; // 你可以使用真实的用户ID connectionManager.addConnection(userId, ctx.channel()); System.out.println("新连接建立:" + ctx.channel().id()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 连接关闭时调用 String userId = "user123"; // 你可以使用真实的用户ID connectionManager.removeConnection(userId); System.out.println("连接关闭:" + ctx.channel().id()); } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // 处理收到的 WebSocket 消息 String receivedMessage = msg.text(); System.out.println("收到消息:" + receivedMessage); // 你的处理逻辑... // 示例:向客户端回复消息 ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器回复:" + receivedMessage)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 处理异常 cause.printStackTrace(); ctx.close(); } }
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    948 引用 • 1460 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...