1. 预启动
1.1. QuorumPeerMain
作为启动入口
跟集群模式一样,启动入口也是在 QuorumPeerMain
main 方法中。
1.2. 解析配置文件 zoo.cfg
配置文件包含了 zk 运行时需要的基本参数,比如 tickTime、dataDir 和 clientPort。解析的主要逻辑如下:
try {
File configFile = (new VerifyingFileFactory.Builder(LOG)
.warnForRelativePath()
.failForNonExistingPath()
.build()).create(path);
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
// 文件加载
cfg.load(in);
configFileStr = path;
} finally {
in.close();
}
// 解析配置文件
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}
1.3. 创建并启动历史文件清理器 DatadirCleanupManager
ZooKeeper 的自动清理文件机制,对快照文件和事务日志定期进行清理。实例创建的代码如下:
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
可以看到,创建实例时需要知道 dataDir、dataLogDir、快照保存个数、清理间隔这些参数。内部基于 TimerTask
起了一个定时任务,会根据 purgeInterval
定期去清理文件。
timer = new Timer("PurgeTask", true);
TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
1.4. 判断启动模式
根据解析出的服务器地址列表判断走单机模式还是集群模式。
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
这里的 args
是启动命令行输入的配置文件名称,当文件名称只有一个,并且配置文件里配置了 distributed=true
时走集群模式,否则走单机模式。单机启动的代码都在 ZooKeeperServerMain.main(args)
中
1.5. 再次进行配置解析
单机启动时,命令输入的参数会有两种情况
- 输入了配置文件名称
- 直接输入了配置参数,比如按顺序输入了
clientPortAddress
,dataDir
,dataLogDir
,tickTime
,maxClientCnxns
所以要根据这两种情况进行不同的解析
if (args.length == 1) {
config.parse(args[0]);
} else {
config.parse(args);
}
当命令行输入的文件名称时,根据路径构造 File 对象,进行解析,然后把配置信息读入到 ServerConfig 中
public void parse(String path) throws ConfigException {
QuorumPeerConfig config = new QuorumPeerConfig();
// 根据文件名称解析配置文件
config.parse(path);
// let qpconfig parse the file and then pull the stuff we are
// interested in
// 从配置中读取配置信息,给ServerConfig赋值
readFrom(config);
}
直接输入配置参数时,需要保证输入参数的顺序,代码中是按顺序取的,顺序见上面描述。
public void parse(String[] args) {
if (args.length < 2 || args.length > 4) {
throw new IllegalArgumentException("Invalid number of arguments:" + Arrays.toString(args));
}
// clientPortAddress是第一顺序
clientPortAddress = new InetSocketAddress(Integer.parseInt(args[0]));
// 第二顺序是dataDir
dataDir = new File(args[1]);
dataLogDir = dataDir;
if (args.length >= 3) {
// 第三顺序是tickTime
tickTime = Integer.parseInt(args[2]);
}
if (args.length == 4) {
// 第四顺序是maxClientCnxns
maxClientCnxns = Integer.parseInt(args[3]);
}
}
2. 初始化
接下来会执行 runFromConfig(ServerConfig config)
方法,进行服务器实例化过程,分步骤来看。
2.1. 创建 FileTxnSnapLog
FileTxnSnapLog 是 ZooKeeper 上层服务于底层数据存储之间的对接层,提供了一系列操作数据文件的接口。包括事务日志文件和快照数据文件。根据 dataDir 和 snapDir 来创建 FileTxnSnapLog。
this.dataDir = new File(dataDir, version + VERSION);
this.snapDir = new File(snapDir, version + VERSION);
主要的方法有如下一些,后面会作为专题来讲
// 读取快照文件和事务日志之后恢复服务端数据
restore(DataTree, Map, Integer>, PlayBackListener):long
// 把最新的事务日志快速更新到server database,与restore不同的是只处理事务日志
fastForwardFromEdits(DataTree, Map, Integer>,PlayBackListener):long
// 把dataTree和sessions放入快照文件中
save(DataTree,ConcurrentHashMap, Integer>, boolean):void
// 在dataTree上处理事务
processTransaction(TxnHeader,DataTree ,Map, Integer> , Record ):void
2.2. 创建 MetricsProvider
MetricsProvider 是一个可以收集指标、将当前值发送到外部设备的一个组件。数据在 server 端和 client 端可以共享。在《ZooKeeper 源码分析二-服务端启动之集群模式》讲过,这里不再细说。
2.3. 创建服务器实例 ZooKeeperServer
依赖 txnLog、tickTime、minSessionTimeout、maxSessionTimeout、listenBacklog 等参数实例化 ZooKeeperServer:
ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, config.listenBacklog, null);
2.4. 创建 ServerStats
ServerStats 是一个服务端统计器,包含了最基本的运行时信息:
packetsSent
从 zk 启动开始,或是最近一次重置服务端统计信息之后,服务端向客户端发送的响应包次数packetsReceived
从 zk 启动开始,或是最近一次重置服务端统计信息之后,服务端接收到来自客户端的请求包次数requestLatency
从 zk 启动开始,或是最近一次重置服务端统计信息之后服务端请求处理的最大延时、最小延时、平均延时以及总延时clientResponseStats
提供 jute 序列化缓冲区使用情况的实时统计信息fsyncThresholdExceedCount
同步内存数据到存储设备超过阈值的次数startTime
zk 启动后开始统计的时间
2.5. Registers shutdown handler
ZooKeeperServerShutdownHandler
是 zk 用于处理异常的组件。当系统发生错误时,会使用 CountDownLatch 通知其他线程停止工作
class ZooKeeperServerShutdownHandler {
private final CountDownLatch shutdownLatch;
ZooKeeperServerShutdownHandler(CountDownLatch shutdownLatch) {
this.shutdownLatch = shutdownLatch;
}
/**
* This will be invoked when the server transition to a new server state.
*
* @param state new server state
*/
void handle(State state) {
if (state == State.ERROR || state == State.SHUTDOWN) {
shutdownLatch.countDown();
}
}
}
2.6. 启动 AdminServer
AdminServer 用来管理 ZooKeeperServer。有两种实现方式 JettyAdminServer 和 DummyAdminServer。
当 zookeeper.admin.enableServer
为 true 时才启动 AdminServer,通过反射的方式创建实例
public static AdminServer createAdminServer() {
if (!"false".equals(System.getProperty("zookeeper.admin.enableServer"))) {
try {
Class<?> jettyAdminServerC = Class.forName("org.apache.zookeeper.server.admin.JettyAdminServer");
Object adminServer = jettyAdminServerC.getConstructor().newInstance();
return (AdminServer) adminServer;
} catch (ClassNotFoundException e) {
LOG.warn("Unable to start JettyAdminServer", e);
}
}
return new DummyAdminServer();
}
创建完 AdminServer 后设置 ZooKeeperServer,启动 jetty 容器
adminServer = AdminServerFactory.createAdminServer();
adminServer.setZooKeeperServer(zkServer);
adminServer.start();
Jetty 容器启动代码如下:
@Override
public void start() throws AdminServerException {
try {
server.start();
} catch (Exception e) {
// Server.start() only throws Exception, so let's at least wrap it
// in an identifiable subclass
throw new AdminServerException(String.format(
"Problem starting AdminServer on address %s,"
+ " port %d and command URL %s", address, port,
commandUrl), e);
}
LOG.info(String.format("Started AdminServer on address %s, port %d"
+ " and command URL %s", address, port, commandUrl));
}
2.7. 创建 ServerCnxnFactory
早期版本,都是自己实现 NIO 框架,从 3.4.0 版本引入了 Netty,可以通过 zookeeper.serverCnxnFactory
来指定使用 NIO 还是 Netty 作为 Zookeeper 服务端网络连接工厂。
2.8. 初始化 ServerCnxnFactory
Zookeeper 首先会初始化一个 Thread,作为整个 ServerCnxnFactory 的主线程,然后再初始化 NIO 服务器。
// 初始化selectorThreads
for (int i = 0; i < numSelectorThreads; ++i) {
selectorThreads.add(new SelectorThread(i));
}
// 初始化expirerThread
expirerThread = new ConnectionExpirerThread();
// 初始化acceptThread
acceptThread = new AcceptThread(ss, addr, selectorThreads);
2.9. 启动 ServerCnxnFactory 主线程
启动步骤 2.7 中已经初始化的主线程的主逻辑(run 方法)。这时候 ZooKeeper 的 NIO 服务器已经对外开放端口,客户端能够访问 2181 端口,但是此时 ZooKeeper 服务器是无法正常处理客户端请求的
@Override
public void start() {
stopped = false;
if (workerPool == null) {
workerPool = new WorkerService(
"NIOWorker", numWorkerThreads, false);
}
for (SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
// ensure thread is started once and only once
if (acceptThread.getState() == Thread.State.NEW) {
acceptThread.start();
}
if (expirerThread.getState() == Thread.State.NEW) {
expirerThread.start();
}
}
2.10. 恢复本地数据
从本地快照和事务日志文件中进行数据恢复。详细的数据恢复过程后面专题会讲
public void startdata() throws IOException, InterruptedException {
//check to see if zkDb is not null
if (zkDb == null) {
zkDb = new ZKDatabase(this.txnLogFactory);
}
if (!zkDb.isInitialized()) {
loadData();
}
}
2.11. 创建并启动会话管理器
创建会话管理器 SessionTracker,SessionTracker 主要负责 ZooKeeper 服务端的会话管理,创建 SessionTracker 时,会设置 expireInterval、NextExpirationTime 和 SessionWithTimeout,还会计算出一个初始化的 SessionID
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker();
2.12. 初始化 ZooKeeper 的请求处理链
典型的责任链方式实现,在 ZooKeeper 服务器上,会有多个请求处理器一次来处理一个客户端请求,在服务器启动的时候,会将这些请求处理器串联起来形成一个请求处理链。单机版服务器的请求处理链主要包括 PrepRequestProcessor
、SyncRequestProcessor
、FinalRequestProcessor
PrepRequestProcessor
-> SyncRequestProcessor
-> FinalRequestProcessor
单机版 ZooKeeper 服务器的请求处理链
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
2.13. 注册 JMX 服务
ZooKeeper 会将服务器运行时的一些信息以 JMS 的方式暴露给外部,具体实现如下:
protected void registerJMX() {
// register with JMX
try {
jmxServerBean = new ZooKeeperServerBean(this);
MBeanRegistry.getInstance().register(jmxServerBean, null);
try {
jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxDataTreeBean = null;
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxServerBean = null;
}
}
2.14. 注册 ZooKeeper 服务器实例
前面的步骤过后,ZooKeeper 已经将 ServerCnxnFactory 主线程启动,但是同时 ZooKeeper 依旧无法处理客户端请求,原因是网络层不能够访问 ZooKeeper 实例,经过后续步骤的初始化,ZooKeeper 服务器实例已经初始化完毕,只需要注册给 ServerCnxnFactory 即可,之后 ZooKeeper 就可以对外提供正常的服务了。
final public void setZooKeeperServer(ZooKeeperServer zks) {
this.zkServer = zks;
if (zks != null) {
if (secure) {
zks.setSecureServerCnxnFactory(this);
} else {
zks.setServerCnxnFactory(this);
}
}
}
至此,单机版的 ZooKeeper 服务器启动完毕。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于