3. Netty 初认识 --NIO 入门 --- 示例 (完整代码)

本贴最后更新于 1492 天前,其中的信息可能已经事过景迁

首先启动 TimeServer 服务端开始监听
其次启动 TimeClient 连接服务端,并发送固定指令 QUERY TIME ORDER 查询当前时间
最后服务端返回当前时间

具体代码如下(可运行)

TimeServer.java

package club.wujingjian.com.wujingjian.nio.server;

public class TimeServer {

    public static void main(String[] args) {
        int port = 9090;

        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);

        new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
    }
}

MultiplexerTimeServer.java

package club.wujingjian.com.wujingjian.nio.server;

import club.wujingjian.com.wujingjian.nio.Util;

import java.io.IOError;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimeServer implements  Runnable {

    private Selector selector;

    private ServerSocketChannel servChannel;

    private volatile  boolean stop;

    /**
     * 初始化多路复用器、绑定监听端口
     * 进行资源初始化,创建多路复用器Selector、ServerSocketChannel,
     * 对Channel和TCP参数进行配置,(例如,将ServerSocketChannel设置为异步非阻塞模式,它的backlog设置为1024)
     *
     */
    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            //将ServerSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT操作位
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println(Util.nowDate() + "The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();;
            System.exit(1);
        }
    }

    public void stop(){
        this.stop = true;
    }
    @Override
    public void run() {
        while (!stop) {
            /**
             * 循环遍历selector,休眠时间为1s
             */
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        key.cancel();
                        if (key.channel() != null) {
                            key.channel().close();
                        }
                    }
                }
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
        }

        //多路复用器关闭后,所有注册在上面的channel 和pipe 等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }



    private void handleInput(SelectionKey key) throws  IOException {
        if (key.isValid()) {
            /**
             *处理新接入的客户端请求信息,根据SelectionKey的操作位进行判断即可获知网络事件的类型
             */
            if (key.isAcceptable()) {
                //接收新连接
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                //通过ServerSocketChannel的accept()接收客户端的连接请求并创建SocketChannel实例.相当于三次握手
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                //添加新连接到多路复用器Selector
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                /**
                 * 读取客户端的请求消息,首先创建一个ByteBuffer,由于事先无法得知客户端发送的码流大小,作为例程,我们开辟一个1K的缓冲区,
                 * 然后调用SocketChannel的read方法读取请求码流.
                 * 注意: 由于我们已经将SocketChannel设置为异步非阻塞模式,因此他的read是非阻塞的,  socketChannel.read()的返回值有以下三种情况
                 *     返回值大于0:  读取到了字节,对字节进行编解码.
                 *     返回值等于0: 没有读取到字节,属于正常场景,忽略
                 *     返回值为-1 : 链路已经关闭,需要关闭SocketChannel,释放资源
                 */
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    //读取到码流以后,我们进行解码,首先对readBuffer进行flip操作,作用是将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作
                    readBuffer.flip();
                    //根据缓冲区可读的字节个数创建字节数组
                    byte[] bytes = new byte[readBuffer.remaining()];
                    //将缓冲区可读的字节数组复制到新创建的字节数组bytes中.
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println(Util.nowDate() + "The time server receive order : " + body);
                    //如果请求指令是"QUERY TIME ORDER" 则把服务器的当前时间编码后返回给客户端
                    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
                    System.out.println(Util.nowDate() + "服务器准备返回时间为 :" + currentTime);
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    //对端链路关闭
                    key.cancel();
                    sc.close();
                } else{
                    // 读取到0字节,忽略
                }

            }
        }
    }

    private void doWrite(SocketChannel channel, String response) throws  IOException{
        if (response != null && response.trim().length() > 0) {
            //将字符串编码成字节数组,
            byte [] bytes = response.getBytes();
            //根据字节数组容量创建ByteBuffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            //将字节数组复制到缓冲区中
            writeBuffer.put(bytes);
            //将缓冲区的position设置为0,以便准备输出
            writeBuffer.flip();
            /**
             *将缓冲区中的字节数组发送出去
             * 注意:    由于SocketChannel是异步非阻塞的,它并不能保证一次性把需要发送的字节数组发送完毕,此时会出现"写半包"的问题,
             *      我们需要注册写操作,不断轮询Selector将没有发送完的ByteBuffer发送完毕,可以通过ByteBuffer的hasRemaining()方法
             *      判断消息是否发送完成。此处仅仅是个入门示例,没有处理"写半包"的情况.
             */
            channel.write(writeBuffer);
            if (!writeBuffer.hasRemaining()) {
                System.out.println(Util.nowDate() + "发送成功");
            }
        }
    }
}

下面是客户端代码

TimeClient.java

package club.wujingjian.com.wujingjian.nio.client;

public class TimeClient {
    public static void main(String[] args) {
        int port = 9090;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        new Thread(new TimeClientHandle("127.0.0.1",port), "TimeClient-001").start();
    }
}

TimeClientHandle.java

package club.wujingjian.com.wujingjian.nio.client;

import club.wujingjian.com.wujingjian.nio.Util;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandle implements Runnable {

    private String host;
    private int port ;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    /**
     * 构造函数初始化NIO多路复用器和SocketChannel对象,并将socketChannel设置为异步非阻塞模式
     * @param host
     * @param port
     */
    public TimeClientHandle(String host, int port) {
        this.host = host == null ?"127.0.0.1": host;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        try {
            //发送连接请求,作为示例,连接是成功的,所以不需要做重连操作,实际代码中,可能要考虑重连,不能放到while循环外面
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                //当有就绪的Channel时候执行handleInput方法
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        //多路复用器关闭后,所有注册在上面的channel和pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleInput(SelectionKey key) throws  IOException {
        if (key.isValid()) {
            //判断是否连接成功
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) { //如果SelectionKey处于连接状态,说明服务端已经返回ACK应答消息,这是我们需要对结果进行判断
                if (sc.finishConnect()) { //如果sc.finishConnect()返回true,说明客户端连接成功,
                    System.out.println(Util.nowDate() + "客户端连接成功");
                    sc.register(selector, SelectionKey.OP_READ); //将SocketChannel注册到多路复用器上,注册SelectionKey.OP_READ操作位,监听网络读操作,
                    doWrite(sc); //然后发送请求消息给服务端
                } else {
                    System.out.println("连接失败,进程退出");
                    System.exit(1); //连接失败,进程退出
                }
            }

            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println(Util.nowDate() + "Now is :" + body);
                    this.stop = true;
                } else if (readBytes < 0) {
                    //对端链路关闭
                    key.cancel();
                    sc.close();
                } else {
                    //读到0 字节,忽略
                }
            }
        }
    }

    private void doConnect() throws  IOException {
        //如果直连成功,则注册到多路复用器上,发送请求消息,读应答
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else {
            /**
             * 如果没有直接连接成功,则说明服务端没有返回TPC握手应答消息,担这并不代表连接失败.
             * 我们需要将SocketChannel注册到多路复用器Selector上,注册SelectionKey.OP_CONNECT,
             * 当服务端返回TCP syn-ack消息后,Selector就能够轮询到这个SocketChannel处于连接就绪状态
             */
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    private void doWrite(SocketChannel sc) throws IOException {
        byte[] req = "QUERY TIME ORDER".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer); //此时可能会存在"半包写"问题,实际工作中要考虑
        if (!writeBuffer.hasRemaining()) { //此时如果缓冲区中消息全部发送完成,打印如下消息Send order 2 server succeed.
            System.out.println(Util.nowDate() + "Send order 2 server succeed.");
        }
    }
}

先允许服务器端,后允许客户端代码最后输出分别为:

image.png

image.png

  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 20 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    284 引用 • 247 回帖 • 210 关注
  • VirtualBox

    VirtualBox 是一款开源虚拟机软件,最早由德国 Innotek 公司开发,由 Sun Microsystems 公司出品的软件,使用 Qt 编写,在 Sun 被 Oracle 收购后正式更名成 Oracle VM VirtualBox。

    10 引用 • 2 回帖 • 1 关注
  • Swagger

    Swagger 是一款非常流行的 API 开发工具,它遵循 OpenAPI Specification(这是一种通用的、和编程语言无关的 API 描述规范)。Swagger 贯穿整个 API 生命周期,如 API 的设计、编写文档、测试和部署。

    26 引用 • 35 回帖 • 7 关注
  • 架构

    我们平时所说的“架构”主要是指软件架构,这是有关软件整体结构与组件的抽象描述,用于指导软件系统各个方面的设计。另外还有“业务架构”、“网络架构”、“硬件架构”等细分领域。

    139 引用 • 441 回帖
  • 互联网

    互联网(Internet),又称网际网络,或音译因特网、英特网。互联网始于 1969 年美国的阿帕网,是网络与网络之间所串连成的庞大网络,这些网络以一组通用的协议相连,形成逻辑上的单一巨大国际网络。

    96 引用 • 330 回帖
  • SSL

    SSL(Secure Sockets Layer 安全套接层),及其继任者传输层安全(Transport Layer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS 与 SSL 在传输层对网络连接进行加密。

    69 引用 • 190 回帖 • 500 关注
  • Q&A

    提问之前请先看《提问的智慧》,好的问题比好的答案更有价值。

    6364 引用 • 28621 回帖 • 263 关注
  • Telegram

    Telegram 是一个非盈利性、基于云端的即时消息服务。它提供了支持各大操作系统平台的开源的客户端,也提供了很多强大的 APIs 给开发者创建自己的客户端和机器人。

    5 引用 • 35 回帖
  • 快应用

    快应用 是基于手机硬件平台的新型应用形态;标准是由主流手机厂商组成的快应用联盟联合制定;快应用标准的诞生将在研发接口、能力接入、开发者服务等层面建设标准平台;以平台化的生态模式对个人开发者和企业开发者全品类开放。

    15 引用 • 127 回帖
  • 酷鸟浏览器

    安全 · 稳定 · 快速
    为跨境从业人员提供专业的跨境浏览器

    3 引用 • 59 回帖 • 21 关注
  • Sublime

    Sublime Text 是一款可以用来写代码、写文章的文本编辑器。支持代码高亮、自动完成,还支持通过插件进行扩展。

    10 引用 • 5 回帖 • 3 关注
  • HBase

    HBase 是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了 Google 文件系统所提供的分布式数据存储一样,HBase 在 Hadoop 之上提供了类似于 Bigtable 的能力。

    17 引用 • 6 回帖 • 31 关注
  • Laravel

    Laravel 是一套简洁、优雅的 PHP Web 开发框架。它采用 MVC 设计,是一款崇尚开发效率的全栈框架。

    19 引用 • 23 回帖 • 675 关注
  • 学习

    “梦想从学习开始,事业从实践起步” —— 习近平

    160 引用 • 470 回帖 • 1 关注
  • 微服务

    微服务架构是一种架构模式,它提倡将单一应用划分成一组小的服务。服务之间互相协调,互相配合,为用户提供最终价值。每个服务运行在独立的进程中。服务于服务之间才用轻量级的通信机制互相沟通。每个服务都围绕着具体业务构建,能够被独立的部署。

    96 引用 • 155 回帖 • 1 关注
  • 开源中国

    开源中国是目前中国最大的开源技术社区。传播开源的理念,推广开源项目,为 IT 开发者提供了一个发现、使用、并交流开源技术的平台。目前开源中国社区已收录超过两万款开源软件。

    7 引用 • 86 回帖
  • Chrome

    Chrome 又称 Google 浏览器,是一个由谷歌公司开发的网页浏览器。该浏览器是基于其他开源软件所编写,包括 WebKit,目标是提升稳定性、速度和安全性,并创造出简单且有效率的使用者界面。

    60 引用 • 287 回帖
  • Lute

    Lute 是一款结构化的 Markdown 引擎,支持 Go 和 JavaScript。

    25 引用 • 191 回帖 • 16 关注
  • 分享

    有什么新发现就分享给大家吧!

    240 引用 • 1729 回帖
  • JRebel

    JRebel 是一款 Java 虚拟机插件,它使得 Java 程序员能在不进行重部署的情况下,即时看到代码的改变对一个应用程序带来的影响。

    26 引用 • 78 回帖 • 618 关注
  • 人工智能

    人工智能(Artificial Intelligence)是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门技术科学。

    66 引用 • 124 回帖
  • Tomcat

    Tomcat 最早是由 Sun Microsystems 开发的一个 Servlet 容器,在 1999 年被捐献给 ASF(Apache Software Foundation),隶属于 Jakarta 项目,现在已经独立为一个顶级项目。Tomcat 主要实现了 JavaEE 中的 Servlet、JSP 规范,同时也提供 HTTP 服务,是市场上非常流行的 Java Web 容器。

    163 引用 • 529 回帖
  • JVM

    JVM(Java Virtual Machine)Java 虚拟机是一个微型操作系统,有自己的硬件构架体系,还有相应的指令系统。能够识别 Java 独特的 .class 文件(字节码),能够将这些文件中的信息读取出来,使得 Java 程序只需要生成 Java 虚拟机上的字节码后就能在不同操作系统平台上进行运行。

    180 引用 • 120 回帖 • 1 关注
  • SVN

    SVN 是 Subversion 的简称,是一个开放源代码的版本控制系统,相较于 RCS、CVS,它采用了分支管理系统,它的设计目标就是取代 CVS。

    29 引用 • 98 回帖 • 686 关注
  • WebComponents

    Web Components 是 W3C 定义的标准,它给了前端开发者扩展浏览器标签的能力,可以方便地定制可复用组件,更好的进行模块化开发,解放了前端开发者的生产力。

    1 引用 • 25 关注
  • iOS

    iOS 是由苹果公司开发的移动操作系统,最早于 2007 年 1 月 9 日的 Macworld 大会上公布这个系统,最初是设计给 iPhone 使用的,后来陆续套用到 iPod touch、iPad 以及 Apple TV 等产品上。iOS 与苹果的 Mac OS X 操作系统一样,属于类 Unix 的商业操作系统。

    84 引用 • 139 回帖 • 1 关注
  • JSON

    JSON (JavaScript Object Notation)是一种轻量级的数据交换格式。易于人类阅读和编写。同时也易于机器解析和生成。

    51 引用 • 190 回帖