开发人员主要使用 zk 的客户端,所以我们先来了解 zk 客户端的创建过程原理。
1. 概述
zk 客户端的核心组件如下:
- ZooKeeper 实例 :客户端入口
- ClientWatcherManager :客户端 Watcher 管理器
- HostProvider:客户端地址列表管理器
- ClientCnxn:客户端核心线程。包含两个线程,即 SendThread 和 EventThread。前者是一个 I/O 线程,主要负责 ZooKeeper 客户端和服务端之间的网络 I/O 通信,后者是一个事件线程,主要负责对服务端事件进行处理。
类图说明:
ZooKeeper 客户端的初始化与启动环节,实际上就是 ZooKeeper 对象的实例化过程,分析一个 zk 客户端的构造方法:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly,
HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
}
connectString:逗号隔开的 host:port对,表示节点路径,比如:127.0.0.1:3002/app/a 表示客户端的root节点为/app/a
sessionTimeout:session过期时间,单位:毫秒
watcher:默认的Watcher,会收到状态更新、节点事件的消息推送
sessionId:重连时特定的session Id
canBeReadOnly:是否是只读模式,这种情况只允许读流量,写入将被拒绝
aHostProvider:
clientConfig:客户端配置信息
客户端初始化和启动过程大体分为三个步骤:
- 设置默认 Watcher
- 设置 ZooKeeper 服务器地址列表
- 创建 ClientCnxn
2.会话的创建过程
2.1.初始化阶段
-
初始化 ZooKeeper 对象。
通过调用 Zookeeper 的构造方法来实例化一个 ZooKeeper 对象,初始化过程中,会创建一个客户端的 Watcher 管理器 -
设置会话默认 Watcher
如果在构造方法中传入一个 Wat 对象,那么客户端会将这个对象作为默认 Watcher 保存在 ClientWatchManager 中。 -
构造 ZooKeeper 服务器地址列表管理器:HostProvider
对于构造方法中传入的服务器地址,客户端会将其存放在服务器地址累不管理器中 -
创建并初始化客户端网络连接器:ClientCnxn
Zook 客户端首先会创建一个网络连接器 ClientCnxn,用来管理客户端与服务器的网络交互。另外,客户端在创建 ClientCnxn 的同事,还会初始化客户端的两个核心队列 outgoingQueue 和 pendingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列。 -
初始化 SendThread 和 EventThread。
客户端会创建两个核心网络线程 SendThread 和 EventThread,前者用于管理客户端和服务端之间的所有网络 I/O,后者用于客户端事件处理。客户端还会将 ClientCnxnSocket 分配给 SendThread 作为底层网络 I/O 处理器,并初始化 EventThread 的待处理事件队列 waitingEvents,用于存放所有等待被客户端处理的事件。
源码分析:
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
// 设置默认Watcher管理器
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
// 客户端连接地址解析,支持IPV6,这里用到了ChrootPath,表示根节点
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
// 设置服务端地址管理器
hostProvider = aHostProvider;
// 初始化网络连接器
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
cnxn.seenRwServerBefore = true; // since user has provided sessionId
cnxn.start();
客户端还会将 ClientCnxnSocket 分配给 SendThread 作为底层网络 I/O 处理器
前面提到了这样一句话,看下具体如何实现:
new ClientCnxn
时调用 getClientCnxnSocket()
创建了一个 ClientCnxnSocket
。
ClientCnxnSocket
创建过程如下:
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
// 从配置信息中获取线程名称
String clientCnxnSocketName = getClientConfig().getProperty(
ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
// 默认NIO方式
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
// 1.通过反射获取clientCxn构造器,ZKClientConfig.class是构造器需要的参数
Constructor clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);
// 2.实例化ClientCnxnSocket
ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
return clientCxnSocket;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
}
然后执行 new ClientCnxn
:
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
// 设置参数
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
// 初始化SendThread,管理客户端和服务端之间的网络I/O,依赖于clientCnxnSocket,守护线程
sendThread = new SendThread(clientCnxnSocket);
// 初始化EventThread,用于事件处理,会被设置为守护线程
eventThread = new EventThread();
this.clientConfig=zooKeeper.getClientConfig();
// 初始化超时机制
initRequestTimeout();
}
至此,初始化阶段完成。
2.2. 会话创建阶段
-
启动 SendThread 和 EventThread
SendThread 首先会判断当前客户端的状态,进行一系列清理工作,为客户端发送会话创建请求做准备 -
获取一个服务器地址
开始创建 Tcp 连接之前,SendThread 首先需要获取一个 ZooKeeper 服务器的目标地址,通常是从 HostProvider 中随机选出一个,然后委托给 ClientCnxnSocket 去创建与 ZooKeeper 服务器之间的 TCP 连接
选取规则:serverAddress = hostProvider.next(1000);
-
创建 TCP 连接
获取到一个服务器地址后,ClientCnxnSocket 负责和服务器创建一个 TCP 长链接 -
构造 ConnectRequest 请求
SendThread 会负责根据当前客户端的实际设置,构造出一个 ConnectRequest 请求,该请求代表了客户端试图与服务端创建一个会话。同时,ZooKeeper 客户端还会进一步将该请求包装成网络 I/O 层的 Packet 对象,放入请求发送队列 outgoingQueue 中去 -
发送请求
ClientCnxnSocket 负责从 outgoingQueue 中取出一个待发送的 Packet 对象,将其序列化成 ByteBuffer 向服务端进行发送
源码分析:
初始化 SendThread 时,会触发 run()方法。这里面有个 startConnect(InetSocketAddress addr)
方法,负责建立连接。有两种实现方式,NIO
和 Netty
NIO connent:
@Override
void connect(InetSocketAddress addr) throws IOException {
// 创建socket
SocketChannel sock = createSock();
try {
// 注册
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
initialized = false;
/*
* Reset incomingBuffer
*/
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
选择器注册:
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
throws IOException {
// selector注册
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
// 连接的主要逻辑:设置会话、权限处理、watches处理
sendThread.primeConnection();
}
}
Netty 方式的代码在 ClientCnxnSocketNetty
中,代码比较多,就不详细说了。达成的效果是一样的。
接下来是构造构造 ConnectRequest 的过程,都是在 primeConnection()
方法中完成。
// 创建request对象
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
// 构造器
public ConnectRequest(
int protocolVersion,
long lastZxidSeen,
int timeOut,
long sessionId,
byte[] passwd) {
this.protocolVersion=protocolVersion;
this.lastZxidSeen=lastZxidSeen;
this.timeOut=timeOut;
this.sessionId=sessionId;
this.passwd=passwd;
}
数据格式如上:
数据包封装代码如下:
// 设置packet数据包
RequestHeader header = new RequestHeader(-8, OpCode.setWatches);
Packet packet = new Packet(header, new ReplyHeader(), sw, null, null);
// 放入outgoingQueue中去
outgoingQueue.addFirst(packet);
2.3.响应处理阶段
-
接收服务端响应
ClientCnxnSocket 接收到服务端的响应后,会判断当前客户端状态是否是已初始化,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由 readConnectResult 方法来处理响应。 -
处理 Response
ClientCnxnSocket 会对接收到的服务端响应进行反序列化,得到 ConnectResponse 对象,并充中获取到 ZooKeeper 服务端分配的会话 sessionId -
连接成功
连接成功后,一方面需要通知 SendThread 线程,进一步对客户端进行会话参数的设置,并更新客户端状态;另一方面,需要通知地址管理器 HostProvider 当前连接成功的服务器地址。 -
生成事件:SyncConnected-None
为了能够让上层应用感知到会话的成功创建,SendThread 会生成一个事件SyncConnected-None
,代表客户端与服务器会话创建成功,并将该事件传递给 EventThread 线程。 -
查询 Watcher
EventThread 线程收到事件后,会从 ClientWatchManager 中查询对应的 Watcher,然后放入 waitingEvents 队列中去 -
处理事件
EventThread 不断从 waitingEvents 队列中取出待处理的 Watcher 对象,然后直接调用该对象的 Process 接口方法,以达到触发 Watcher 的目的。
源码分析:
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
// 未初始化完成,交由readConnectResult方法处理
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
// 处理Response
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
处理 Response:
反序列化:
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
分 xid 处理,如果 xid = -2,处理 ping 信息的。打印日志返回。
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x"
+ Long.toHexString(sessionId)
+ " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
+ "ms");
}
return;
}
xid = -4,处理鉴权
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
eventThread.queueEventOfDeath();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x"
+ Long.toHexString(sessionId));
}
return;
}
xid = -1 处理通知信息:
WatcherEvent event = new WatcherEvent();
// 反序列化event信息
event.deserialize(bbia, "response");
WatchedEvent we = new WatchedEvent(event);
// 放入event队列中去处理
eventThread.queueEvent( we );
EventThread 事件处理 run():
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
// 事件处理
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}
processEvent() 方法逻辑:调用每个 Watch 的 process 方法,达到触发 Watcher 的目的
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
// watcher触发
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
}
客户端会话创建流程:
至此,客户端一次完整的会话创建过程就已经完成了。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于