1. Startup Entry Point
The ZooKeeper server-side startup entry point is the QuorumPeerMain class, and the main logic lives in its initializeAndRun method, which does the following:
- parse the ZooKeeper configuration (if a configuration file name was given on the command line)
- start the cleanup scheduler (a periodic task built on the Java Timer that purges expired snapshot files)
- run the startup procedure (choosing cluster mode or standalone mode based on the configuration)
1.1 Parsing the zoo.cfg File
try {
File configFile = (new VerifyingFileFactory.Builder(LOG)
.warnForRelativePath()
.failForNonExistingPath()
.build()).create(path);
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
// load the configuration file
cfg.load(in);
configFileStr = path;
} finally {
in.close();
}
// parse the loaded properties
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}
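For reference, a minimal sketch (not from the original post; the path is a placeholder) of how a parsed QuorumPeerConfig is typically consumed, using accessors that also appear in the startup code below:
QuorumPeerConfig config = new QuorumPeerConfig();
config.parse("/opt/zookeeper/conf/zoo.cfg");   // throws ConfigException on malformed input
File dataDir = config.getDataDir();            // dataDir from zoo.cfg
int tickTime = config.getTickTime();           // tickTime from zoo.cfg
boolean clustered = config.isDistributed();    // true when a quorum of servers is configured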
1.2 Starting the DatadirCleanupManager History File Cleaner
It starts a PurgeTask (which extends TimerTask) to purge history files; the core purge logic is shown in the code below:
public static void purge(File dataDir, File snapDir, int num) throws IOException {
if (num < 3) {
throw new IllegalArgumentException(COUNT_ERR_MSG);
}
FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
List<File> snaps = txnLog.findNRecentSnapshots(num);
int numSnaps = snaps.size();
if (numSnaps > 0) {
purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
}
}
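The PurgeTask reruns this purge on the interval set by autopurge.purgeInterval, keeping autopurge.snapRetainCount snapshots. As an illustration (paths are placeholders), the same cleanup can be invoked directly; the static purge method shown above lives in PurgeTxnLog:
File txnLogDir = new File("/data/zookeeper/log");   // placeholder transaction log directory
File snapDir = new File("/data/zookeeper/data");    // placeholder snapshot directory
PurgeTxnLog.purge(txnLogDir, snapDir, 3);           // keep the 3 most recent snapshots and matching logs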
The main initialization code is as follows:
protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException {
// if a configuration file name was supplied on the command line, parse the ZooKeeper config file
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
// Start and schedule the purge task (a scheduled job responsible for cleanup)
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.isDistributed()) {
// read the configuration and start in cluster mode
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only one server in the quorum -- run as standalone
// no quorum configured, start in standalone mode by default
ZooKeeperServerMain.main(args);
}
}
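For orientation, the server scripts shipped with ZooKeeper launch this entry point with the path to zoo.cfg; an equivalent direct call (the path is a placeholder) would be:
QuorumPeerMain.main(new String[] { "/opt/zookeeper/conf/zoo.cfg" });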
The following sections analyze cluster-mode startup, whose logic is in runFromConfig(config); the standalone-mode code lives in ZooKeeperServerMain.
2. Cluster-Mode Startup
2.1. Registering the log4j JMX Bean
- A JMX MBean is registered for log4j to provide monitoring and management capabilities
// Create and Register the top level Log4J MBean
// org.apache.log4j.jmx.HierarchyDynamicMBean hdm = new org.apache.log4j.jmx.HierarchyDynamicMBean();
Object hdm = Class.forName("org.apache.log4j.jmx.HierarchyDynamicMBean").getConstructor().newInstance();
String mbean = System.getProperty("zookeeper.jmx.log4j.mbean", "log4j:hierarchy=default");
ObjectName mbo = new ObjectName(mbean);
mbs.registerMBean(hdm, mbo);
// Add the root logger to the Hierarchy MBean
// org.apache.log4j.Logger rootLogger =
// org.apache.log4j.Logger.getRootLogger();
Object rootLogger = Class.forName("org.apache.log4j.Logger")
.getMethod("getRootLogger", (Class<?>[]) null)
.invoke(null, (Object[]) null);
// hdm.addLoggerMBean(rootLogger.getName());
Object rootLoggerName = rootLogger.getClass()
.getMethod("getName", (Class<?>[]) null)
.invoke(rootLogger, (Object[]) null);
hdm.getClass().getMethod("addLoggerMBean", String.class)
.invoke(hdm, rootLoggerName);
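As a quick sanity check (a minimal sketch, not part of the startup code), the registered hierarchy MBean can be looked up on the platform MBeanServer; the ObjectName matches the default used above:
// MBeanServer/ObjectName come from javax.management; ManagementFactory from java.lang.management
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mbo = new ObjectName("log4j:hierarchy=default");
System.out.println("log4j hierarchy MBean registered: " + mbs.isRegistered(mbo));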
2.2. Starting the MetricsProvider
- Start the MetricsProvider. A MetricsProvider is a component that collects metrics and can publish the current values to an external facility; its data can be shared between the server side and the client side.
public static MetricsProvider startMetricsProvider(String metricsProviderClassName, Properties configuration) throws MetricsProviderLifeCycleException {
try {
// instantiate the MetricsProvider via reflection on the class name
MetricsProvider metricsProvider = (MetricsProvider) Class.forName(metricsProviderClassName,
true, Thread.currentThread().getContextClassLoader()).newInstance();
metricsProvider.configure(configuration);
// start the provider
metricsProvider.start();
return metricsProvider;
} catch (ClassNotFoundException | IllegalAccessException | InstantiationException error) {
LOG.error("Cannot boot MetricsProvider {}", metricsProviderClassName, error);
throw new MetricsProviderLifeCycleException("Cannot boot MetricsProvider " + metricsProviderClassName,
error);
} catch (MetricsProviderLifeCycleException error) {
LOG.error("Cannot boot MetricsProvider {}", metricsProviderClassName, error);
throw error;
}
}
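For context, which provider gets loaded is controlled from zoo.cfg; the fragment below is illustrative and assumes ZooKeeper 3.6+ with the bundled Prometheus provider:
metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
metricsProvider.httpPort=7000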
2.3. Initializing QuorumPeer
2.3.1 Creating the ServerCnxnFactory and Initializing QuorumPeer
About the QuorumPeer component: it exists only in cluster mode and is the host of the ZooKeeper server instance. Seen from the cluster level it represents one machine in the ZooKeeper ensemble. While running, QuorumPeer keeps checking the state of the current server instance and, depending on that state, initiates leader election.
2.3.2 Initializing the Quorum Auth Server and Learner
Two objects are initialized here: SaslQuorumAuthServer (which implements QuorumAuthServer) and SaslQuorumAuthLearner (which implements QuorumAuthLearner); these interfaces define ZooKeeper's quorum authentication mechanism. The authenticate method declared on QuorumAuthServer drives the main server-side authentication flow:
public void authenticate(Socket sock, DataInputStream din) throws IOException;
The SaslQuorumAuthServer initialization logic is as follows: it reads quorumRequireSasl from the zoo.cfg configuration to decide whether authentication is required; authzHosts is the configured list of server hosts, and only machines on that list may join the ZooKeeper ensemble. It then creates a SaslQuorumServerCallbackHandler, the callback component the SASL mechanism uses for verification, and finally creates the serverLogin (a JAAS configuration sketch follows the zoo.cfg sample below).
Sample zoo.cfg:
quorum.auth.enableSasl=true
quorum.auth.learnerRequireSasl=true
quorum.auth.serverRequireSasl=true
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
quorum.auth.kerberos.servicePrincipal=servicename/_HOST
quorum.cnxn.threads.size=20
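The loginContext names above (QuorumServer, QuorumLearner) refer to sections of a JAAS configuration file supplied via -Djava.security.auth.login.config. An illustrative sketch using the DIGEST-MD5 based DigestLoginModule (user name and password are placeholders):
QuorumServer {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_test="mypassword";
};

QuorumLearner {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="test"
    password="mypassword";
};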
The construction process:
public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set<String> authzHosts)
throws SaslException {
this.quorumRequireSasl = quorumRequireSasl;
try {
AppConfigurationEntry entries[] = Configuration.getConfiguration()
.getAppConfigurationEntry(loginContext);
if (entries == null || entries.length == 0) {
throw new LoginException("SASL-authentication failed"
+ " because the specified JAAS configuration "
+ "section '" + loginContext + "' could not be found.");
}
SaslQuorumServerCallbackHandler saslServerCallbackHandler = new SaslQuorumServerCallbackHandler(
Configuration.getConfiguration(), loginContext, authzHosts);
serverLogin = new Login(loginContext, saslServerCallbackHandler, new ZKConfig());
serverLogin.startThreadIfNeeded();
} catch (Throwable e) {
throw new SaslException(
"Failed to initialize authentication mechanism using SASL",
e);
}
}
SaslQuorumAuthLearner initialization likewise ends up creating a learnerLogin, with SaslClientCallbackHandler as the callback component; Login handles refreshing the Kerberos credentials used when ZooKeeper clients and servers log in.
public SaslQuorumAuthLearner(boolean quorumRequireSasl, String quorumServicePrincipal, String loginContext) throws SaslException {
this.quorumRequireSasl = quorumRequireSasl;
this.quorumServicePrincipal = quorumServicePrincipal;
try {
AppConfigurationEntry entries[] = Configuration
.getConfiguration()
.getAppConfigurationEntry(loginContext);
if (entries == null || entries.length == 0) {
throw new LoginException("SASL-authentication failed because"
+ " the specified JAAS configuration "
+ "section '" + loginContext
+ "' could not be found.");
}
this.learnerLogin = new Login(loginContext,
new SaslClientCallbackHandler(null, "QuorumLearner"), new ZKConfig());
this.learnerLogin.startThreadIfNeeded();
} catch (LoginException e) {
throw new SaslException("Failed to initialize authentication mechanism using SASL", e);
}
}
2.3.3 How Kerberos Authentication Works
2.3.4 loadDataBase
This step loads the data from disk into memory by calling loadDataBase; as it replays, the method also feeds committed transactions to the commit-proposal playback listener so they are recorded in the committed-transaction log. snapLog.restore restores the server-side data from the snapshots and transaction logs and returns the highest zxid:
public long loadDataBase() throws IOException {
long startTime = Time.currentElapsedTime();
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
long loadTime = Time.currentElapsedTime() - startTime;
ServerMetrics.DB_INIT_TIME.add(loadTime);
LOG.info("Snapshot loaded in " + loadTime + " ms");
return zxid;
}
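As an illustration (a minimal sketch with placeholder paths, not taken from the startup code), a ZKDatabase can be built on top of a FileTxnSnapLog and loaded directly:
File txnLogDir = new File("/data/zookeeper/log");   // placeholder transaction log directory
File snapDir = new File("/data/zookeeper/data");    // placeholder snapshot directory
ZKDatabase db = new ZKDatabase(new FileTxnSnapLog(txnLogDir, snapDir));
long lastZxid = db.loadDataBase();                  // replays snapshots and txn logs, returns the highest zxid
System.out.println("restored up to zxid 0x" + Long.toHexString(lastZxid));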
2.3.5 startServerCnxnFactory
The server starts two factories: cnxnFactory and secureCnxnFactory. Each factory has two implementations, one based on Netty and one based on NIO. As mentioned earlier, ServerCnxn is the component that handles network I/O and needs a socket channel; the two implementations are simply two different ways of setting up those sockets.
private void startServerCnxnFactory() {
if (cnxnFactory != null) {
cnxnFactory.start();
}
if (secureCnxnFactory != null) {
secureCnxnFactory.start();
}
}
Before looking at how cnxnFactory establishes sockets with NIO, a few components need introducing:
- WorkerService: a pool of worker threads that processes work items; it holds a number of ExecutorService instances and allows configuring the number of worker threads and the shutdown timeout.
- SelectorThread: a thread wrapper (extending Thread) around NIO's Selector, SocketChannel and SelectionKey, used to carry out the network communication.
The class diagram of these components is as follows:
Main logic of start():
- if workerPool is null, initialize the worker thread pool
- start the selector threads
- start acceptThread and expirerThread, making sure each is started once and only once
Once running, a selector thread:
- watches multiple channels through the selector and learns which ones are ready for read/write events
- processes accepted connections, registering their channels with the selector to complete connection setup
- adds each fully established cnxn to the designated set
- after the thread is stopped, cleans up the channels and closes the connections
public void start() {
stopped = false;
if (workerPool == null) {
workerPool = new WorkerService(
"NIOWorker", numWorkerThreads, false);
}
for(SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
// ensure thread is started once and only once
if (acceptThread.getState() == Thread.State.NEW) {
acceptThread.start();
}
if (expirerThread.getState() == Thread.State.NEW) {
expirerThread.start();
}
}
Main logic of run():
public void run() {
try {
while (!stopped) {
try {
// wait for registered channels to become ready
select();
// hand newly accepted connections to this selector
processAcceptedConnections();
// process queued interest-ops update requests
processInterestOpsUpdateRequests();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
// after the thread is stopped, close connections and clean up channels
// Close connections still pending on the selector. Any others
// with in-flight work, let drain out of the work queue.
for (SelectionKey key : selector.keys()) {
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
if (cnxn.isSelectable()) {
cnxn.close();
}
cleanupSelectionKey(key);
}
SocketChannel accepted;
while ((accepted = acceptedQueue.poll()) != null) {
fastCloseSock(accepted);
}
updateQueue.clear();
} finally {
// close the selector
closeSelector();
// notify the other threads to begin shutting down
// This will wake up the accept thread and the other selector
// threads, and tell the worker thread pool to begin shutdown.
NIOServerCnxnFactory.this.stop();
LOG.info("selector thread exitted run method");
}
}
The logic when cnxnFactory starts via Netty is as follows:
public void start() {
if (listenBacklog != -1) {
bootstrap.option(ChannelOption.SO_BACKLOG, listenBacklog);
}
LOG.info("binding to port {}", localAddress);
parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
// Port changes after bind() if the original port was 0, update
// localAddress to get the real port.
localAddress = (InetSocketAddress) parentChannel.localAddress();
LOG.info("bound to port " + getLocalPort());
}
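The choice between the NIO and Netty implementations is made when the factory is created. A minimal sketch (port and connection limit are placeholders) that selects the Netty factory through the zookeeper.serverCnxnFactory system property:
System.setProperty("zookeeper.serverCnxnFactory",
        "org.apache.zookeeper.server.NettyServerCnxnFactory");
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();   // NIO is the default when the property is unset
cnxnFactory.configure(new InetSocketAddress(2181), 60);              // client address and maxClientCnxns (placeholders)
cnxnFactory.start();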
2.3.6 Starting the Jetty Server
Start the embedded Jetty (the AdminServer):
public void start() throws AdminServerException {
try {
server.start();
} catch (Exception e) {
// Server.start() only throws Exception, so let's at least wrap it
// in an identifiable subclass
throw new AdminServerException(String.format(
"Problem starting AdminServer on address %s,"
+ " port %d and command URL %s", address, port,
commandUrl), e);
}
LOG.info(String.format("Started AdminServer on address %s, port %d"
+ " and command URL %s", address, port, commandUrl));
}
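The address, port and command URL used here come from the AdminServer configuration; an illustrative zoo.cfg fragment with the usual defaults (quoted from the AdminServer options, not from the post):
admin.enableServer=true
admin.serverAddress=0.0.0.0
admin.serverPort=8080
admin.commandURL=/commands
Once started, commands such as stat can be queried over HTTP under the command URL.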
2.3.7 startLeaderElection
Next comes the leader-election process:
- generate a vote
- send out the vote
private void starter(QuorumPeer self) {
this.self = self;
// prepare this server's vote
port = self.getVotingView().get(self.getId()).electionAddr.getPort();
proposedLeader = -1;
proposedZxid = -1;
try {
// create the datagram socket
mySocket = new DatagramSocket(port);
// mySocket.setSoTimeout(20000);
} catch (SocketException e1) {
e1.printStackTrace();
throw new RuntimeException();
}
// queue of votes to be sent
sendqueue = new LinkedBlockingQueue<ToSend>(2 * self.getVotingView().size());
// queue of received votes
recvqueue = new LinkedBlockingQueue<Notification>(2 * self.getVotingView()
.size());
// the messenger turns queued votes into messages and sends them out
new Messenger(self.getVotingView().size() * 2, mySocket);
}
3. Process Summary
The overall flow of starting the server in cluster mode is roughly as follows: