1. Startup entry point
The ZooKeeper server's startup entry point is the QuorumPeerMain class, and the main logic lives in its initializeAndRun method. The main steps are:
- Parse the ZooKeeper configuration (depending on whether a configuration file name was passed on the command line)
- Start the cleanup scheduler (a periodic task built on a Java Timer that purges expired snapshot and log files)
- Start the server (cluster mode or standalone mode, decided from the configuration)
1.1 Parsing the zoo.cfg file
```java
try {
    File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);

    Properties cfg = new Properties();
    FileInputStream in = new FileInputStream(configFile);
    try {
        // Load the config file
        cfg.load(in);
        configFileStr = path;
    } finally {
        in.close();
    }
    // Parse the properties
    parseProperties(cfg);
} catch (IOException e) {
    throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
    throw new ConfigException("Error processing " + path, e);
}
```
1.2 Starting the DatadirCleanupManager history file cleaner
It starts a PurgeTask (which extends TimerTask) to clean up historical files. The core purge logic is shown below.
```java
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

    List<File> snaps = txnLog.findNRecentSnapshots(num);
    int numSnaps = snaps.size();
    if (numSnaps > 0) {
        purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    }
}
```
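For context, DatadirCleanupManager schedules the PurgeTask above on a java.util.Timer so that it re-runs at the configured purgeInterval. The following is a minimal, self-contained sketch of that Timer-based scheduling pattern; the class names, directory paths, retain count and one-hour interval are illustrative, not ZooKeeper's actual code or defaults:

```java
import java.io.File;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;

public class DatadirCleanupSketch {

    // Illustrative stand-in for ZooKeeper's PurgeTask: runs the cleanup logic periodically.
    static class PurgeTaskSketch extends TimerTask {
        private final File dataDir;
        private final File snapDir;
        private final int retainCount;

        PurgeTaskSketch(File dataDir, File snapDir, int retainCount) {
            this.dataDir = dataDir;
            this.snapDir = snapDir;
            this.retainCount = retainCount;
        }

        @Override
        public void run() {
            // The real task delegates to the purge(dataDir, snapDir, retainCount) method shown above.
            System.out.println("Purging " + dataDir + " / " + snapDir
                    + ", keeping the " + retainCount + " most recent snapshots");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Daemon timer: a background cleanup schedule that does not keep the JVM alive by itself.
        Timer timer = new Timer("PurgeTask", true);
        timer.scheduleAtFixedRate(
                new PurgeTaskSketch(new File("/tmp/zk/data"), new File("/tmp/zk/snap"), 3),
                0, TimeUnit.HOURS.toMillis(1)); // run now, then once per hour
        Thread.sleep(1000); // keep the sketch alive long enough for the first run to fire
    }
}
```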
The main initialization code is as follows:
```java
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    // If a config file name was passed on the command line, parse the zk config file
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task (the scheduler responsible for cleanup work)
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // Read the configuration and start in cluster (quorum) mode
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        // No quorum configured, start in standalone mode by default
        ZooKeeperServerMain.main(args);
    }
}
```
The cluster-mode startup logic is in runFromConfig(config); the standalone-mode code is in ZooKeeperServerMain.
2. Cluster-mode startup
2.1. Registering the log4j JMX MBean
- Register a JMX MBean for log4j to provide monitoring and management capabilities
```java
// Create and Register the top level Log4J MBean
// org.apache.log4j.jmx.HierarchyDynamicMBean hdm = new org.apache.log4j.jmx.HierarchyDynamicMBean();
Object hdm = Class.forName("org.apache.log4j.jmx.HierarchyDynamicMBean").getConstructor().newInstance();

String mbean = System.getProperty("zookeeper.jmx.log4j.mbean", "log4j:hierarchy=default");
ObjectName mbo = new ObjectName(mbean);
mbs.registerMBean(hdm, mbo);

// Add the root logger to the Hierarchy MBean
// org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
Object rootLogger = Class.forName("org.apache.log4j.Logger")
        .getMethod("getRootLogger", (Class<?>[]) null)
        .invoke(null, (Object[]) null);

// hdm.addLoggerMBean(rootLogger.getName());
Object rootLoggerName = rootLogger.getClass()
        .getMethod("getName", (Class<?>[]) null)
        .invoke(rootLogger, (Object[]) null);
hdm.getClass().getMethod("addLoggerMBean", String.class)
        .invoke(hdm, rootLoggerName);
```
2.2. Starting the MetricsProvider
- Start the MetricsProvider. A MetricsProvider is a component that collects metrics and publishes current values to external systems; it can be shared between the server side and the client side.
```java
public static MetricsProvider startMetricsProvider(String metricsProviderClassName,
                                                   Properties configuration) throws MetricsProviderLifeCycleException {
    try {
        // Instantiate the MetricsProvider by reflection from its class name
        MetricsProvider metricsProvider = (MetricsProvider) Class.forName(
                metricsProviderClassName, true,
                Thread.currentThread().getContextClassLoader()).newInstance();
        metricsProvider.configure(configuration);
        // Start the provider
        metricsProvider.start();
        return metricsProvider;
    } catch (ClassNotFoundException | IllegalAccessException | InstantiationException error) {
        LOG.error("Cannot boot MetricsProvider {}", metricsProviderClassName, error);
        throw new MetricsProviderLifeCycleException("Cannot boot MetricsProvider " + metricsProviderClassName, error);
    } catch (MetricsProviderLifeCycleException error) {
        LOG.error("Cannot boot MetricsProvider {}", metricsProviderClassName, error);
        throw error;
    }
}
```
2.3. Initializing QuorumPeer
2.3.1 Creating the ServerCnxnFactory and initializing QuorumPeer
About the QuorumPeer component: it exists only in cluster mode and hosts the ZooKeeper server instance. From the cluster's point of view it represents one machine in the ZooKeeper ensemble. While running, QuorumPeer continuously monitors the state of the local server instance and initiates leader election when the situation requires it.
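That "continuously monitors the state and acts on it" behaviour is essentially a state machine. Below is a minimal, self-contained sketch of the pattern; the ServerState names mirror ZooKeeper's, but the loop body is only illustrative and is not the actual QuorumPeer.run() implementation:

```java
public class QuorumPeerSketch extends Thread {

    // The state names mirror QuorumPeer.ServerState in ZooKeeper.
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    private volatile ServerState state = ServerState.LOOKING;
    private volatile boolean running = true;

    @Override
    public void run() {
        // Keep checking the current state and act on it, just as QuorumPeer loops over its role.
        while (running) {
            switch (state) {
                case LOOKING:
                    // Run a round of leader election and adopt the resulting role.
                    state = lookForLeader();
                    break;
                case FOLLOWING:
                case OBSERVING:
                    // The real server blocks here following/observing the leader;
                    // when that connection breaks it falls back to LOOKING and elects again.
                    state = ServerState.LOOKING;
                    break;
                case LEADING:
                    // The real server blocks here leading the quorum; on losing it, back to LOOKING.
                    state = ServerState.LOOKING;
                    break;
            }
        }
    }

    private ServerState lookForLeader() {
        // Placeholder for an election round; in ZooKeeper this is FastLeaderElection's job.
        return ServerState.FOLLOWING;
    }

    public void shutdown() {
        running = false;
    }

    public static void main(String[] args) throws InterruptedException {
        QuorumPeerSketch peer = new QuorumPeerSketch();
        peer.start();
        Thread.sleep(10); // let the state machine spin briefly
        peer.shutdown();
        peer.join();
    }
}
```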
2.3.2 Initializing the quorum auth server and learner
Two objects are initialized here: SaslQuorumAuthServer and SaslQuorumAuthLearner, implementing the QuorumAuthServer and QuorumAuthLearner interfaces respectively, which define ZooKeeper's quorum authentication mechanism. The authenticate method they declare drives the main authentication flow; on the server side its signature is:

```java
public void authenticate(Socket sock, DataInputStream din) throws IOException;
```
The SaslQuorumAuthServer initialization logic is as follows:
The quorumRequireSasl value, which controls whether authentication is required, is read from the zoo.cfg configuration file. authzHosts is the configured list of server hosts; only machines in this list can join the ZooKeeper ensemble. A SaslQuorumServerCallbackHandler is then created as the authorization callback component; the SASL mechanism uses this handler to perform verification. Finally, the serverLogin is created.
Sample zoo.cfg:
```properties
quorum.auth.enableSasl=true
quorum.auth.learnerRequireSasl=true
quorum.auth.serverRequireSasl=true
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
quorum.auth.kerberos.servicePrincipal=servicename/_HOST
quorum.cnxn.threads.size=20
```
The creation process:
```java
public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set<String> authzHosts) throws SaslException {
    this.quorumRequireSasl = quorumRequireSasl;
    try {
        AppConfigurationEntry entries[] = Configuration.getConfiguration()
                .getAppConfigurationEntry(loginContext);
        if (entries == null || entries.length == 0) {
            throw new LoginException("SASL-authentication failed"
                    + " because the specified JAAS configuration "
                    + "section '" + loginContext + "' could not be found.");
        }
        SaslQuorumServerCallbackHandler saslServerCallbackHandler = new SaslQuorumServerCallbackHandler(
                Configuration.getConfiguration(), loginContext, authzHosts);
        serverLogin = new Login(loginContext, saslServerCallbackHandler, new ZKConfig());
        serverLogin.startThreadIfNeeded();
    } catch (Throwable e) {
        throw new SaslException(
                "Failed to initialize authentication mechanism using SASL", e);
    }
}
```
SaslQuorumAuthLearner initialization likewise ends up creating a learnerLogin, whose callback handler is SaslClientCallbackHandler. The Login class handles Kerberos credential renewal when the ZooKeeper client and server log in.
```java
public SaslQuorumAuthLearner(boolean quorumRequireSasl,
                             String quorumServicePrincipal,
                             String loginContext) throws SaslException {
    this.quorumRequireSasl = quorumRequireSasl;
    this.quorumServicePrincipal = quorumServicePrincipal;
    try {
        AppConfigurationEntry entries[] = Configuration
                .getConfiguration()
                .getAppConfigurationEntry(loginContext);
        if (entries == null || entries.length == 0) {
            throw new LoginException("SASL-authentication failed because"
                    + " the specified JAAS configuration "
                    + "section '" + loginContext + "' could not be found.");
        }
        this.learnerLogin = new Login(loginContext,
                new SaslClientCallbackHandler(null, "QuorumLearner"), new ZKConfig());
        this.learnerLogin.startThreadIfNeeded();
    } catch (LoginException e) {
        throw new SaslException("Failed to initialize authentication mechanism using SASL", e);
    }
}
```
2.3.3 How Kerberos authentication works
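In brief, Kerberos is a ticket-based authentication protocol: a principal first authenticates to the KDC's Authentication Service and obtains a ticket-granting ticket (TGT), then uses the TGT to request a service ticket from the Ticket-Granting Service, and finally presents that service ticket to the target service (here, the principal configured via quorum.auth.kerberos.servicePrincipal) to prove its identity without sending a password over the network. The Login objects created above are what keep these credentials refreshed in the background.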
2.3.4 loadDataBase
This step loads data from disk into memory by calling loadDataBase. While replaying, the method also records the transactions in the committed-transaction log. snapLog.restore restores the server-side data from the snapshots and transaction logs, and returns the highest zxid.
```java
public long loadDataBase() throws IOException {
    long startTime = Time.currentElapsedTime();
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    long loadTime = Time.currentElapsedTime() - startTime;
    ServerMetrics.DB_INIT_TIME.add(loadTime);
    LOG.info("Snapshot loaded in " + loadTime + " ms");
    return zxid;
}
```
2.3.5 startServerCnxnFactory
The server starts two factories: cnxnFactory and secureCnxnFactory. Each factory has two implementations, one based on Netty and one on NIO. As mentioned earlier, ServerCnxn is the component that handles network I/O and needs a socket channel; the two implementations are simply different ways of setting up that network socket.
```java
private void startServerCnxnFactory() {
    if (cnxnFactory != null) {
        cnxnFactory.start();
    }
    if (secureCnxnFactory != null) {
        secureCnxnFactory.start();
    }
}
```
Before looking at how cnxnFactory sets up sockets with NIO, a few components are worth knowing:
- WorkerService: a worker thread pool that executes work items. It wraps a number of ExecutorService instances and lets you configure the number of worker threads and the shutdown timeout.
- SelectorThread: ZooKeeper's thread wrapper (extending Thread) around NIO's Selector, SocketChannel and SelectionKey, used to carry out the network communication; a minimal standalone sketch of this selector pattern follows below.
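As background for the NIO path, here is a minimal, self-contained sketch of the accept-and-select pattern that SelectorThread builds on. This is plain java.nio, not ZooKeeper code; the port and the echo behaviour are illustrative only:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SelectorSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();

        // Non-blocking server socket registered for ACCEPT events, like ZooKeeper's accept thread.
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress(2181));          // illustrative port
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();                              // block until some channel is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    // Accept the new connection and register it for READ events.
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // Read the request; in ZooKeeper this would be handed off to the worker pool.
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    if (client.read(buf) < 0) {
                        client.close();                     // peer closed the connection
                    } else {
                        buf.flip();
                        client.write(buf);                  // echo back as a stand-in for real handling
                    }
                }
            }
        }
    }
}
```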
The class-diagram relationships among these components are as follows:
Main logic of start():
- If workerPool is null, initialize the worker thread pool
- Start the selector threads
- Start acceptThread and expirerThread, making sure each is started exactly once
- The selector monitors multiple channels, learns when a channel is ready for read or write events, and manages all of them
- For accepted connections, register interest events with the selector on the channel to finish establishing the connection
- Once a connection is established, add the cnxn to the designated set
- When the thread is stopped, clean up the channels and close the connections
```java
public void start() {
    stopped = false;
    if (workerPool == null) {
        workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
    }
    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}
```
Main logic of run():
```java
public void run() {
    try {
        while (!stopped) {
            try {
                // Monitor the registered channels
                select();
                // Handle newly accepted connections
                processAcceptedConnections();
                // Process pending interest-ops updates
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        // After the thread stops, clean up the channels and disconnect.
        // Close connections still pending on the selector. Any others
        // with in-flight work, let drain out of the work queue.
        for (SelectionKey key : selector.keys()) {
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                cnxn.close();
            }
            cleanupSelectionKey(key);
        }
        SocketChannel accepted;
        while ((accepted = acceptedQueue.poll()) != null) {
            fastCloseSock(accepted);
        }
        updateQueue.clear();
    } finally {
        // Close the selector
        closeSelector();
        // Tell the worker threads to begin shutting down.
        // This will wake up the accept thread and the other selector
        // threads, and tell the worker thread pool to begin shutdown.
        NIOServerCnxnFactory.this.stop();
        LOG.info("selector thread exitted run method");
    }
}
```
The logic of cnxnFactory start() with Netty is as follows:
```java
public void start() {
    if (listenBacklog != -1) {
        bootstrap.option(ChannelOption.SO_BACKLOG, listenBacklog);
    }
    LOG.info("binding to port {}", localAddress);
    parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
    // Port changes after bind() if the original port was 0, update
    // localAddress to get the real port.
    localAddress = (InetSocketAddress) parentChannel.localAddress();
    LOG.info("bound to port " + getLocalPort());
}
```
2.3.6 Starting the Jetty server
Starting the embedded Jetty (the AdminServer):
```java
public void start() throws AdminServerException {
    try {
        server.start();
    } catch (Exception e) {
        // Server.start() only throws Exception, so let's at least wrap it
        // in an identifiable subclass
        throw new AdminServerException(String.format(
                "Problem starting AdminServer on address %s, port %d and command URL %s",
                address, port, commandUrl), e);
    }
    LOG.info(String.format("Started AdminServer on address %s, port %d and command URL %s",
            address, port, commandUrl));
}
```
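Once started, the AdminServer exposes ZooKeeper's administrative commands over HTTP. By default it listens on port 8080 with the command URL /commands, so, for example, http://localhost:8080/commands/stat returns basic server statistics; the port and root URL can be changed with the admin.serverPort and admin.commandURL properties.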
2.3.7 startLeaderElection
Next comes the leader-election process:
- Cast votes
- Send the vote results
```java
private void starter(QuorumPeer self) {
    this.self = self;
    // Determine the election port used for this server's vote
    port = self.getVotingView().get(self.getId()).electionAddr.getPort();
    proposedLeader = -1;
    proposedZxid = -1;

    try {
        // Create the datagram socket used to exchange votes
        mySocket = new DatagramSocket(port);
        // mySocket.setSoTimeout(20000);
    } catch (SocketException e1) {
        e1.printStackTrace();
        throw new RuntimeException();
    }
    // Queue of vote results to send
    sendqueue = new LinkedBlockingQueue<ToSend>(2 * self.getVotingView().size());
    // Queue of received votes
    recvqueue = new LinkedBlockingQueue<Notification>(2 * self.getVotingView().size());
    // Turn the vote results into messages and send them out
    new Messenger(self.getVotingView().size() * 2, mySocket);
}
```
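When votes are compared during fast leader election, the candidate with the higher election epoch wins; ties are broken by the higher zxid and then by the higher server id. The sketch below illustrates that ordering rule only; it is not the actual totalOrderPredicate implementation from FastLeaderElection:

```java
public class VoteOrderSketch {

    // Returns true if the proposed vote (epoch, zxid, id) should replace the current one.
    // Precedence: election epoch first, then zxid, then server id.
    static boolean proposalWins(long newEpoch, long newZxid, long newId,
                                long curEpoch, long curZxid, long curId) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;
        }
        return newId > curId;
    }

    public static void main(String[] args) {
        // Same epoch and zxid: the server with the larger id wins the tie-break.
        System.out.println(proposalWins(1, 100, 3, 1, 100, 2)); // true
        // A higher zxid beats a higher server id.
        System.out.println(proposalWins(1, 101, 1, 1, 100, 2)); // true
    }
}
```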
3. Process summary
The overall flow of starting the server in cluster mode is roughly:
- Parse zoo.cfg and start the DatadirCleanupManager purge schedule
- Register the log4j JMX MBean and start the MetricsProvider
- Create the ServerCnxnFactory instances and initialize QuorumPeer (quorum auth server/learner, loadDataBase)
- Start the ServerCnxnFactory (NIO or Netty) and the embedded Jetty AdminServer
- Start leader election; once a leader is elected, each QuorumPeer runs in its elected role