1. 预启动
1.1. QuorumPeerMain
作为启动入口
跟集群模式一样,启动入口也是在 QuorumPeerMain
main 方法中。
1.2. 解析配置文件 zoo.cfg
配置文件包含了 zk 运行时需要的基本参数,比如 tickTime、dataDir 和 clientPort。解析的主要逻辑如下:
try { File configFile = (new VerifyingFileFactory.Builder(LOG) .warnForRelativePath() .failForNonExistingPath() .build()).create(path); Properties cfg = new Properties(); FileInputStream in = new FileInputStream(configFile); try { // 文件加载 cfg.load(in); configFileStr = path; } finally { in.close(); } // 解析配置文件 parseProperties(cfg); } catch (IOException e) { throw new ConfigException("Error processing " + path, e); } catch (IllegalArgumentException e) { throw new ConfigException("Error processing " + path, e); }
1.3. 创建并启动历史文件清理器 DatadirCleanupManager
ZooKeeper 的自动清理文件机制,对快照文件和事务日志定期进行清理。实例创建的代码如下:
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
可以看到,创建实例时需要知道 dataDir、dataLogDir、快照保存个数、清理间隔这些参数。内部基于 TimerTask
起了一个定时任务,会根据 purgeInterval
定期去清理文件。
timer = new Timer("PurgeTask", true); TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount); timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
1.4. 判断启动模式
根据解析出的服务器地址列表判断走单机模式还是集群模式。
if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args); }
这里的 args
是启动命令行输入的配置文件名称,当文件名称只有一个,并且配置文件里配置了 distributed=true
时走集群模式,否则走单机模式。单机启动的代码都在 ZooKeeperServerMain.main(args)
中
1.5. 再次进行配置解析
单机启动时,命令输入的参数会有两种情况
- 输入了配置文件名称
- 直接输入了配置参数,比如按顺序输入了
clientPortAddress
,dataDir
,dataLogDir
,tickTime
,maxClientCnxns
所以要根据这两种情况进行不同的解析
if (args.length == 1) { config.parse(args[0]); } else { config.parse(args); }
当命令行输入的文件名称时,根据路径构造 File 对象,进行解析,然后把配置信息读入到 ServerConfig 中
public void parse(String path) throws ConfigException { QuorumPeerConfig config = new QuorumPeerConfig(); // 根据文件名称解析配置文件 config.parse(path); // let qpconfig parse the file and then pull the stuff we are // interested in // 从配置中读取配置信息,给ServerConfig赋值 readFrom(config); }
直接输入配置参数时,需要保证输入参数的顺序,代码中是按顺序取的,顺序见上面描述。
public void parse(String[] args) { if (args.length < 2 || args.length > 4) { throw new IllegalArgumentException("Invalid number of arguments:" + Arrays.toString(args)); } // clientPortAddress是第一顺序 clientPortAddress = new InetSocketAddress(Integer.parseInt(args[0])); // 第二顺序是dataDir dataDir = new File(args[1]); dataLogDir = dataDir; if (args.length >= 3) { // 第三顺序是tickTime tickTime = Integer.parseInt(args[2]); } if (args.length == 4) { // 第四顺序是maxClientCnxns maxClientCnxns = Integer.parseInt(args[3]); } }
2. 初始化
接下来会执行 runFromConfig(ServerConfig config)
方法,进行服务器实例化过程,分步骤来看。
2.1. 创建 FileTxnSnapLog
FileTxnSnapLog 是 ZooKeeper 上层服务于底层数据存储之间的对接层,提供了一系列操作数据文件的接口。包括事务日志文件和快照数据文件。根据 dataDir 和 snapDir 来创建 FileTxnSnapLog。
this.dataDir = new File(dataDir, version + VERSION); this.snapDir = new File(snapDir, version + VERSION);
主要的方法有如下一些,后面会作为专题来讲
// 读取快照文件和事务日志之后恢复服务端数据 restore(DataTree, Map, Integer>, PlayBackListener):long // 把最新的事务日志快速更新到server database,与restore不同的是只处理事务日志 fastForwardFromEdits(DataTree, Map, Integer>,PlayBackListener):long // 把dataTree和sessions放入快照文件中 save(DataTree,ConcurrentHashMap, Integer>, boolean):void // 在dataTree上处理事务 processTransaction(TxnHeader,DataTree ,Map, Integer> , Record ):void
2.2. 创建 MetricsProvider
MetricsProvider 是一个可以收集指标、将当前值发送到外部设备的一个组件。数据在 server 端和 client 端可以共享。在《ZooKeeper 源码分析二-服务端启动之集群模式》讲过,这里不再细说。
2.3. 创建服务器实例 ZooKeeperServer
依赖 txnLog、tickTime、minSessionTimeout、maxSessionTimeout、listenBacklog 等参数实例化 ZooKeeperServer:
ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, config.listenBacklog, null);
2.4. 创建 ServerStats
ServerStats 是一个服务端统计器,包含了最基本的运行时信息:
packetsSent
从 zk 启动开始,或是最近一次重置服务端统计信息之后,服务端向客户端发送的响应包次数packetsReceived
从 zk 启动开始,或是最近一次重置服务端统计信息之后,服务端接收到来自客户端的请求包次数requestLatency
从 zk 启动开始,或是最近一次重置服务端统计信息之后服务端请求处理的最大延时、最小延时、平均延时以及总延时clientResponseStats
提供 jute 序列化缓冲区使用情况的实时统计信息fsyncThresholdExceedCount
同步内存数据到存储设备超过阈值的次数startTime
zk 启动后开始统计的时间
2.5. Registers shutdown handler
ZooKeeperServerShutdownHandler
是 zk 用于处理异常的组件。当系统发生错误时,会使用 CountDownLatch 通知其他线程停止工作
class ZooKeeperServerShutdownHandler { private final CountDownLatch shutdownLatch; ZooKeeperServerShutdownHandler(CountDownLatch shutdownLatch) { this.shutdownLatch = shutdownLatch; } /** * This will be invoked when the server transition to a new server state. * * @param state new server state */ void handle(State state) { if (state == State.ERROR || state == State.SHUTDOWN) { shutdownLatch.countDown(); } } }
2.6. 启动 AdminServer
AdminServer 用来管理 ZooKeeperServer。有两种实现方式 JettyAdminServer 和 DummyAdminServer。
当 zookeeper.admin.enableServer
为 true 时才启动 AdminServer,通过反射的方式创建实例
public static AdminServer createAdminServer() { if (!"false".equals(System.getProperty("zookeeper.admin.enableServer"))) { try { Class<?> jettyAdminServerC = Class.forName("org.apache.zookeeper.server.admin.JettyAdminServer"); Object adminServer = jettyAdminServerC.getConstructor().newInstance(); return (AdminServer) adminServer; } catch (ClassNotFoundException e) { LOG.warn("Unable to start JettyAdminServer", e); } } return new DummyAdminServer(); }
创建完 AdminServer 后设置 ZooKeeperServer,启动 jetty 容器
adminServer = AdminServerFactory.createAdminServer(); adminServer.setZooKeeperServer(zkServer); adminServer.start();
Jetty 容器启动代码如下:
@Override public void start() throws AdminServerException { try { server.start(); } catch (Exception e) { // Server.start() only throws Exception, so let's at least wrap it // in an identifiable subclass throw new AdminServerException(String.format( "Problem starting AdminServer on address %s," + " port %d and command URL %s", address, port, commandUrl), e); } LOG.info(String.format("Started AdminServer on address %s, port %d" + " and command URL %s", address, port, commandUrl)); }
2.7. 创建 ServerCnxnFactory
早期版本,都是自己实现 NIO 框架,从 3.4.0 版本引入了 Netty,可以通过 zookeeper.serverCnxnFactory
来指定使用 NIO 还是 Netty 作为 Zookeeper 服务端网络连接工厂。
2.8. 初始化 ServerCnxnFactory
Zookeeper 首先会初始化一个 Thread,作为整个 ServerCnxnFactory 的主线程,然后再初始化 NIO 服务器。
// 初始化selectorThreads for (int i = 0; i < numSelectorThreads; ++i) { selectorThreads.add(new SelectorThread(i)); } // 初始化expirerThread expirerThread = new ConnectionExpirerThread(); // 初始化acceptThread acceptThread = new AcceptThread(ss, addr, selectorThreads);
2.9. 启动 ServerCnxnFactory 主线程
启动步骤 2.7 中已经初始化的主线程的主逻辑(run 方法)。这时候 ZooKeeper 的 NIO 服务器已经对外开放端口,客户端能够访问 2181 端口,但是此时 ZooKeeper 服务器是无法正常处理客户端请求的
@Override public void start() { stopped = false; if (workerPool == null) { workerPool = new WorkerService( "NIOWorker", numWorkerThreads, false); } for (SelectorThread thread : selectorThreads) { if (thread.getState() == Thread.State.NEW) { thread.start(); } } // ensure thread is started once and only once if (acceptThread.getState() == Thread.State.NEW) { acceptThread.start(); } if (expirerThread.getState() == Thread.State.NEW) { expirerThread.start(); } }
2.10. 恢复本地数据
从本地快照和事务日志文件中进行数据恢复。详细的数据恢复过程后面专题会讲
public void startdata() throws IOException, InterruptedException { //check to see if zkDb is not null if (zkDb == null) { zkDb = new ZKDatabase(this.txnLogFactory); } if (!zkDb.isInitialized()) { loadData(); } }
2.11. 创建并启动会话管理器
创建会话管理器 SessionTracker,SessionTracker 主要负责 ZooKeeper 服务端的会话管理,创建 SessionTracker 时,会设置 expireInterval、NextExpirationTime 和 SessionWithTimeout,还会计算出一个初始化的 SessionID
if (sessionTracker == null) { createSessionTracker(); } startSessionTracker();
2.12. 初始化 ZooKeeper 的请求处理链
典型的责任链方式实现,在 ZooKeeper 服务器上,会有多个请求处理器一次来处理一个客户端请求,在服务器启动的时候,会将这些请求处理器串联起来形成一个请求处理链。单机版服务器的请求处理链主要包括 PrepRequestProcessor
、SyncRequestProcessor
、FinalRequestProcessor
PrepRequestProcessor
-> SyncRequestProcessor
-> FinalRequestProcessor
单机版 ZooKeeper 服务器的请求处理链
protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start(); }
2.13. 注册 JMX 服务
ZooKeeper 会将服务器运行时的一些信息以 JMS 的方式暴露给外部,具体实现如下:
protected void registerJMX() { // register with JMX try { jmxServerBean = new ZooKeeperServerBean(this); MBeanRegistry.getInstance().register(jmxServerBean, null); try { jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree()); MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxDataTreeBean = null; } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxServerBean = null; } }
2.14. 注册 ZooKeeper 服务器实例
前面的步骤过后,ZooKeeper 已经将 ServerCnxnFactory 主线程启动,但是同时 ZooKeeper 依旧无法处理客户端请求,原因是网络层不能够访问 ZooKeeper 实例,经过后续步骤的初始化,ZooKeeper 服务器实例已经初始化完毕,只需要注册给 ServerCnxnFactory 即可,之后 ZooKeeper 就可以对外提供正常的服务了。
final public void setZooKeeperServer(ZooKeeperServer zks) { this.zkServer = zks; if (zks != null) { if (secure) { zks.setSecureServerCnxnFactory(this); } else { zks.setServerCnxnFactory(this); } } }
至此,单机版的 ZooKeeper 服务器启动完毕。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于