ZooKeeper 源码分析 (三)—服务端启动之单机模式

本贴最后更新于 2239 天前,其中的信息可能已经时移世异

1. 预启动

1.1. QuorumPeerMain 作为启动入口

跟集群模式一样,启动入口也是在 QuorumPeerMain main 方法中。

1.2. 解析配置文件 zoo.cfg

配置文件包含了 zk 运行时需要的基本参数,比如 tickTime、dataDir 和 clientPort。解析的主要逻辑如下:

try { File configFile = (new VerifyingFileFactory.Builder(LOG) .warnForRelativePath() .failForNonExistingPath() .build()).create(path); Properties cfg = new Properties(); FileInputStream in = new FileInputStream(configFile); try { // 文件加载 cfg.load(in); configFileStr = path; } finally { in.close(); } // 解析配置文件 parseProperties(cfg); } catch (IOException e) { throw new ConfigException("Error processing " + path, e); } catch (IllegalArgumentException e) { throw new ConfigException("Error processing " + path, e); }

1.3. 创建并启动历史文件清理器 DatadirCleanupManager

ZooKeeper 的自动清理文件机制,对快照文件和事务日志定期进行清理。实例创建的代码如下:

DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());

可以看到,创建实例时需要知道 dataDir、dataLogDir、快照保存个数、清理间隔这些参数。内部基于 TimerTask 起了一个定时任务,会根据 purgeInterval 定期去清理文件。

timer = new Timer("PurgeTask", true); TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount); timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

1.4. 判断启动模式

根据解析出的服务器地址列表判断走单机模式还是集群模式。

if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args); }

这里的 args 是启动命令行输入的配置文件名称,当文件名称只有一个,并且配置文件里配置了 distributed=true 时走集群模式,否则走单机模式。单机启动的代码都在 ZooKeeperServerMain.main(args)

1.5. 再次进行配置解析

单机启动时,命令输入的参数会有两种情况

  • 输入了配置文件名称
  • 直接输入了配置参数,比如按顺序输入了 clientPortAddressdataDirdataLogDirtickTimemaxClientCnxns

所以要根据这两种情况进行不同的解析

if (args.length == 1) { config.parse(args[0]); } else { config.parse(args); }

当命令行输入的文件名称时,根据路径构造 File 对象,进行解析,然后把配置信息读入到 ServerConfig 中

public void parse(String path) throws ConfigException { QuorumPeerConfig config = new QuorumPeerConfig(); // 根据文件名称解析配置文件 config.parse(path); // let qpconfig parse the file and then pull the stuff we are // interested in // 从配置中读取配置信息,给ServerConfig赋值 readFrom(config); }

直接输入配置参数时,需要保证输入参数的顺序,代码中是按顺序取的,顺序见上面描述。

public void parse(String[] args) { if (args.length < 2 || args.length > 4) { throw new IllegalArgumentException("Invalid number of arguments:" + Arrays.toString(args)); } // clientPortAddress是第一顺序 clientPortAddress = new InetSocketAddress(Integer.parseInt(args[0])); // 第二顺序是dataDir dataDir = new File(args[1]); dataLogDir = dataDir; if (args.length >= 3) { // 第三顺序是tickTime tickTime = Integer.parseInt(args[2]); } if (args.length == 4) { // 第四顺序是maxClientCnxns maxClientCnxns = Integer.parseInt(args[3]); } }

2. 初始化

接下来会执行 runFromConfig(ServerConfig config) 方法,进行服务器实例化过程,分步骤来看。

2.1. 创建 FileTxnSnapLog

FileTxnSnapLog 是 ZooKeeper 上层服务于底层数据存储之间的对接层,提供了一系列操作数据文件的接口。包括事务日志文件和快照数据文件。根据 dataDir 和 snapDir 来创建 FileTxnSnapLog。

this.dataDir = new File(dataDir, version + VERSION); this.snapDir = new File(snapDir, version + VERSION);

主要的方法有如下一些,后面会作为专题来讲

// 读取快照文件和事务日志之后恢复服务端数据 restore(DataTree, Map, Integer>, PlayBackListener):long // 把最新的事务日志快速更新到server database,与restore不同的是只处理事务日志 fastForwardFromEdits(DataTree, Map, Integer>,PlayBackListener):long // 把dataTree和sessions放入快照文件中 save(DataTree,ConcurrentHashMap, Integer>, boolean):void // 在dataTree上处理事务 processTransaction(TxnHeader,DataTree ,Map, Integer> , Record ):void

2.2. 创建 MetricsProvider

MetricsProvider 是一个可以收集指标、将当前值发送到外部设备的一个组件。数据在 server 端和 client 端可以共享。在《ZooKeeper 源码分析二-服务端启动之集群模式》讲过,这里不再细说。

2.3. 创建服务器实例 ZooKeeperServer

依赖 txnLog、tickTime、minSessionTimeout、maxSessionTimeout、listenBacklog 等参数实例化 ZooKeeperServer:

ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, config.listenBacklog, null);

2.4. 创建 ServerStats

ServerStats 是一个服务端统计器,包含了最基本的运行时信息:

  • packetsSent 从 zk 启动开始,或是最近一次重置服务端统计信息之后,服务端向客户端发送的响应包次数
  • packetsReceived 从 zk 启动开始,或是最近一次重置服务端统计信息之后,服务端接收到来自客户端的请求包次数
  • requestLatency 从 zk 启动开始,或是最近一次重置服务端统计信息之后服务端请求处理的最大延时、最小延时、平均延时以及总延时
  • clientResponseStats 提供 jute 序列化缓冲区使用情况的实时统计信息
  • fsyncThresholdExceedCount 同步内存数据到存储设备超过阈值的次数
  • startTime zk 启动后开始统计的时间

2.5. Registers shutdown handler

ZooKeeperServerShutdownHandler 是 zk 用于处理异常的组件。当系统发生错误时,会使用 CountDownLatch 通知其他线程停止工作

class ZooKeeperServerShutdownHandler { private final CountDownLatch shutdownLatch; ZooKeeperServerShutdownHandler(CountDownLatch shutdownLatch) { this.shutdownLatch = shutdownLatch; } /** * This will be invoked when the server transition to a new server state. * * @param state new server state */ void handle(State state) { if (state == State.ERROR || state == State.SHUTDOWN) { shutdownLatch.countDown(); } } }

2.6. 启动 AdminServer

AdminServer 用来管理 ZooKeeperServer。有两种实现方式 JettyAdminServer 和 DummyAdminServer。

zookeeper.admin.enableServer 为 true 时才启动 AdminServer,通过反射的方式创建实例

public static AdminServer createAdminServer() { if (!"false".equals(System.getProperty("zookeeper.admin.enableServer"))) { try { Class<?> jettyAdminServerC = Class.forName("org.apache.zookeeper.server.admin.JettyAdminServer"); Object adminServer = jettyAdminServerC.getConstructor().newInstance(); return (AdminServer) adminServer; } catch (ClassNotFoundException e) { LOG.warn("Unable to start JettyAdminServer", e); } } return new DummyAdminServer(); }

创建完 AdminServer 后设置 ZooKeeperServer,启动 jetty 容器

adminServer = AdminServerFactory.createAdminServer(); adminServer.setZooKeeperServer(zkServer); adminServer.start();

Jetty 容器启动代码如下:

@Override public void start() throws AdminServerException { try { server.start(); } catch (Exception e) { // Server.start() only throws Exception, so let's at least wrap it // in an identifiable subclass throw new AdminServerException(String.format( "Problem starting AdminServer on address %s," + " port %d and command URL %s", address, port, commandUrl), e); } LOG.info(String.format("Started AdminServer on address %s, port %d" + " and command URL %s", address, port, commandUrl)); }

2.7. 创建 ServerCnxnFactory

早期版本,都是自己实现 NIO 框架,从 3.4.0 版本引入了 Netty,可以通过 zookeeper.serverCnxnFactory 来指定使用 NIO 还是 Netty 作为 Zookeeper 服务端网络连接工厂。

2.8. 初始化 ServerCnxnFactory

Zookeeper 首先会初始化一个 Thread,作为整个 ServerCnxnFactory 的主线程,然后再初始化 NIO 服务器。

// 初始化selectorThreads for (int i = 0; i < numSelectorThreads; ++i) { selectorThreads.add(new SelectorThread(i)); } // 初始化expirerThread expirerThread = new ConnectionExpirerThread(); // 初始化acceptThread acceptThread = new AcceptThread(ss, addr, selectorThreads);

2.9. 启动 ServerCnxnFactory 主线程

启动步骤 2.7 中已经初始化的主线程的主逻辑(run 方法)。这时候 ZooKeeper 的 NIO 服务器已经对外开放端口,客户端能够访问 2181 端口,但是此时 ZooKeeper 服务器是无法正常处理客户端请求的

@Override public void start() { stopped = false; if (workerPool == null) { workerPool = new WorkerService( "NIOWorker", numWorkerThreads, false); } for (SelectorThread thread : selectorThreads) { if (thread.getState() == Thread.State.NEW) { thread.start(); } } // ensure thread is started once and only once if (acceptThread.getState() == Thread.State.NEW) { acceptThread.start(); } if (expirerThread.getState() == Thread.State.NEW) { expirerThread.start(); } }

2.10. 恢复本地数据

从本地快照和事务日志文件中进行数据恢复。详细的数据恢复过程后面专题会讲

public void startdata() throws IOException, InterruptedException { //check to see if zkDb is not null if (zkDb == null) { zkDb = new ZKDatabase(this.txnLogFactory); } if (!zkDb.isInitialized()) { loadData(); } }

2.11. 创建并启动会话管理器

创建会话管理器 SessionTracker,SessionTracker 主要负责 ZooKeeper 服务端的会话管理,创建 SessionTracker 时,会设置 expireInterval、NextExpirationTime 和 SessionWithTimeout,还会计算出一个初始化的 SessionID

if (sessionTracker == null) { createSessionTracker(); } startSessionTracker();

2.12. 初始化 ZooKeeper 的请求处理链

典型的责任链方式实现,在 ZooKeeper 服务器上,会有多个请求处理器一次来处理一个客户端请求,在服务器启动的时候,会将这些请求处理器串联起来形成一个请求处理链。单机版服务器的请求处理链主要包括 PrepRequestProcessorSyncRequestProcessorFinalRequestProcessor

PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
单机版 ZooKeeper 服务器的请求处理链

protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start(); }

2.13. 注册 JMX 服务

ZooKeeper 会将服务器运行时的一些信息以 JMS 的方式暴露给外部,具体实现如下:

protected void registerJMX() { // register with JMX try { jmxServerBean = new ZooKeeperServerBean(this); MBeanRegistry.getInstance().register(jmxServerBean, null); try { jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree()); MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxDataTreeBean = null; } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxServerBean = null; } }

2.14. 注册 ZooKeeper 服务器实例

前面的步骤过后,ZooKeeper 已经将 ServerCnxnFactory 主线程启动,但是同时 ZooKeeper 依旧无法处理客户端请求,原因是网络层不能够访问 ZooKeeper 实例,经过后续步骤的初始化,ZooKeeper 服务器实例已经初始化完毕,只需要注册给 ServerCnxnFactory 即可,之后 ZooKeeper 就可以对外提供正常的服务了。

final public void setZooKeeperServer(ZooKeeperServer zks) { this.zkServer = zks; if (zks != null) { if (secure) { zks.setSecureServerCnxnFactory(this); } else { zks.setServerCnxnFactory(this); } } }

至此,单机版的 ZooKeeper 服务器启动完毕。

3. 流程总结

1

  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...