84-使用非阻塞 I/O 改写回射客户端

这个程序,应该是相当复杂的。读完它需要一些耐心,不过我会力求突显程序的结构,删除无关的代码。

1. 回顾旧程序

旧版本程序的结构如下:

while(1) {
  rfds = {stdin, sockfd};
  select(rfds);
  if (stdin in rfds) {
    read(stdin);
    // 风险代码,可能产生阻塞
    writen(sockfd);
  }

  if (sockfd in rfds) {
    read(sockfd);
    writen(stdout);
  }
}

之前分析过,它的弱点在于 writen(sockfd) 会导致阻塞。为了解决这个问题,就不能使用 writen 函数,而应该改为 write,但是如果改成了 write,我们就不能保证 write 能一次将数据全部写入发送缓冲区。这导致的另一个麻烦就是我们不得不设置应用层缓冲区(不使用多进程多线程)。

那么新的写法看起来像是这样:

char to[4096];
int start = 0;
int end = 0;

while(1) {
  // ...
  // 往发送缓冲区添加数据
  nr = read(stdin, &to[end], 4096 - end); 
  end += nr;
  // 将发送缓冲区的数据发送出去
  nw = write(sockfd, &to[start], end - start);
  start += nw;
  // ...
  // to 中没有发送完的,就等着下一次再发送吧。

}

下面我们正式一点说。

2. 程序设计

2.1 思路


这里写图片描述
图1 数据流动过程

观察图 1,客户端设置了两个缓冲区:

  • to:存放从标准输入读取到的数据
  • from:保存从服务器发来的数据

to 中的灰色部分,表示已经发给服务了,而绿色部分,表示尚未发送的。 from 的灰色部分,表示已经写到标准输出了,绿色部分,表示还尚未写入到标准输出的。空白部分表示空闲。

使用指针 tostart, toend, fromstart, fromend, 就可以得到这三个区域的任何一个位置。

2.2 程序伪代码

  • 精简版本
while(1) {
  rfds = {stdin, sockfd};
  wfds = {stdout, sockfd};

  select(&rfds, &wfds);
  // 1. 读标准输入到缓冲区 to
  if (stdin in rfds) {
    read(stdin, toend);
    wfds.insert(sockfd);
  }

  // 2. 读套接字到缓冲区 from
  if (sockfd in rfds) {
    read(sockfd, fromend);
    wfds.insert(stdout);
  }

  // 3. 写数据到标准输出
  if (stdout in wfds) {
    write(stdout, fromstart);
  }

  // 4. 写数据到套接字
  if (sockfd in wfds) {
    write(sockfd, tostart);
  }
}

当你在阅读详细版本的时候,请参考精简版本的 4 个步骤来搞清逻辑。

  • 详细版本
char *to = malloc(length);
char *from = malloc(length);
tostart = toend = to;
fromstart = fromend = from;

fd_set rfds, wfds;
int stdinclosed = 0; // 标准输入是否关闭
int servclosed = 0; // 服务器是否关闭

// 重要!设置描述符为非阻塞 IO
setNonblock(sockfd);
setNonblock(stdin);
setNonblock(stdout);

while(1) {
  // 这一部分,表示将能够监听的 I/O 加入到对应的监听集合中去。
  // 如果缓冲区没有空闲,就没有监听的意义了。
  rfds.clear();
  // 查看收发缓冲区有没有空闲,有空闲就监听。
  if (stdinclosed == 0 && toend < to + length) rfds.insert(stdin);
  if (servclosed == 0 && fromend < from + length) rfds.insert(sockfd);

  // 查看收发缓冲区有没有数据,有数据就监听
  if (tostart < toend) wfds.insert(sockfd);
  if (fromstart < fromend) wfds.insert(stdout);

  // 有事件再往下执行
  select(&rfds, &wfds);

  // 1. 标准输入有数据则读入发送缓冲区
  if (stdin in rfds) {
     n = to + length - toend; // 白色部分空闲区大小
     nr = read(stdin, toend, n);
     if (nr < 0) {
       // 尽管 select 通知有数据可读,但是我们还是得预防 EWOUDLBLOCK 发生的可能性。
       // 如果使用阻塞 IO,结果就是程序在 read 上阻塞,这是不应该发生的情况。
       if (errno != EWOULDBLOCK) exit(1);
     }
     else if (nr == 0) {
        stdinclosed = 1; // 标准输入关闭,标志位置位
        if (tostart == toend) {
          // 发送缓冲区没有数据要发送了(没有绿色部分),半关闭。
          shutdown(sockfd, SHUT_WR);
        }
     }
     else {
       toend += nr; // 扩大绿色部分大小
       wfds.insert(sockfd); // 提前通知有写事件,你完全可以不写这一行
     }
  }

  // 2. 套接字有数据可读则读入 from 缓冲区
  if (sockfd in rfds) {
    n = from + length - fromend;
    nr = read(sockfd, fromend, n);  // 将数据读取到白色区域
    if (nr < 0) {
       if (errno != EWOULDBLOCK) exit(1);
    }
    else if (nr == 0) {
      servclosed = 1; // 服务器关闭标志位置位,此时不能直接退出,因为接收缓冲区可能还有绿色部分。
      if (fromstart == fromend) {
        // 接收缓冲区空闲,可以安全退出。如果标准输出速度非常慢,这个 if 很可能执行不到。
        LOG("1:finished\n");
        break;
      }
    }
    else {
      fromend += nr; // 绿色部分变长
      wfds.insert(stdout);
    }
  }

  // 3. 处理接收缓冲区(from 有绿色部分,将其写入标准输出)
  if (stdout in wfds && fromend - fromstart > 0) {
    n = fromend - fromstart;
    nw = write(stdout, fromstart, n);
    if (nw < 0) {
      if (errno != EWOULDBLOCK) exit(1); // 不是 EWOULDBLOCK 则出错
    }
    else {
      fromstart += nw; // 灰色部分变长,绿色减少
      if (fromstart == fromend) {
        fromstart = fromend = from; // 重置
        // 全部处理完成
        if (servclosed) { // 如果服务器已经关闭,说明数据全部处理完毕
          LOG("2:finished\n");
          break;
        }
      }
    }
  }


  // 4. 发送缓冲区有数据可发送(to 中有绿色部分,发送到服务器)
  if (sockfd in wfds && toend - tostart > 0) {
    n = toend - tostart;
    nw = write(sockfd, tostart, n);
    if (nw < 0) {
      if (errno != EWOULDBLOCK) exit(1); // 不是 EWOULDBLOCK 则出错
    }
    else {
      tostart += nw; // 灰色变长,绿色变短
      if (tostart == toend) { // 全部处理完毕
        tostart = toend = to; // 重置
        if (stdinclosed) {
          // 只有标准输入已经关闭的情况下才能关闭
          shutdonw(sockfd, SHUT_WR);
        }
      }
    }
  }
}

2.3 项目代码

这一段程序确实很长,需要考虑的东西太多。更加详细的代码请参考 gitos 托管的代码。

git clone https://git.oschina.net/ivan_allen/unp.git

如果你已经 clone 过这个代码了,请使用 git pull 更新一下。本节程序所使用的程序路径是 unp/program/nonblockio/nbio.

3. 实验

本次实验仍然分成两个部分,即缓冲区大小分别设置为 4096 和 1024000.

3.1 4096 字节

$ ./run_client.sh 4096 -v


这里写图片描述
图1 缓冲区大小为 4096 字节

3.2 1024000 字节

$ ./run_client.sh 1024000 -v


这里写图片描述
图2 缓冲区大小为 1024000 字节

3.3 结果分析

很幸运的是,大缓冲区下,客户端也没有阻塞。从图 1 和图 2 中的结果看,客户端与服务器的数据传输和处理的瓶颈在于步骤 2 和步骤 3.

注意到步骤 1 和步骤 4(将数据全部发到服务器)早已完成,而步骤 2 和步骤 3 的速度却很滞后。原因在于服务器端的 read 函数缓冲区太小,只有 4096 字节。可以通过适当的增大服务器端缓冲大小,图 3 显示的结果是将服务器 read 函数缓冲大小更改为 65536 后的结果。


这里写图片描述
图3 服务器缓冲大小更改为 65536

4. 标准输出比网络 IO 慢

注意观察图 1、2、3 中,客户端最后一行是 "1:finished!" 结束,如果标准输出的速度比网络 IO 还要慢,则下面这个步骤 2 中的这个 if 是不成立的:

if (服务器关闭) {
  // ...
  servclosed = 1;
  if (fromstart == fromend) {
    // 接收缓冲区空闲,可以安全退出。如果标准输出速度非常慢,这个 if 很可能执行不到。
    LOG("1:finished\n");
    break;
  }
}

此时应该执行到步骤 3 中的 if:


if (fromstart == fromend) {
  fromstart = fromend = from; // 重置
  // 全部处理完成
  if (servclosed) { // 如果服务器已经关闭,说明数据全部处理完毕
    LOG("2:finished\n");
    break;
  }
}

通过在客户端开启 --slow 选项,可以让标准输出速度慢下来。


这里写图片描述
图4 标准输出 IO 慢于网络 IO,最后一行输出 "2:finished!"

5. 总结

  • 掌握非阻塞 I/O + 缓冲区的客户端实现方法

思考:有些同学并没有将 stdin, stdout, sockfd 设置成非阻塞 IO,程序也能正常工作,这样做可以吗?如果有问题,问题在哪里?

最后,要提的是非阻塞 I/O 处理起来确实很麻烦,有时候代码的复杂程度可能会让你得不偿失。使用多线程 + 阻塞 I/O 其实是更为推荐的方法,而且程序的效率也不会比非阻塞 I/O 差多少(unp 一书中对不同的设计进行了测试对比)。

  • 3
    点赞
  • 1
    收藏
    觉得还不错? 一键收藏
  • 2
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 2
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值