开发人员主要使用 zk 的客户端,所以我们先来了解 zk 客户端的创建过程原理。
1. 概述
zk 客户端的核心组件如下:
- ZooKeeper 实例 :客户端入口
- ClientWatcherManager :客户端 Watcher 管理器
- HostProvider:客户端地址列表管理器
- ClientCnxn:客户端核心线程。包含两个线程,即 SendThread 和 EventThread。前者是一个 I/O 线程,主要负责 ZooKeeper 客户端和服务端之间的网络 I/O 通信,后者是一个事件线程,主要负责对服务端事件进行处理。
类图说明:
ZooKeeper 客户端的初始化与启动环节,实际上就是 ZooKeeper 对象的实例化过程,分析一个 zk 客户端的构造方法:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException { } connectString:逗号隔开的 host:port对,表示节点路径,比如:127.0.0.1:3002/app/a 表示客户端的root节点为/app/a sessionTimeout:session过期时间,单位:毫秒 watcher:默认的Watcher,会收到状态更新、节点事件的消息推送 sessionId:重连时特定的session Id canBeReadOnly:是否是只读模式,这种情况只允许读流量,写入将被拒绝 aHostProvider: clientConfig:客户端配置信息
客户端初始化和启动过程大体分为三个步骤:
- 设置默认 Watcher
- 设置 ZooKeeper 服务器地址列表
- 创建 ClientCnxn
2.会话的创建过程
2.1.初始化阶段
-
初始化 ZooKeeper 对象。
通过调用 Zookeeper 的构造方法来实例化一个 ZooKeeper 对象,初始化过程中,会创建一个客户端的 Watcher 管理器 -
设置会话默认 Watcher
如果在构造方法中传入一个 Wat 对象,那么客户端会将这个对象作为默认 Watcher 保存在 ClientWatchManager 中。 -
构造 ZooKeeper 服务器地址列表管理器:HostProvider
对于构造方法中传入的服务器地址,客户端会将其存放在服务器地址累不管理器中 -
创建并初始化客户端网络连接器:ClientCnxn
Zook 客户端首先会创建一个网络连接器 ClientCnxn,用来管理客户端与服务器的网络交互。另外,客户端在创建 ClientCnxn 的同事,还会初始化客户端的两个核心队列 outgoingQueue 和 pendingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列。 -
初始化 SendThread 和 EventThread。
客户端会创建两个核心网络线程 SendThread 和 EventThread,前者用于管理客户端和服务端之间的所有网络 I/O,后者用于客户端事件处理。客户端还会将 ClientCnxnSocket 分配给 SendThread 作为底层网络 I/O 处理器,并初始化 EventThread 的待处理事件队列 waitingEvents,用于存放所有等待被客户端处理的事件。
源码分析:
if (clientConfig == null) { clientConfig = new ZKClientConfig(); } this.clientConfig = clientConfig; // 设置默认Watcher管理器 watchManager = defaultWatchManager(); watchManager.defaultWatcher = watcher; // 客户端连接地址解析,支持IPV6,这里用到了ChrootPath,表示根节点 ConnectStringParser connectStringParser = new ConnectStringParser( connectString); // 设置服务端地址管理器 hostProvider = aHostProvider; // 初始化网络连接器 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly); cnxn.seenRwServerBefore = true; // since user has provided sessionId cnxn.start();
客户端还会将 ClientCnxnSocket 分配给 SendThread 作为底层网络 I/O 处理器
前面提到了这样一句话,看下具体如何实现:
new ClientCnxn
时调用 getClientCnxnSocket()
创建了一个 ClientCnxnSocket
。
ClientCnxnSocket
创建过程如下:
private ClientCnxnSocket getClientCnxnSocket() throws IOException { // 从配置信息中获取线程名称 String clientCnxnSocketName = getClientConfig().getProperty( ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET); // 默认NIO方式 if (clientCnxnSocketName == null) { clientCnxnSocketName = ClientCnxnSocketNIO.class.getName(); } try { // 1.通过反射获取clientCxn构造器,ZKClientConfig.class是构造器需要的参数 Constructor clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class); // 2.实例化ClientCnxnSocket ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig()); return clientCxnSocket; } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + clientCnxnSocketName); ioe.initCause(e); throw ioe; } }
然后执行 new ClientCnxn
:
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { // 设置参数 this.zooKeeper = zooKeeper; this.watcher = watcher; this.sessionId = sessionId; this.sessionPasswd = sessionPasswd; this.sessionTimeout = sessionTimeout; this.hostProvider = hostProvider; this.chrootPath = chrootPath; connectTimeout = sessionTimeout / hostProvider.size(); readTimeout = sessionTimeout * 2 / 3; readOnly = canBeReadOnly; // 初始化SendThread,管理客户端和服务端之间的网络I/O,依赖于clientCnxnSocket,守护线程 sendThread = new SendThread(clientCnxnSocket); // 初始化EventThread,用于事件处理,会被设置为守护线程 eventThread = new EventThread(); this.clientConfig=zooKeeper.getClientConfig(); // 初始化超时机制 initRequestTimeout(); }
至此,初始化阶段完成。
2.2. 会话创建阶段
-
启动 SendThread 和 EventThread
SendThread 首先会判断当前客户端的状态,进行一系列清理工作,为客户端发送会话创建请求做准备 -
获取一个服务器地址
开始创建 Tcp 连接之前,SendThread 首先需要获取一个 ZooKeeper 服务器的目标地址,通常是从 HostProvider 中随机选出一个,然后委托给 ClientCnxnSocket 去创建与 ZooKeeper 服务器之间的 TCP 连接
选取规则:serverAddress = hostProvider.next(1000);
-
创建 TCP 连接
获取到一个服务器地址后,ClientCnxnSocket 负责和服务器创建一个 TCP 长链接 -
构造 ConnectRequest 请求
SendThread 会负责根据当前客户端的实际设置,构造出一个 ConnectRequest 请求,该请求代表了客户端试图与服务端创建一个会话。同时,ZooKeeper 客户端还会进一步将该请求包装成网络 I/O 层的 Packet 对象,放入请求发送队列 outgoingQueue 中去 -
发送请求
ClientCnxnSocket 负责从 outgoingQueue 中取出一个待发送的 Packet 对象,将其序列化成 ByteBuffer 向服务端进行发送
源码分析:
初始化 SendThread 时,会触发 run()方法。这里面有个 startConnect(InetSocketAddress addr)
方法,负责建立连接。有两种实现方式,NIO
和 Netty
NIO connent:
@Override void connect(InetSocketAddress addr) throws IOException { // 创建socket SocketChannel sock = createSock(); try { // 注册 registerAndConnect(sock, addr); } catch (IOException e) { LOG.error("Unable to open socket to " + addr); sock.close(); throw e; } initialized = false; /* * Reset incomingBuffer */ lenBuffer.clear(); incomingBuffer = lenBuffer; }
选择器注册:
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException { // selector注册 sockKey = sock.register(selector, SelectionKey.OP_CONNECT); boolean immediateConnect = sock.connect(addr); if (immediateConnect) { // 连接的主要逻辑:设置会话、权限处理、watches处理 sendThread.primeConnection(); } }
Netty 方式的代码在 ClientCnxnSocketNetty
中,代码比较多,就不详细说了。达成的效果是一样的。
接下来是构造构造 ConnectRequest 的过程,都是在 primeConnection()
方法中完成。
// 创建request对象 ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd); // 构造器 public ConnectRequest( int protocolVersion, long lastZxidSeen, int timeOut, long sessionId, byte[] passwd) { this.protocolVersion=protocolVersion; this.lastZxidSeen=lastZxidSeen; this.timeOut=timeOut; this.sessionId=sessionId; this.passwd=passwd; }
数据格式如上:
数据包封装代码如下:
// 设置packet数据包 RequestHeader header = new RequestHeader(-8, OpCode.setWatches); Packet packet = new Packet(header, new ReplyHeader(), sw, null, null); // 放入outgoingQueue中去 outgoingQueue.addFirst(packet);
2.3.响应处理阶段
-
接收服务端响应
ClientCnxnSocket 接收到服务端的响应后,会判断当前客户端状态是否是已初始化,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由 readConnectResult 方法来处理响应。 -
处理 Response
ClientCnxnSocket 会对接收到的服务端响应进行反序列化,得到 ConnectResponse 对象,并充中获取到 ZooKeeper 服务端分配的会话 sessionId -
连接成功
连接成功后,一方面需要通知 SendThread 线程,进一步对客户端进行会话参数的设置,并更新客户端状态;另一方面,需要通知地址管理器 HostProvider 当前连接成功的服务器地址。 -
生成事件:SyncConnected-None
为了能够让上层应用感知到会话的成功创建,SendThread 会生成一个事件SyncConnected-None
,代表客户端与服务器会话创建成功,并将该事件传递给 EventThread 线程。 -
查询 Watcher
EventThread 线程收到事件后,会从 ClientWatchManager 中查询对应的 Watcher,然后放入 waitingEvents 队列中去 -
处理事件
EventThread 不断从 waitingEvents 队列中取出待处理的 Watcher 对象,然后直接调用该对象的 Process 接口方法,以达到触发 Watcher 的目的。
源码分析:
if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount.getAndIncrement(); readLength(); } else if (!initialized) { // 未初始化完成,交由readConnectResult方法处理 readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { // 处理Response sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } }
处理 Response:
反序列化:
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header");
分 xid 处理,如果 xid = -2,处理 ping 信息的。打印日志返回。
if (replyHdr.getXid() == -2) { // -2 is the xid for pings if (LOG.isDebugEnabled()) { LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after " + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms"); } return; }
xid = -4,处理鉴权
if (replyHdr.getXid() == -4) { // -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); eventThread.queueEventOfDeath(); } if (LOG.isDebugEnabled()) { LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId)); } return; }
xid = -1 处理通知信息:
WatcherEvent event = new WatcherEvent(); // 反序列化event信息 event.deserialize(bbia, "response"); WatchedEvent we = new WatchedEvent(event); // 放入event队列中去处理 eventThread.queueEvent( we );
EventThread 事件处理 run():
public void run() { try { isRunning = true; while (true) { Object event = waitingEvents.take(); if (event == eventOfDeath) { wasKilled = true; } else { // 事件处理 processEvent(event); } if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } catch (InterruptedException e) { LOG.error("Event thread exiting due to interruption", e); } LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId())); }
processEvent() 方法逻辑:调用每个 Watch 的 process 方法,达到触发 Watcher 的目的
if (event instanceof WatcherSetEventPair) { // each watcher will process the event WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { // watcher触发 watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } }
客户端会话创建流程:
至此,客户端一次完整的会话创建过程就已经完成了。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于