ZooKeeper 源码分析 (一)—会话创建之客户端

本贴最后更新于 2064 天前,其中的信息可能已经渤澥桑田

开发人员主要使用 zk 的客户端,所以我们先来了解 zk 客户端的创建过程原理。

1. 概述

zk 客户端的核心组件如下:

  • ZooKeeper 实例 :客户端入口
  • ClientWatcherManager :客户端 Watcher 管理器
  • HostProvider:客户端地址列表管理器
  • ClientCnxn:客户端核心线程。包含两个线程,即 SendThread 和 EventThread。前者是一个 I/O 线程,主要负责 ZooKeeper 客户端和服务端之间的网络 I/O 通信,后者是一个事件线程,主要负责对服务端事件进行处理。

类图说明:
te

ZooKeeper 客户端的初始化与启动环节,实际上就是 ZooKeeper 对象的实例化过程,分析一个 zk 客户端的构造方法:

 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly,
  HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
}

connectString:逗号隔开的 host:port对,表示节点路径,比如:127.0.0.1:3002/app/a 表示客户端的root节点为/app/a
sessionTimeout:session过期时间,单位:毫秒
watcher:默认的Watcher,会收到状态更新、节点事件的消息推送
sessionId:重连时特定的session Id
canBeReadOnly:是否是只读模式,这种情况只允许读流量,写入将被拒绝
aHostProvider:
clientConfig:客户端配置信息

客户端初始化和启动过程大体分为三个步骤:

  • 设置默认 Watcher
  • 设置 ZooKeeper 服务器地址列表
  • 创建 ClientCnxn

2.会话的创建过程

2.1.初始化阶段

  1. 初始化 ZooKeeper 对象。
    通过调用 Zookeeper 的构造方法来实例化一个 ZooKeeper 对象,初始化过程中,会创建一个客户端的 Watcher 管理器

  2. 设置会话默认 Watcher
    如果在构造方法中传入一个 Wat 对象,那么客户端会将这个对象作为默认 Watcher 保存在 ClientWatchManager 中。

  3. 构造 ZooKeeper 服务器地址列表管理器:HostProvider
    对于构造方法中传入的服务器地址,客户端会将其存放在服务器地址累不管理器中

  4. 创建并初始化客户端网络连接器:ClientCnxn
    Zook 客户端首先会创建一个网络连接器 ClientCnxn,用来管理客户端与服务器的网络交互。另外,客户端在创建 ClientCnxn 的同事,还会初始化客户端的两个核心队列 outgoingQueue 和 pendingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列。

  5. 初始化 SendThread 和 EventThread。
    客户端会创建两个核心网络线程 SendThread 和 EventThread,前者用于管理客户端和服务端之间的所有网络 I/O,后者用于客户端事件处理。客户端还会将 ClientCnxnSocket 分配给 SendThread 作为底层网络 I/O 处理器,并初始化 EventThread 的待处理事件队列 waitingEvents,用于存放所有等待被客户端处理的事件。

源码分析:

if (clientConfig == null) {
    clientConfig = new ZKClientConfig();
}

this.clientConfig = clientConfig;

// 设置默认Watcher管理器
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;

// 客户端连接地址解析,支持IPV6,这里用到了ChrootPath,表示根节点
ConnectStringParser connectStringParser = new ConnectStringParser(
        connectString);
// 设置服务端地址管理器
hostProvider = aHostProvider;

// 初始化网络连接器
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
  hostProvider, sessionTimeout, this, watchManager,
  getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
  
cnxn.seenRwServerBefore = true; // since user has provided sessionId
cnxn.start();


客户端还会将 ClientCnxnSocket 分配给 SendThread 作为底层网络 I/O 处理器

前面提到了这样一句话,看下具体如何实现:
new ClientCnxn 时调用 getClientCnxnSocket() 创建了一个 ClientCnxnSocket

ClientCnxnSocket 创建过程如下:

private ClientCnxnSocket getClientCnxnSocket() throws IOException {

    // 从配置信息中获取线程名称
    String clientCnxnSocketName = getClientConfig().getProperty(
            ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
   // 默认NIO方式
   if (clientCnxnSocketName == null) {
	clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
   }
   try {
        // 1.通过反射获取clientCxn构造器,ZKClientConfig.class是构造器需要的参数
	Constructor clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);
	
	// 2.实例化ClientCnxnSocket
	ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
	return clientCxnSocket;
    } catch (Exception e) {
	IOException ioe = new IOException("Couldn't instantiate "
	+ clientCnxnSocketName);
	ioe.initCause(e);
	throw ioe;
    }
}

然后执行 new ClientCnxn

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
  ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
  long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
  // 设置参数
  this.zooKeeper = zooKeeper;
  this.watcher = watcher;
  this.sessionId = sessionId;
  this.sessionPasswd = sessionPasswd;
  this.sessionTimeout = sessionTimeout;
  this.hostProvider = hostProvider;
  this.chrootPath = chrootPath;

  connectTimeout = sessionTimeout / hostProvider.size();
  readTimeout = sessionTimeout * 2 / 3;
  readOnly = canBeReadOnly;
  // 初始化SendThread,管理客户端和服务端之间的网络I/O,依赖于clientCnxnSocket,守护线程
  sendThread = new SendThread(clientCnxnSocket);
  // 初始化EventThread,用于事件处理,会被设置为守护线程
  eventThread = new EventThread();
  this.clientConfig=zooKeeper.getClientConfig();
  // 初始化超时机制
  initRequestTimeout();
}

至此,初始化阶段完成。

2.2. 会话创建阶段

  1. 启动 SendThread 和 EventThread
    SendThread 首先会判断当前客户端的状态,进行一系列清理工作,为客户端发送会话创建请求做准备

  2. 获取一个服务器地址
    开始创建 Tcp 连接之前,SendThread 首先需要获取一个 ZooKeeper 服务器的目标地址,通常是从 HostProvider 中随机选出一个,然后委托给 ClientCnxnSocket 去创建与 ZooKeeper 服务器之间的 TCP 连接
    选取规则:serverAddress = hostProvider.next(1000);

  3. 创建 TCP 连接
    获取到一个服务器地址后,ClientCnxnSocket 负责和服务器创建一个 TCP 长链接

  4. 构造 ConnectRequest 请求
    SendThread 会负责根据当前客户端的实际设置,构造出一个 ConnectRequest 请求,该请求代表了客户端试图与服务端创建一个会话。同时,ZooKeeper 客户端还会进一步将该请求包装成网络 I/O 层的 Packet 对象,放入请求发送队列 outgoingQueue 中去

  5. 发送请求
    ClientCnxnSocket 负责从 outgoingQueue 中取出一个待发送的 Packet 对象,将其序列化成 ByteBuffer 向服务端进行发送

源码分析:
初始化 SendThread 时,会触发 run()方法。这里面有个 startConnect(InetSocketAddress addr) 方法,负责建立连接。有两种实现方式,NIONetty

NIO connent:

@Override
void connect(InetSocketAddress addr) throws IOException {
	// 创建socket
    SocketChannel sock = createSock();
   try {
	 // 注册
	registerAndConnect(sock, addr);
  } catch (IOException e) {
	LOG.error("Unable to open socket to " + addr);
  sock.close();
	throw e;
  }
   initialized = false;

  /*
   * Reset incomingBuffer 
   */  
   lenBuffer.clear();
   incomingBuffer = lenBuffer;
}

选择器注册:

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
throws IOException {

  // selector注册
 sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
 boolean immediateConnect = sock.connect(addr);
 if (immediateConnect) {
	   // 连接的主要逻辑:设置会话、权限处理、watches处理
        sendThread.primeConnection();
  }
}

Netty 方式的代码在 ClientCnxnSocketNetty 中,代码比较多,就不详细说了。达成的效果是一样的。

接下来是构造构造 ConnectRequest 的过程,都是在 primeConnection() 方法中完成。

// 创建request对象
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);

// 构造器
public ConnectRequest(
   int protocolVersion,
   long lastZxidSeen,
   int timeOut,
   long sessionId,
   byte[] passwd) {
	this.protocolVersion=protocolVersion;
   this.lastZxidSeen=lastZxidSeen;
   this.timeOut=timeOut;
   this.sessionId=sessionId;
   this.passwd=passwd;
}

数据格式如上:

数据包封装代码如下:

// 设置packet数据包
RequestHeader header = new RequestHeader(-8, OpCode.setWatches);
Packet packet = new Packet(header, new ReplyHeader(), sw, null, null);
// 放入outgoingQueue中去
outgoingQueue.addFirst(packet);

2.3.响应处理阶段

  1. 接收服务端响应
    ClientCnxnSocket 接收到服务端的响应后,会判断当前客户端状态是否是已初始化,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由 readConnectResult 方法来处理响应。

  2. 处理 Response
    ClientCnxnSocket 会对接收到的服务端响应进行反序列化,得到 ConnectResponse 对象,并充中获取到 ZooKeeper 服务端分配的会话 sessionId

  3. 连接成功
    连接成功后,一方面需要通知 SendThread 线程,进一步对客户端进行会话参数的设置,并更新客户端状态;另一方面,需要通知地址管理器 HostProvider 当前连接成功的服务器地址。

  4. 生成事件:SyncConnected-None
    为了能够让上层应用感知到会话的成功创建,SendThread 会生成一个事件 SyncConnected-None,代表客户端与服务器会话创建成功,并将该事件传递给 EventThread 线程。

  5. 查询 Watcher
    EventThread 线程收到事件后,会从 ClientWatchManager 中查询对应的 Watcher,然后放入 waitingEvents 队列中去

  6. 处理事件
    EventThread 不断从 waitingEvents 队列中取出待处理的 Watcher 对象,然后直接调用该对象的 Process 接口方法,以达到触发 Watcher 的目的。

源码分析:

if (!incomingBuffer.hasRemaining()) {
    incomingBuffer.flip();
   if (incomingBuffer == lenBuffer) {
	recvCount.getAndIncrement();
	readLength();
   } else if (!initialized) {
	 // 未初始化完成,交由readConnectResult方法处理
	readConnectResult();
	enableRead();
	 if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
	  // Since SASL authentication has completed (if client is configured to do so),
	  // outgoing packets waiting in the outgoingQueue can now be sent.
	  enableWrite();
	  }
	lenBuffer.clear();
	incomingBuffer = lenBuffer;
	updateLastHeard();
	initialized = true;
	} else {
	// 处理Response
	  sendThread.readResponse(incomingBuffer);
	  lenBuffer.clear();
	  incomingBuffer = lenBuffer;
	  updateLastHeard();
	}
}

处理 Response:

反序列化:

ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();

replyHdr.deserialize(bbia, "header");

分 xid 处理,如果 xid = -2,处理 ping 信息的。打印日志返回。

if (replyHdr.getXid() == -2) {
    // -2 is the xid for pings
  if (LOG.isDebugEnabled()) {
        LOG.debug("Got ping response for sessionid: 0x"
  + Long.toHexString(sessionId)
                + " after "
  + ((System.nanoTime() - lastPingSentNs) / 1000000)
                + "ms");
  }
    return;
}

xid = -4,处理鉴权

if (replyHdr.getXid() == -4) {
    // -4 is the xid for AuthPacket               
 if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
        state = States.AUTH_FAILED;                    
 eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
 Watcher.Event.KeeperState.AuthFailed, null) );
  eventThread.queueEventOfDeath();
  }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got auth sessionid:0x"
  + Long.toHexString(sessionId));
  }
    return;
}

xid = -1 处理通知信息:

WatcherEvent event = new WatcherEvent();
// 反序列化event信息
event.deserialize(bbia, "response");
WatchedEvent we = new WatchedEvent(event);

// 放入event队列中去处理
eventThread.queueEvent( we );

EventThread 事件处理 run():

public void run() {
   try {
      isRunning = true;
      while (true) {
         Object event = waitingEvents.take();
         if (event == eventOfDeath) {
            wasKilled = true;
         } else {
            // 事件处理
            processEvent(event);
         }
         if (wasKilled)
            synchronized (waitingEvents) {
               if (waitingEvents.isEmpty()) {
                  isRunning = false;
                  break;
               }
            }
      }
   } catch (InterruptedException e) {
      LOG.error("Event thread exiting due to interruption", e);
   }

    LOG.info("EventThread shut down for session: 0x{}",
             Long.toHexString(getSessionId()));
}

processEvent() 方法逻辑:调用每个 Watch 的 process 方法,达到触发 Watcher 的目的

if (event instanceof WatcherSetEventPair) {
  // each watcher will process the event
  WatcherSetEventPair pair = (WatcherSetEventPair) event;
  for (Watcher watcher : pair.watchers) {
      try {
	  // watcher触发
          watcher.process(pair.event);
      } catch (Throwable t) {
          LOG.error("Error while calling watcher ", t);
      }
  }
}

客户端会话创建流程:
s

至此,客户端一次完整的会话创建过程就已经完成了。

  • 分布式
    80 引用 • 149 回帖 • 4 关注
  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖 • 4 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...