ZooKeeper 源码分析 (二)—服务端启动之集群模式

本贴最后更新于 2105 天前,其中的信息可能已经物是人非

1. 启动入口

ZooKeeper 服务端启动入口在 QuorumPeerMain 这个类中,主要逻辑在 initializeAndRun 方法中。主要逻辑为:

  • 解析 zk 配置(console 是否输入了配置文件名称)
  • 启动负责清理工作的 schedule(利用 Java Timer 实现的定时任务,清理过期文件快照)
  • 启动过程(根据配置信息判断走集群模式还是单机模式)

1.1 解析 zoo.cfg 文件

try {
    File configFile = (new VerifyingFileFactory.Builder(LOG)
        .warnForRelativePath()
        .failForNonExistingPath()
        .build()).create(path);
        
    Properties cfg = new Properties();
    FileInputStream in = new FileInputStream(configFile);
    try {
    	// 加载文件
        cfg.load(in);
        configFileStr = path;
    } finally {
        in.close();
    }
    // 解析配置
    parseProperties(cfg);
} catch (IOException e) {
    throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
    throw new ConfigException("Error processing " + path, e);
} 

1.2 启动 DatadirCleanupManager 历史文件清理器

启动一个 PurgeTask(继承自 TimeTask),去清理历史文件,清理的主要逻辑如代码所示

public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

    List<File> snaps = txnLog.findNRecentSnapshots(num);
    int numSnaps = snaps.size();
    if (numSnaps > 0) {
        purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    }
}

初始化主体代码如下:

protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {

    // 如果启动命令行输入了配置文件名称,则进行zk 配置文件解析    
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the the purge task (启动一个负责清理工作的schedule)
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // 读取配置信息并启动,集群模式
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        // 没有自定义配置,默认单机模式启动
        ZooKeeperServerMain.main(args);
    }
}

集群模式启动代码分析,逻辑在 runFromConfig(config) 中,单机模式的代码在 ZooKeeperServerMain 中。

2. 集群模式启动

2.1. 注册 log4j JMX Bean

  • 对 log4j 注册 JMX MBean ,实现监控、管理能力

// Create and Register the top level Log4J MBean
// org.apache.log4j.jmx.HierarchyDynamicMBean hdm = new org.apache.log4j.jmx.HierarchyDynamicMBean();
Object hdm = Class.forName("org.apache.log4j.jmx.HierarchyDynamicMBean").getConstructor().newInstance();

String mbean = System.getProperty("zookeeper.jmx.log4j.mbean", "log4j:hierarchy=default");
ObjectName mbo = new ObjectName(mbean);
mbs.registerMBean(hdm, mbo);

// Add the root logger to the Hierarchy MBean
// org.apache.log4j.Logger rootLogger =
// org.apache.log4j.Logger.getRootLogger();
Object rootLogger = Class.forName("org.apache.log4j.Logger")
        .getMethod("getRootLogger", (Class<?>[]) null)
        .invoke(null, (Object[]) null);

// hdm.addLoggerMBean(rootLogger.getName());
Object rootLoggerName = rootLogger.getClass()
        .getMethod("getName", (Class<?>[]) null)
        .invoke(rootLogger, (Object[]) null);
hdm.getClass().getMethod("addLoggerMBean", String.class)
        .invoke(hdm, rootLoggerName);

2.2. 启动 MetricsProvider

  • 启动 MetricsProvider,MetricsProvider 是一个可以收集指标、将当前值发送到外部设备的一个组件。数据在 server 端和 client 端可以共享。
public static MetricsProvider startMetricsProvider(String metricsProviderClassName, Properties configuration) throws MetricsProviderLifeCycleException {
    try {
        // 实例化metricsProvider,通过类名反射
        MetricsProvider metricsProvider = (MetricsProvider) Class.forName(metricsProviderClassName,
                true, Thread.currentThread().getContextClassLoader()).newInstance();

        metricsProvider.configure(configuration);

        // 启动过程
        metricsProvider.start();
        return metricsProvider;
    } catch (ClassNotFoundException | IllegalAccessException | InstantiationException error) {
        LOG.error("Cannot boot MetricsProvider {}", metricsProviderClassName, error);
        throw new MetricsProviderLifeCycleException("Cannot boot MetricsProvider " + metricsProviderClassName,
                error);
    } catch (MetricsProviderLifeCycleException error) {
        LOG.error("Cannot boot MetricsProvider {}", metricsProviderClassName, error);
        throw error;
    }
}

2.3. 初始化 QuorumPeer

2.3.1 创建 ServerCnxnFactory 工厂,初始化 QuorumPeer

QuorumPeer 组件介绍: 集群模式下特有的对象,zk 实例的托管者,从集群层面看,代表了 zk 集群中的一台机器,在运行期间,QuorumPeer 会不断的检测当前服务器实例的运行状态,同时根据情况发起 leader 选举。

2.3.2 初始化 quorum auth server 和 learner

会初始化两个对象 SaslQuorumAuthServerSaslQuorumAuthLearner,都实现了 QuorumAuthServer,是 zk 授权机制的接口定义。定义的 authenticate 方法会处理授权的主要流程

public void authenticate(Socket sock, DataInputStream din) throws IOException;

SaslQuorumAuthServer 初始化逻辑如下:
从 zoo.cfg 配置文件中读取 quorumRequireSasl 的值,是否需要权限校验,authzHosts 是配置的服务端的 host 列表,只有在列表中的机器才能纳入 zk 集群。然后初始化一个 SaslQuorumServerCallbackHandler,这是授权回调处理的组件,SASL 授权机制会用该 handler 做一些验证操作。最后创建 serverLogin

zoo.cfg 样例

quorum.auth.enableSasl=true
quorum.auth.learnerRequireSasl=true
quorum.auth.serverRequireSasl=true
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
quorum.auth.kerberos.servicePrincipal=servicename/_HOST
quorum.cnxn.threads.size=20

创建过程:

public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set<String> authzHosts)
            throws SaslException {
    this.quorumRequireSasl = quorumRequireSasl;
    try {
        AppConfigurationEntry entries[] = Configuration.getConfiguration()
                .getAppConfigurationEntry(loginContext);
        if (entries == null || entries.length == 0) {
            throw new LoginException("SASL-authentication failed"
                    + " because the specified JAAS configuration "
                    + "section '" + loginContext + "' could not be found.");
        }
        SaslQuorumServerCallbackHandler saslServerCallbackHandler = new SaslQuorumServerCallbackHandler(
                Configuration.getConfiguration(), loginContext, authzHosts);
        serverLogin = new Login(loginContext, saslServerCallbackHandler, new ZKConfig());
        serverLogin.startThreadIfNeeded();
    } catch (Throwable e) {
        throw new SaslException(
                "Failed to initialize authentication mechanism using SASL",
                e);
    }
}

SaslQuorumAuthLearner 初始化最终也会创建一个 learnerLogin,回调处理组件是 SaslClientCallbackHandlerLogin 处理 zk 客户端和服务端登录时的 Kerberos 凭证刷新。

public SaslQuorumAuthLearner(boolean quorumRequireSasl, String quorumServicePrincipal, String loginContext) throws SaslException {
    this.quorumRequireSasl = quorumRequireSasl;
    this.quorumServicePrincipal = quorumServicePrincipal;
    try {
        AppConfigurationEntry entries[] = Configuration
            .getConfiguration()
            .getAppConfigurationEntry(loginContext);
        if (entries == null || entries.length == 0) {
            throw new LoginException("SASL-authentication failed because"
                                     + " the specified JAAS configuration "
                                     + "section '" + loginContext
                                     + "' could not be found.");
        }
        this.learnerLogin = new Login(loginContext,
                                new SaslClientCallbackHandler(null, "QuorumLearner"), new ZKConfig());
        this.learnerLogin.startThreadIfNeeded();
    } catch (LoginException e) {
        throw new SaslException("Failed to initialize authentication mechanism using SASL", e);
    }
}

2.3.3 Kerberos 认证原理

1

2.3.4 loadDataBase

加载数据。把数据从磁盘加载到内存中。调用的是 loadDataBase,这个方法也能把当前事务信息保存到已提交事务日志记录中,保存到磁盘

snapLog.restore 用于从快照或者事务日志恢复 server 端数据,并返回最大的 zxid

public long loadDataBase() throws IOException {
    long startTime = Time.currentElapsedTime();
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    long loadTime = Time.currentElapsedTime() - startTime;
    ServerMetrics.DB_INIT_TIME.add(loadTime);
    LOG.info("Snapshot loaded in " + loadTime + " ms");
    return zxid;
}

2.3.5 startServerCnxnFactory

server 端要启动的 factory 有两个,cnxnFactory 和 securityCnxnFactory。每个 Factory 的实现均有两种方式,NettyNIO 方式。前面说过 ServerCnxn 是个处理网络 I/O 的组件,需要建立 socket 通道。这两种方式就是建立网络 socket 的不同方式

private void startServerCnxnFactory() {
    if (cnxnFactory != null) {
        cnxnFactory.start();
    }
    if (secureCnxnFactory != null) {
        secureCnxnFactory.start();
    }
}

在介绍 cnxnFactory 通过 NIO 建立 socket 的方式之前,需要了解一下几个组件

WorkerService:一个 worker 线程池,处理工作任务,里面包含若干 ExecutorService,可以设置工作线程数,shutDown 超时时间。

SelectorThread:zk 封装的 thread,继承自 Thread,封装了 NIO 的 SelectorSocketChannelSelectionKey,用于完成网络通信。

类图关系如下:

2

start 主要逻辑

  • workerPool 为空时,初始化工作线程池
  • 启动 select 线程
  • 启动 acceptThread、expirerThread,确保只会启动一次
  • selector 检测多个通道,并知晓通道为读写事件做好准备,管理多个通道
  • 处理已经同意建立的连接,通道向选择器注册事件,建立连接。
  • 对于建立完成的连接,将 cnxn 添加到指定的 set 集合中
  • 线程中断后,通道清理工作,断开连接。
public void start() {
    stopped = false;
    if (workerPool == null) {
        workerPool = new WorkerService(
            "NIOWorker", numWorkerThreads, false);
    }
    for(SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}

run()主要逻辑:

  public void run() {
    try {
        while (!stopped) {
            try {
            	// 监测多个通道
                select();
                // 处理连接过程
                processAcceptedConnections();
                // 
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        // 线程中断后,断开连接,通道清理
        // Close connections still pending on the selector. Any others
        // with in-flight work, let drain out of the work queue.
        for (SelectionKey key : selector.keys()) {
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                cnxn.close();
            }
            cleanupSelectionKey(key);
        }
        SocketChannel accepted;
        while ((accepted = acceptedQueue.poll()) != null) {
            fastCloseSock(accepted);
        }
        updateQueue.clear();
    } finally {
    	// 关闭selector
        closeSelector();

        // 通知工作线程进行中断操作
        // This will wake up the accept thread and the other selector
        // threads, and tell the worker thread pool to begin shutdown.
        NIOServerCnxnFactory.this.stop();
        LOG.info("selector thread exitted run method");
    }
}

cnxnFactory 通过 netty start 的逻辑如下

public void start() {
    if (listenBacklog != -1) {
        bootstrap.option(ChannelOption.SO_BACKLOG, listenBacklog);
    }
    LOG.info("binding to port {}", localAddress);
    parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
    // Port changes after bind() if the original port was 0, update
    // localAddress to get the real port.
    localAddress = (InetSocketAddress) parentChannel.localAddress();
    LOG.info("bound to port " + getLocalPort());
}

2.3.6 jetty server 启动

启动内嵌的 jetty:

public void start() throws AdminServerException {
    try {
        server.start();
    } catch (Exception e) {
        // Server.start() only throws Exception, so let's at least wrap it
        // in an identifiable subclass
        throw new AdminServerException(String.format(
                "Problem starting AdminServer on address %s,"
                        + " port %d and command URL %s", address, port,
                commandUrl), e);
    }
    LOG.info(String.format("Started AdminServer on address %s, port %d"
            + " and command URL %s", address, port, commandUrl));
}

2.3.7 startLeaderElection

接下来进入 leader 选举的过程。

  • 进行投票
  • 发送投票结果
private void starter(QuorumPeer self) {
    this.self = self;
    // 生成投票
    port = self.getVotingView().get(self.getId()).electionAddr.getPort();
    proposedLeader = -1;
    proposedZxid = -1;

    try {
    	// 生成报文套接字
        mySocket = new DatagramSocket(port);
        // mySocket.setSoTimeout(20000);
    } catch (SocketException e1) {
        e1.printStackTrace();
        throw new RuntimeException();
    }

    // 要发送的投票结果
    sendqueue = new LinkedBlockingQueue<ToSend>(2 * self.getVotingView().size());
    // 收到的投票
    recvqueue = new LinkedBlockingQueue<Notification>(2 * self.getVotingView()
            .size());

    // 把投票结果生成消息发送出去
    new Messenger(self.getVotingView().size() * 2, mySocket);
}

3. 流程总结

服务器启动集群模式的大概流程如下所示:

1

  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖 • 9 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...