Java NIO 学习

本贴最后更新于 2069 天前,其中的信息可能已经时移世易

1. 什么是 I/O

I/O (输入/输出 ) 指的是计算机与外部世界或者一个程序与计算机的其余部分的之间的接口。

1.1 Input/Output Streams

输入输出流的字面意思是一种不间断的流,在 Java 中,可以表示为不间断的数据流,流中的数据可以是字节、字符、对象等。
河流是一条水流,水源从一个源头流向目的地。类似地,在 Java I/O 中,数据从称为数据源的源流向称为数据接收器的目标。
数据从数据源中读到 Java 程序中,Java 程序将数据写入数据接收器。连接数据源和 Java 程序的流(Stream)叫做输入流(Input Stream),连接 Java 程序和数据接收器的流(Stream)叫做输出流(Output Stream)。
在诸如河流的自然溪流中,源头和目的地通过连续的水流连接。而在 Java I/O 中,Java 程序连接输入流(Input Stream)和输出流(Output Stream)。数据通过输入流从数据源流向 Java 程序,通过输出流从 Java 程序流向数据接收器。换句话说,Java 程序从输入流中读取数据并将数据写到输出流中。下图显示了从输入流到 Java 程序以及从 Java 程序到输出流的数据流。
Old_IO_package.png

1.2 Input/Output Streams 是怎样工作的

要将数据从数据源读取到 Java 程序,您需要执行以下步骤:

  • 识别数据源。它可以是文件,字符串,数组,网络连接等。
  • 使用您标识的数据源构造输入流。
  • 从输入流中读取数据。通常,您在循环中读取数据,直到您从输入流中读取所有数据。输入流的方法返回一个特殊值以指示输入流的结束。
  • 关闭输入流。请注意,构造输入流本身会打开它进行读取。没有明确的步骤来打开输入流。但是,必须在从中读取数据后关闭输入流。

要从 Java 程序将数据写入数据接收器,您需要执行以下步骤:

  • 识别数据接收器。也就是说,确定将写入数据的目的地。它可以是文件,字符串,数组,网络连接等。
  • 使用您标识的数据接收器构造输出流。
  • 将数据写入输出流。
  • 关闭输出流。请注意,构造输出流本身会打开它以进行写入。没有明确的步骤来打开输出流。但是,必须在完成向其写入数据后关闭输出流。

Java 中的输入/输出流类基于装饰器模式。

2. 什么是 NIO

基于流的 I/O 使用流在数据源/接收器和 Java 程序之间传输数据。Java 程序一次一个字节地读取或写入流。这种执行 I/O 操作的方法很慢。新 Input/Output(NIO)解决了旧的基于流的 I/O 中的慢速问题。

在 NIO 中,使用通道(channel)和缓冲区(buffer)来执行 I/O 操作。通道(channel)就流(stream)一样,它表示数据源/接收器和用于数据传输的 Java 程序之间的连接。

通道和流之间有一个差异:

  • 流用于单向数据传输。也就是说,输入流只能将数据从数据源传输到 Java 程序;输出流只能将数据从 Java 程序传输到数据接收器。
  • 但是,通道提供双向数据传输功能。

在 NIO 中,使用通道读取数据以及写入数据,也可以根据需要获取只读通道,只写通道或读写通道。

下图描绘了通道,缓冲区,数据源,数据接收器和 Java 程序之间的交互。

new_IO_package.png

在基于流的 I/O 中,数据传输的基本单位是一个__字节__。在基于通道的 NIO 中,数据传输的基本单元是__缓冲区__。

在基于流的 I/O 中,直接将数据写入流中。在基于通道的 I/O 中,读取数据时,先将数据读取到缓冲区中,再从缓冲区中获取数据;写入数据时,先将数据写入缓冲区,再将缓冲区传递给通道。

缓冲区(buffer)内部实现细节

NIO 中两个重要的缓冲区组件:状态变量和访问方法 (accessor)。

状态变量

可以用三个值指定缓冲区在任意时刻的状态:

  • position
  • limit
  • capacity

通过这三个变量来跟踪缓冲区的状态变化和数据。

position

缓冲区本质上是一个数据,缓冲区类型包括:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloadBuffer
  • DoubleBuffer

在从通道中读取数据时,数据被放在底层的数组中,position 变量跟踪已经写了多少数据,更准确的说,它指定了下一个字节将放到数组中的那个位置上。比如说如果我们已经往 buffer 中写入了三个字节,那么 position 被设置为 3,表示下一个字节存放的位置是 3,即数组中的第四个位置。

在向通道中写入数据时,position 追踪从缓冲区中取出了多少数据,更确切的说,它指定下一个字节来自数组中的哪个元素。比如说如果我们已经向通道中写入 4 个字节,那 position 被设置为 4,指向数组中第 5 个元素。

limit

表示还有多少数据写入(将数据从通道读入缓冲区中时),或者还有多少数据取出(将数据从缓冲区中写入通道时)。

position 总是小于或等于 limit。

capacity

缓冲区的容量,即底层数据的大小。

capacity>=limit>=position

举例说明缓存变量的变化过程

新建一个缓冲区,假设缓存区的容量(capacity)为 8。

limit 小于等于 capacity,我们让他指向数据的尾部之后。

position 设置为 0。

缓冲区初始状态如下:

figure1.gif

由于 capacity 不会改变,在下面的讨论中忽略它。

读取数据过程:

(1)第一次读入,从通道中读取 3 个字节写入缓冲区中,此时 position 为 3,即下一个字节存放的位置是 4,limit 不变;

figure2.gif

(2) 第二次读入,从通道中读取两个 2 字节写入缓冲区中,此时 position 变为 5,limit 不变;

figure3.gif

现在我们将缓冲区中的数据写入另一个通道中,在写入之前,需要先调用 flip()方法,该方法做两件事:

    1. 将 limit 设置为当前 position
    1. 将 position 这是为 0

flip 之后,缓冲区的情况如下,

figure4.gif

现在可以写入数据了,此时 position 是 0,limit 是 5,即下一个取出的字节是数组中第一个元素。

写数据过程:

(3)第一次写入,从缓存中取出 4 个字节写入通道中,此时 position 为 4,limit 不变;

figure5gif.gif

(4)第二次写入,从缓存中取出 1 个字节写入通道中,此时 position 为 5,limit 不变;

figure6.gif

最后,在数据写入完成后我们调用 clear()方法,重设缓冲区以接收更多的字节,该方法完成两件事:

    1. 将 limit 设置为和 capacity 相同
    1. 将 position 设置为 0

clear 之后,缓冲区状态如下,

figure7.gif

其实我们可以把上面的 flip()操作看作是读写模式的切换,对于读数据:当通道中的数据写入缓冲区后(写模式),调用 flip 切换到读模式,以便我们读取 buffer 中的数据;对于写数据:当我们把数据写入缓冲区后(写模式),调用 flip 切换到读模式,以便通道读取 buffer 中的数据。

buffers-modes.png

NIO 快速复制文件示例

public static void fastCopy(String src, String dist) throws IOException {

    // 获得源文件的输入字节流
    FileInputStream fin = new FileInputStream(src);

    // 获取输入字节流的文件通道
    FileChannel fcin = fin.getChannel();

    // 获取目标文件的输出字节流
    FileOutputStream fout = new FileOutputStream(dist);

    // 获取输出字节流的文件通道
    FileChannel fcout = fout.getChannel();

    // 为缓冲区分配 1024 个字节
    ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

    while (true) {

        // 从输入通道中读取数据到缓冲区中
        int r = fcin.read(buffer);

        // 返回 -1 表示文件读取完成
        if (r == -1) {
            break;
        }

        // 切换读写模式
        buffer.flip();

        // 把缓冲区的内容写入输出文件中
        fcout.write(buffer);

        // 清空缓冲区
        buffer.clear();
    }
}

选择器(Selector)

NIO 常常被叫做非阻塞 IO,主要是因为 NIO 在网络通信中的非阻塞特性被广泛使用。

NIO 实现了 IO 多路复用中的 Reactor 模型,一个线程 Thread 使用一个选择器 Selector 通过轮询的方式去监听多个通道 Channel 上的事件,从而让一个线程就可以处理多个事件。

通过配置监听的通道 Channel 为非阻塞,那么当 Channel 上的 IO 事件还未到达时,就不会进入阻塞状态一直等待,而是继续轮询其它 Channel,找到 IO 事件已经到达的 Channel 执行。

因为创建和切换线程的开销很大,因此使用一个线程来处理多个事件而不是一个线程处理一个事件,对于 IO 密集型的应用具有很好地性能。

selector.png

1) 创建 selector

Selector selector = Selector.open();

2) 打开一个 ServerSocketChannel

ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking( false );

ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress("localhost",6789);
ss.bind( address );

第一行创建一个新的 ServerSocketChannel ,最后三行将它绑定到给定的端口。第二行将 ServerSocketChannel 设置为 非阻塞的 。我们必须对每一个要使用的套接字通道调用这个方法,否则异步 I/O 就不能工作。

3) 将通道注册到 selector 上

ssc.register(selector, SelectionKey.OP_ACCEPT);

在将通道注册到选择器上时,还需要指定要注册的具体事件,主要有以下几类:

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE
4) 监听事件

selector.select();

使用 select() 来监听到达的事件,它会一直阻塞直到有至少一个事件到达。

5) 获取到达的事件

Set selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
.....
} else if (selectionKey.isReadable()) {
.....
}
// 删除处理过的 selectionKey
iterator.remove();
}

6) 事件循环

因为一次 select() 调用不能处理完所有的事件,并且服务器端有可能需要一直监听事件,因此服务器端处理事件的代码一般会放在一个死循环内。

套接字 NIO 实例

public class NIOServer {

  private static ByteBuffer serverBuffer = ByteBuffer.allocate(1024);

  public static void main(String[] args) throws IOException {
    // 创建一个selector
    Selector selector = Selector.open();
    // 创建一个ServerSocketChannel,接收新的连接
    ServerSocketChannel ssc = ServerSocketChannel.open();
    // 设置为非阻塞的,必须设置为false,否则异步IO不能工作
    ssc.configureBlocking(false);

    // 把上面创建的ServerSocketChannel绑定到指定端口
    ServerSocket ss = ssc.socket();
    InetSocketAddress address = new InetSocketAddress("localhost",6789);
    ss.bind(address);

    //将ServerSocketChannel注册到selector上,并监听accept事件
    ssc.register(selector, SelectionKey.OP_ACCEPT);
    while (true) {
      selector.select();
      Set<SelectionKey> selectionKeys = selector.selectedKeys();
      Iterator<SelectionKey> iterator = selectionKeys.iterator();
      while (iterator.hasNext()) {
        SelectionKey selectionKey = iterator.next();
        if (selectionKey.isAcceptable()) {
          ServerSocketChannel ssChannel = (ServerSocketChannel) selectionKey.channel();
          // 服务器为每个新的连接创建SocketChannel
          SocketChannel socketChannel = ssChannel.accept();
          // 设置为非阻塞模式
          socketChannel.configureBlocking(false);
          // 注册到selector上,监听读事件
          socketChannel.register(selector, SelectionKey.OP_READ);
        } else if (selectionKey.isReadable()) {
          //读取数据
          SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
          System.out.println(readDataFromSocketChannel(socketChannel));
          socketChannel.close();
        }
        // 删除处理过的selectionKey
        iterator.remove();
      }
    }
  }

  private static String readDataFromSocketChannel(SocketChannel sc) throws IOException {
    StringBuilder sb = new StringBuilder();
    while (true) {
      int len = sc.read(serverBuffer);
      if (len <= 0) {
        break;
      }
      serverBuffer.flip();
      int limit = serverBuffer.limit();
      char[] dst = new char[limit];
      for (int i = 0; i < limit; i++) {
        dst[i] = (char) serverBuffer.get();
      }
      sb.append(dst);
      serverBuffer.clear();
    }
    return sb.toString();
  }
}
public class NIOClient {

  public static void main(String[] args) throws IOException {
    final CountDownLatch latch = new CountDownLatch(1);
    for (int i = 0; i < 1; i++) {
      new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            latch.await();
            Socket socket = new Socket("localhost",6789);
            OutputStream out=socket.getOutputStream();
            String msg = "hello world!";
            out.write(msg.getBytes());
            socket.close();
          } catch (IOException e) {
            e.printStackTrace();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }).start();
    }
    latch.countDown();
  }
}
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3186 引用 • 8212 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...