二、分布式通信基础
分布式基于网络进行服务通信,所以需要了解一下网络,才可以更好的理解分布式通信。
网络基础
什么是网络?
类比快递,快递是为了解决两地之间的物品运输问题,网络则是解决两点之间的数据传输问题。
网络分层
OSI 七层模型与 TCP 四层模型为网络通信定制了标准,二者都是基于分层的思想。分层具有以下优点:
- 促进标准化工作,允许各个供应商进行独立开发
- 各个层间相互独立,把网络操作分成低复杂性单元
- 灵活性好,某一层变化不会影响到别的层
- 各层间通过一个接口在邻层通信,解耦
OSI 七层模型
- 应用层,网络进程访问层,为应用程序进程(比如电子邮件、http 服务)提供网络服务
- 表示层,确认通信双方都可以读懂此数据
- 会话层,建立、管理以及终止应用程序之间的会话
- 传输层,用于定义数据如何传输,保证数据可靠性,比如数据的切片(快递公司将自行车拆分,并装箱)、重组(你收到快递后将自行车组装起来)
- 网络层,用于寻址,即找到要进行数据传输的计算机,通常是 IP (主要设备:路由器)
- 数据链路层,IP 地址用于表示点到点之间的地址,但是中间会经过很多交换机,这些路由器则是使用 mac 地址进行寻址的,数据链路层就是负责相邻设备间的寻址 (主要设备:交换机)
- 物理层,用于二进制传输,比如光纤、电信号、光信号等
TCP/IP 四层模型
但是平时学习交流时,通常将 OSI 七层模型与 TCP 四层模型的优点结合为五层模型:
各层之间的协议
- 应用层:http、SMTP
- 传输层:TCP、UDP
- 网络层:IP、ICMP(控制报文协议)、IGMP(internet 组管理协议)
- 链路层:硬件接口、ARP(地址解析协议)、RAPP(反向地址转换协议)
- 物理层:物理传输介质
传输层协议
TCP 相较于 UDP,具有可靠性,但是相对的占用系统资源;而 UDP 没有 TCP 那些可靠的机制,所以相比较于 TCP 更快。
单工、半双工、全双工
根据通信双方的分工和信号传输方向可将通信分为三种方式:单工、半双工与全双工
单工:数据只能在单个方向上传输。比如广播
半双工:数据可以在两个方向上传输,在同一时刻只允许一个方向传输。 比如对讲机
全双工:数据可以在两个方向上传输,在同一时刻也允许两个方向传输。比如手机通话
TCP 和 UDP 都是全双工
TCP 协议
TCP(Transmission Control Protocol)传输控制协议,是主机对主机层的传输控制协议,用于提供可靠的连接服务。为了保证 TCP 协议的可靠性,保证数据正确的传输给对方,在连接时需要进行三次握手,确保双方可以正常通信,在断开连接时需要进行四次挥手,确保双方数据都传输完毕,保证数据传输完毕。
三次握手协议
三次握手(Three-Way-Handshake)即建立 TCP 连接,就是指建立一个 TCP 连接时,客户端和服务端总共需要发送三个包确认连接的建立
借用一下这个小例子,来描述三次握手的流程:
土豆土豆 我是地瓜,收到请回答
地瓜收到,土豆收到请回答
土豆收到
步骤 1 和 2:确认地瓜可以正常收到土豆的消息
步骤 2 和 3:确认土豆可以正常收到地瓜的消息
这三步就可以保证土豆和地瓜正常通信,这就是三次握手。下面是具体流程:
- 客户端发送第一次握手请求,并携带 SYN=j(j 是一个唯一的随机数,代表此次请求),此时客户端此时变为已发送的状态(SYN_SEND)
- 服务端接收到客户端的握手请求,此时进入 LISTEN 打开状态
- 服务端发送 ACK 确认包,此时是第二次握手,ACK=j+1 表示是哪个请求,并再次发送 SYN=k(k 也是唯一随机数,用来代表此次请求)
- 客户但接收到 ACK 信息,此时客户端进入 ESTABLISHED 状态,代表客户端到服务端信息发送正常
- 客户端发送 ACK 确认信息给服务端,此时是第三次握手,告诉服务端可以正常收到他的信息
- 服务端收到 ACK 信息后,将状态变为 ESTABLISHED,双方可以正常通信, 整个握手流程结束
四次挥手协议
四次挥手(Four-Way-Wavehand)即终止 TCP 连接,就是断开一个 TCP 连接时,客户端和服务短短总共需要发送四个包确认连接的断开
目的:保证客户端和服务端都没有消息传递了再关闭
男朋友(客户端):我有点事先不聊了
女朋友(服务端):好的,等下,我还有点事没讲完
...
女朋友(服务端):好了,我没事了挂了吧
男朋友(客户端):喂.... (随即男朋友也挂断了电话)
通过四次通信,确认 tcp 通信已经完成,并关闭连接,这就是四次挥手,下面是具体流程:
- 客户端发送 FIN M(M 是发送 id),表示客户端要结束了,此时客户端进入 FIN_WAIT_1 状态
- 服务端接收到请求后,进入 CLOSE_WAIT 状态(等待关闭),再向客户端发送 ACK 确认包,告诉客户端,我已经接收到了关闭请求
- 等到服务端数据传送完毕,发送 FIN N 给客户端,告诉客户端处理完毕,即将进入关闭状态
- 客户端接收到 FIN N,返回 ACK 确认包,此时服务端进入关闭状态
TCP 长连接、短连接
长连接,指在一个 TCP 连接上可以连续发送多个数据包,在 TCP 连接保持期间,如果没有数据包发送,需要双方发检测包以维持此连接(心跳机制),一般需要自己做在线维持。
过程: 连接→数据传输→保持连接(心跳)→数据传输→保持连接(心跳)→……→关闭连接
短连接是指通信双方有数据交互时,就建立一个 TCP 连接,数据发送完成后,则断开此 TCP 连接,比如 http 连接
TCP 通信原理
对于 TCP 通信来说,每个 TCP Socket 的内核中都有一个发送缓冲区和一个接收缓冲区,TCP 全双工的工作模式及 TCP 的滑动窗口就是依赖于这两个独立的 Buffer 和该 Buffer 的填充状态。
进程调用 Socket 的 send 发送数据的时候,一般情况下是将数据从应用层用户的 Buffer 里复制到 Socket 的内核发送缓冲区,然后 send 就会在上层返回。换句话说,send 返回时,数据不一定会被发送到对端。
接收端接收到数据后,将数据放在接收缓冲区中,若应用进程一直没有调用 Socket 的 read 方法进行读取,那么该数据会一直被缓存在接收缓冲区内。不管进程是否读取 Socket,对端发来的数据都会经过内核接收并缓存到 Socket 的内核接收缓冲区。
read 索要做的工作,就是把内核接收缓冲区中的数据复制到应用层用户的 Buffer 里。
滑动窗口协议
滑动窗口协议用于控制流量,TCP 滑动窗口分为接受窗口和发送窗口。滑动窗口协议是传输层进行流控的一种措施,接收方通过通告发送方自己的窗口大小,从而控制发送方的发送速度,从而达到防止发送方发送速度过快而导致自己被淹没的目的。
视频中发送端向接收端发送 segment 数据片段,接收端接收到数据片段后给发送端发送 ACK 确认包。当发送端接收到 3 个 ack 信号时,即发送端滑动窗口满了的时候,才会再次发送,如果未满,说明接收端仍然在处理。而接收端只有在收到 3 个 segemnt 信号时,滑动窗口才会移动,才能再次接收 segement。从而达到限流的目的。
关于 IO
TCP 进行报文发送以及接收时,会经过一个缓冲区,如下图:
当发送请求的数据大小大于 TCP 发送缓冲区的剩余空间时,这时就会阻塞,直到发送缓冲区的剩余大小足够存放数据。
当接收请求的数据大小大于 TCP 接收缓冲区的剩余空间时,这时就会阻塞,知道接收缓冲区的剩余大小足够存放数据。
这就是 BIO。
为了解决这类问题,就出现了 NIO,Non-Blocking IO 非阻塞 IO。NIO 在底层存在一个 I/O 调度线程,不断的扫描每个 socket 的缓冲区,当发现 TCP 缓冲区空间不足时,主动通知发送端/接收端,当缓冲区大小够的时候,主动通知发送端/接收端进行发送/接收。
- BIO(同步阻塞):等待我的鸡蛋饼做好,然后拿走
- NIO(同步非阻塞):付完钱后,告诉小哥一声,我等会儿过来拿
- AIO(异步非阻塞):告诉小哥鸡蛋饼做好给我送到我家(外卖)
Java 网络通信
Java 中提供了一套网络 API 用于网络通信,可以使用这套 API 进行分布式网络通信。其中就包含 UDP 与 TCP 以及组播 Multicast 的实现:
UDP:
DatagramSocket
DatagramChannel
TCP
Socket/ServerSocket
SocketChannel
Multicast
MulticastSocket
注:组播
- 单播,点对点通信
- 广播,一对多通信
- 组播,介于单播和广播之间,针对组进行通信
除此之外,也可以基于开源框架进行分布式通信,比如 Mina、Netty 等,你可以理解他们是对 socket 的封装和增强。
Socket/ServerSocket
Socket 是对传输层的封装,所以除了 IP 地址以及端口,还需要指定通讯协议(应用层协议,比如 http),才可进行两点之间的通信。
示例代码:
// Socket服务端
public class SocketServer {
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(8888);
while (true) {
final Socket socket = server.accept();
new Thread() {
@Override
public void run() {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
while (true) {
//读取客户端发送过来的消息
String line = reader.readLine();
if (line == null) {
break;
} else {
System.out.println(System.currentTimeMillis() + "服务端收到数据:" + line);
}
//给客户端发送一条消息回复
writer.println(" - 我收到了你的消息,客户端");
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}.start();
}
}
}
// Socket客户端
public class SocketClient {
public static void main(String[] args) throws Exception {
Socket socket = new Socket("localhost", 8888);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //读取服务端信息
PrintWriter writer = new PrintWriter(socket.getOutputStream(), true); // 往服务端写数据
writer.println("你好,服务端");
while (true) {
String serverData = reader.readLine();
if (serverData == null) {
break;
} else {
System.out.println(System.currentTimeMillis() + " - 客户端收到数据:" + serverData);
}
}
writer.close();
socket.close();
}
}
Multicast
Java 组播示例:
/**
* 组播
* 如果你在大街上喊一声美女,将会有一组人(女)回头看你
*/
public class Multicast {
public static class MulticastServer {
public static void main(String[] args) {
try {
// 组定义
// 组的地址端处于 224.0.0.0 - 239.255.255.255
InetAddress group = InetAddress.getByName("224.5.5.6");
MulticastSocket server = new MulticastSocket();
for (int i = 0; i < 10; i++) {
String data = "你好年轻人";
server.send(new DatagramPacket(data.getBytes(), data.getBytes().length, group, 8888)); // DatagramPacket: UDP数据包
TimeUnit.SECONDS.sleep(2);
}
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static class MulticastClient {
public static void main(String[] args) {
try {
InetAddress group = InetAddress.getByName("224.5.5.6");
MulticastSocket client = new MulticastSocket(8888);
client.joinGroup(group); // 加入指定的组中
byte[] buf = new byte[256];
while (true) {
DatagramPacket msgPkg = new DatagramPacket(buf, buf.length);
client.receive(msgPkg);
String msg = new String(msgPkg.getData());
System.out.println(msg);
}
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于