4. Netty 初认识 --AIO 编程

本贴最后更新于 1858 天前,其中的信息可能已经时移世异

通过上篇博文我们已能发现 NIO 编程难度确实比同步阻塞 BIO 大很多,而且之前的 NIO 并没有考虑"半包读","半包写",如果加上这些,会更加复杂,那为什么 NIO 使用越来越广泛,它的优点如下:

  1. 客户端发起的连接操作是异步的,可以通过在多路复用器注册 OP_CONNECT 等待后续结果,不需要像之前的客户端那样被同步阻塞。

  2. SocketChannel 的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样 I/O 通信线程就可以处理其他的链路,不需要同步等待这个链路可用。

  3. 线程模型的优化:由于 JDK 的 Selector 在 Linux 等主流操作系统上通过 epoll 实现,它没有连接句柄数的限制(只受限于操作系统的最大句柄数或者对单个进程的句柄限制),这意味着一个 Selector 线程可用同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降,一会那次,它非常适合做高性能、高负载的网络服务器。

    JDK1.7 升级了 NIO 类库,升级后的 NIO 类库被称为 NIO2.0,引人注目的是,java 正式提供了异步文件 I/O 操作,同时提供了与 UNIX 网络编程事件驱动 I/O 对应的 AIO; 它不需要通过多路复用器 Selector 对注册的通道进行轮询操作即可实现异步读写。下面以 AIO 举例上篇文章的客户端连接服务器获取当前时间。

NIO2.0 提供了异步文件通道和异步套接字通道的实现,异步通道提供两种方式后去操作结果:

通过 java.util.concurrent.Future 类来标识异步操作的结果;
在执行异步操作的时候传入一个 java.nio.channels.

CompletionHandler 接口的实现类作为操作完成的回调。

服务端
TimeServer.java

package club.wujingjian.com.wujingjian.aio.server; public class TimeServer { public static void main(String[] args) { int port = 9090; if (args != null && args.length > 0) { try { port = Integer.parseInt(args[0]); } catch (NumberFormatException e) { e.printStackTrace(); } } //创建异步的时间服务器处理类 AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port); //启动异步时间服务器处理类的线程 new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start(); } }

AsyncTimeServerHandler.java

package club.wujingjian.com.wujingjian.aio.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.CountDownLatch; public class AsyncTimeServerHandler implements Runnable { private int port; CountDownLatch latch; AsynchronousServerSocketChannel asynchronousServerSocketChannel; public AsyncTimeServerHandler(int port) { this.port = port; try { //创建一个异步服务端通道 AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); //调用bind方法绑定监听端口,如果端口合法且没被占用,绑定成功 asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); //打印启动成功提示到控制台 System.out.println("The time server is start in port :" + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //CountDownLatch作用是在完成一组正在执行的操作之前,允许当前的线程一直阻塞,在本例中我们让线程在此阻塞,防止服务端执行完成退出 //在实际项目中不需要启动独立线程来处理AsynchronousServerSocketChannel,这里仅仅做个示例展示 latch = new CountDownLatch(1); doAccept(); //用于接收客户端的连接 try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } public void doAccept(){ //AcceptCompletionHandler 用来接收通知消息 asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler()); } }

AcceptCompletionHandler.java

package club.wujingjian.com.wujingjian.aio.server; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> { @Override public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) { //从attachment获取成员变量AsynchronousServerSocketChannel,然后继续调用它的accept方法. //当我们调用AsynchronousServerSocketChannel的accept方法后,如果有新的客户端连接接入,系统将回调我们传入的CompletionHandler示例的completed方法,表示新 //的客户端已经接入成功,因为一个AsynchronousSocketChannel可以接收成千上万个客户端,搜易我们这里要继续调用它的accept方法,接收其他的客户端连接,最终形成 //一个循环.每当接收一个客户读连接成功之后,再一步接收新的客户端连接 attachment.asynchronousServerSocketChannel.accept(attachment, this); //链路建立成功后,服务端需要接收客户端的请求,创建ByteBuffer ByteBuffer buffer = ByteBuffer.allocate(1024); /** AsynchronousSocketChannel的read方法参数解释如下: * 第一个参数ByteBuffer dst: 接收缓冲区,用于从异步Chanenl中读取数据包; * 第二个参数A attachemnt: 异步Channel携带的附件,通知回调的时候作为入参使用; * 第三个参数CompletionHandler: 接收通知回调的业务handler,此处用ReadCompletionHandler */ result.read(buffer, buffer, new ReadCompletionHandler(result)); } @Override public void failed(Throwable exc, AsyncTimeServerHandler attachment) { exc.printStackTrace(); attachment.latch.countDown(); } }

ReadCompletionHandler.java

package club.wujingjian.com.wujingjian.aio.server; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.Date; public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; //将AsynchronousSocketChannel 传入构造参数中,主要用于读取半包消息和发送应答,本示例不对半包读写进行具体说明 public ReadCompletionHandler(AsynchronousSocketChannel channel) { if (this.channel == null) { this.channel = channel; } } @Override //读取到消息的处理 public void completed(Integer result, ByteBuffer attachment) { attachment.flip();//为后续从缓冲区读取数据做准备 byte[] body = new byte[attachment.remaining()]; attachment.get(body); try { String req = new String(body, "UTF-8"); System.out.println("The time server receive order : " + req); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(currentTime); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } private void doWrite(String currentTime) { //对当前时间进行合法性校验 if (currentTime != null && currentTime.trim().length() > 0) { byte [] bytes = currentTime.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //如果没有发送完成,继续发送 if (buffer.hasRemaining()) { channel.write(buffer, buffer, this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { //ignore on close } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } }

客户端

TimeClient.java

package club.wujingjian.com.wujingjian.aio.client; public class TimeClient { public static void main(String[] args) { int port = 9090; if (args != null && args.length > 0) { try { port = Integer.parseInt(args[0]); } catch (NumberFormatException e) { e.printStackTrace(); } } new Thread(new AsyncTimeClientHandler("127.0.0.1",port), "AIO-AsyncTimeClientHandler-001").start(); } }

AsyncTimeClientHandler.java

package club.wujingjian.com.wujingjian.aio.client; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncTimeClientHandler implements CompletionHandler<Void,AsyncTimeClientHandler>, Runnable { private AsynchronousSocketChannel client; private String host; private int port; private CountDownLatch latch; public AsyncTimeClientHandler(String host, int port) { this.host = host; this.port = port; try { client = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1);//创建CountDownLatch进行等待,防止异步操作没有执行完线程就退出 //client.connect第二个参数A attach : AsynchronousSocketChannel的附件,用于回调通知时作为入参被传递,调用者可以自定义 //client.connect第三个参数CompletionHandler: 异步操作回调通知接口,由调用者实现 //本例中我们第二,三个参数都是用了AsyncTimeClientHandler类本身,因为它实现了CompletionHandler接口 client.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } try { client.close(); } catch (IOException e1) { e1.printStackTrace(); } } @Override public void completed(Void result, AsyncTimeClientHandler attachment) { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { client.write(buffer,buffer,this); } else { ByteBuffer readBuffer = ByteBuffer.allocate(1024); client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { //ignore on close } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ignore on close } } }); } @Override public void failed(Throwable exc, AsyncTimeClientHandler attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ignore on close } } }

运行结果 Server 端
image.png
客户端:

image.png

  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 34 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • ActiveMQ

    ActiveMQ 是 Apache 旗下的一款开源消息总线系统,它完整实现了 JMS 规范,是一个企业级的消息中间件。

    19 引用 • 13 回帖 • 678 关注
  • 叶归
    5 引用 • 16 回帖 • 10 关注
  • 30Seconds

    📙 前端知识精选集,包含 HTML、CSS、JavaScript、React、Node、安全等方面,每天仅需 30 秒。

    • 精选常见面试题,帮助您准备下一次面试
    • 精选常见交互,帮助您拥有简洁酷炫的站点
    • 精选有用的 React 片段,帮助你获取最佳实践
    • 精选常见代码集,帮助您提高打码效率
    • 整理前端界的最新资讯,邀您一同探索新世界
    488 引用 • 384 回帖 • 5 关注
  • 正则表达式

    正则表达式(Regular Expression)使用单个字符串来描述、匹配一系列遵循某个句法规则的字符串。

    31 引用 • 94 回帖 • 1 关注
  • Q&A

    提问之前请先看《提问的智慧》,好的问题比好的答案更有价值。

    9417 引用 • 42904 回帖 • 109 关注
  • 链滴

    链滴是一个记录生活的地方。

    记录生活,连接点滴

    171 引用 • 3842 回帖
  • MyBatis

    MyBatis 本是 Apache 软件基金会 的一个开源项目 iBatis,2010 年这个项目由 Apache 软件基金会迁移到了 google code,并且改名为 MyBatis ,2013 年 11 月再次迁移到了 GitHub。

    173 引用 • 414 回帖 • 367 关注
  • 支付宝

    支付宝是全球领先的独立第三方支付平台,致力于为广大用户提供安全快速的电子支付/网上支付/安全支付/手机支付体验,及转账收款/水电煤缴费/信用卡还款/AA 收款等生活服务应用。

    29 引用 • 347 回帖
  • CentOS

    CentOS(Community Enterprise Operating System)是 Linux 发行版之一,它是来自于 Red Hat Enterprise Linux 依照开放源代码规定释出的源代码所编译而成。由于出自同样的源代码,因此有些要求高度稳定的服务器以 CentOS 替代商业版的 Red Hat Enterprise Linux 使用。两者的不同在于 CentOS 并不包含封闭源代码软件。

    239 引用 • 224 回帖 • 1 关注
  • 代码片段

    代码片段分为 CSS 与 JS 两种代码,添加在 [设置 - 外观 - 代码片段] 中,这些代码会在思源笔记加载时自动执行,用于改善笔记的样式或功能。

    用户在该标签下分享代码片段时需在帖子标题前添加 [css] [js] 用于区分代码片段类型。

    133 引用 • 890 回帖 • 1 关注
  • abitmean

    有点意思就行了

    31 关注
  • PostgreSQL

    PostgreSQL 是一款功能强大的企业级数据库系统,在 BSD 开源许可证下发布。

    22 引用 • 22 回帖 • 1 关注
  • CodeMirror
    1 引用 • 2 回帖 • 155 关注
  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖 • 2 关注
  • HBase

    HBase 是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了 Google 文件系统所提供的分布式数据存储一样,HBase 在 Hadoop 之上提供了类似于 Bigtable 的能力。

    17 引用 • 6 回帖 • 60 关注
  • 导航

    各种网址链接、内容导航。

    43 引用 • 177 回帖 • 2 关注
  • API

    应用程序编程接口(Application Programming Interface)是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工作机制的细节。

    79 引用 • 431 回帖
  • 小说

    小说是以刻画人物形象为中心,通过完整的故事情节和环境描写来反映社会生活的文学体裁。

    31 引用 • 108 回帖 • 1 关注
  • AWS
    11 引用 • 28 回帖 • 9 关注
  • Solo

    Solo 是一款小而美的开源博客系统,专为程序员设计。Solo 有着非常活跃的社区,可将文章作为帖子推送到社区,来自社区的回帖将作为博客评论进行联动(具体细节请浏览 B3log 构思 - 分布式社区网络)。

    这是一种全新的网络社区体验,让热爱记录和分享的你不再感到孤单!

    1440 引用 • 10067 回帖 • 491 关注
  • Tomcat

    Tomcat 最早是由 Sun Microsystems 开发的一个 Servlet 容器,在 1999 年被捐献给 ASF(Apache Software Foundation),隶属于 Jakarta 项目,现在已经独立为一个顶级项目。Tomcat 主要实现了 JavaEE 中的 Servlet、JSP 规范,同时也提供 HTTP 服务,是市场上非常流行的 Java Web 容器。

    162 引用 • 529 回帖 • 4 关注
  • WebSocket

    WebSocket 是 HTML5 中定义的一种新协议,它实现了浏览器与服务器之间的全双工通信(full-duplex)。

    48 引用 • 206 回帖 • 297 关注
  • FlowUs

    FlowUs.息流 个人及团队的新一代生产力工具。

    让复杂的信息管理更轻松、自由、充满创意。

    1 引用
  • 工具

    子曰:“工欲善其事,必先利其器。”

    295 引用 • 750 回帖 • 1 关注
  • Eclipse

    Eclipse 是一个开放源代码的、基于 Java 的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。

    76 引用 • 258 回帖 • 630 关注
  • Access
    1 引用 • 3 回帖 • 4 关注
  • 分享

    有什么新发现就分享给大家吧!

    247 引用 • 1794 回帖