ZooKeeper 源码分析 (五) - 数据与存储

本贴最后更新于 2227 天前,其中的信息可能已经沧海桑田

这篇文章我们来看下 zk 底层数据存储的技术细节,在 ZooKeeper 中,数据存储分为内存数据存储磁盘数据存储

1. 内存数据

ZooKeeper 的数据模型是一棵树,可以类比为一个内存数据库,存储了整棵树的内容,包括节点路径,节点数据及其 ACL 信息等,ZooKeeper 会定时将这个数据存储到磁盘上。接下来看一下几个关键的数据模型。

1.1. DataTree

ZooKeeper 内存数据存储的核心组件,代表了内存中一份完整的数据 。

DataTree 的数据结构如下:
WX201904010745032x.png

1.2. DataNode

DataNode 是数据存储的最小单元,包含着对父节点的引用。还有其他几个属性

byte [] data : 保存节点的数据 Long acl: acl map长度 StatPersisted stat :节点状态 Set<String> children : 当前节点的子节点列表

还提供了几个操作接口:
添加子节点:往 set 集合中添加子节点信息

public synchronized boolean addChild(String child) { if (children == null) { // let's be conservative on the typical number of children children = new HashSet<String>(8); } return children.add(child); }

删除子节点:从 set 集合中移除子节点信息

public synchronized boolean removeChild(String child) { if (children == null) { return false; } return children.remove(child); }

查询子节点:当前 node 的所有子节点

public synchronized Set<String> getChildren() { if (children == null) { return EMPTY_SET; } return Collections.unmodifiableSet(children); }

1.3. nodes

DataTree 存储所有 ZooKeeper 节点的路径、数据内容及其 ACL 信息,底层的数据结构就是一个典型的 ConcurrentHashMap 结构

private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();

对于 ZooKeeper 数据的所有操作,底层都是对这个 map 结构的操作,nodes 以数据节点的 path 作为 key,value 则是节点的数据内容 DataNode。

对于所有的临时节点,为了方便实时访问跟清理,DataTree 单独将临时节点保存起来:

private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();

1.4. ZKDatabase

ZooKeeper 的内存数据库,负责管理 ZooKeeper 所有会话、datatree 存储和事务日志,zkdatabase 会定时向磁盘 dump 快照数据,同时在 ZooKeeper 服务器启动时,通过磁盘恢复内存数据。

写入事务的触发入口在 SyncRequestProcessor 中,前面讲过,zk 请求的处理是采用责任链的方式来处理,这就是其中一个链条。主要逻辑在 run()中处理,看一段处理过程的代码:

// track the number of records written to the log if (zks.getZKDatabase().append(si)) { logCount++; if (logCount > (snapCount / 2 + randRoll)) { randRoll = r.nextInt(snapCount/2); // roll the log zks.getZKDatabase().rollLog(); // take a snapshot if (snapInProcess != null && snapInProcess.isAlive()) { LOG.warn("Too busy to snap, skipping"); } else { snapInProcess = new ZooKeeperThread("Snapshot Thread") { public void run() { try { zks.takeSnapshot(); } catch(Exception e) { LOG.warn("Unexpected exception", e); } } }; snapInProcess.start(); } logCount = 0; } }

通过 getZKDatabase().append(si) 来写入日志。

2. 事务日志

在了解事务写入过程之前,我们看看日志格式,事务日志是在 append(si) 方法里进行日志写入的。

如果 logStream==null 时,创建文件:logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));,这里使用 Util.makeLogName 来生成文件内容。我们看下具体格式

// LOG_FILE_PREFIX常量为 `log` public static String makeLogName(long zxid) { return FileTxnLog.LOG_FILE_PREFIX + "." + Long.toHexString(zxid); }

后缀是根据事务 ID (zxid)生成一个 16 进制的数据。这样做的目的是为了方便的根据事务 ID 查询相关日志。另外 zxid 生成本身是有规律的(高 32 位代表当前 Leader 周期 epoch,低 32 位是操作序列好),因此将 zxid 作为文件后缀,我们清楚的看出当前运行时 ZooKeeper 的 leader 周期
生成的日志名称是类似于这样的:

log.2c01631713

2.3. 日志写入

日志写入的过程为:

  • 确定是否有事务日志可写。
    ZooKeeper 会首先判断 FileTxnLog 是否关联上一个可写的事务日志文件,如果没有,则使用与该事务操作关联的 ZXID 作为后置创建一个事务日志文件,同时构建文件头信息(包含魔数 magic,事务格式版本 version 和 dbid),并立即写入到事务日志文件中去。同时将该文件流放入一个集合 streamsToFlush

  • 确定事务日志是否需要扩容
    检测当前事务日志文件剩余空间不足 4KB 时,就会开始文件空间扩容

  • 事务序列化
    包括对事务体(Record)和事务头(TxnHeader)的序列化

  • 生成 CheckSum
    此步骤是为了保证日志文件的完整性数据的准确性。会根据序列化前的字节数组大小来计算 Checksum。

  • 事务日志写入文件流
    将序列化后的事务头,事务体及 Checksum 值写入到文件流中去

  • 事务日志刷入磁盘
    上一步骤已经把事务操作写入了文件流,但是由于缓存的原因,无法实时的写入磁盘文件,需要强制刷入磁盘。

没有事务日志文件可写时,创建一个事务日志文件。

if (logStream==null) { if(LOG.isInfoEnabled()){ LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid())); } logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid())); }

生成 File 流

fos = new FileOutputStream(logFileWrite); logStream=new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream);

构建文件头信息

FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);

文件头写入事务日志

logStream.flush(); filePadding.setCurrentSize(fos.getChannel().position()); streamsToFlush.add(fos);

判断是否需要扩容

public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) { // If preAllocSize is positive and we are within 4KB of the known end of the file calculate a new file size if (preAllocSize > 0 && position + 4096 >= fileSize) { // If we have written more than we have previously preallocated we need to make sure the new // file size is larger than what we already have if (position > fileSize) { fileSize = position + preAllocSize; fileSize -= fileSize % preAllocSize; } else { fileSize += preAllocSize; } } return fileSize; }

文件写入

long padFile(FileChannel fileChannel) throws IOException { long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize); if (currentSize != newFileSize) { fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining()); currentSize = newFileSize; } return currentSize; }

序列化事务头和事务体

public static byte[] marshallTxnEntry(TxnHeader hdr, Record txn) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); OutputArchive boa = BinaryOutputArchive.getArchive(baos); hdr.serialize(boa, "hdr"); if (txn != null) { txn.serialize(boa, "txn"); } return baos.toByteArray(); }

生成 checkSum

Checksum crc = makeChecksumAlgorithm(); crc.update(buf, 0, buf.length);

事务日志写入流并且刷入磁盘文件

oa.writeLong(crc.getValue(), "txnEntryCRC"); Util.writeTxnBytes(oa, buf);

2.4. 日志截断

ZooKeeper 运行过程中,可能会出现这样的情况,非 Leader 上记录的事务 ID 大于 Leader 服务器,这是一个非法的运行状态,在 zk 中只要存在 leader,所有机器的数据必须和 leader 保持一致。

如果出现上述情况,Leader 就会发送 TRUNC 命令给这个机器,要求其进行日志截断。
这段逻辑在 LearnerHandler run 方法 syncFollower 中处理:

boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
实现逻辑比较复杂,就不贴代码,有兴趣可以自己看。

3. 数据快照

用来记录 ZooKeeper 服务器上某一个时刻的全量内存数据内容,并将其写入到指定的磁盘文件中。
数据快照也是使用磁盘目录进行存储,可以通过 dataDir 属性进行配置。命名方式跟实物文件命名类似:snapshot.${ZXID},ZXID 被转换为十六进制的数。
下面看一下生成数据快照的过程:

  • 确定是否需要进行数据快照
    每进行一次实物日志记录之后,ZooKeeper 都会检测当前是否需要进行数据快照。理论上进行 snapCount 次事务操作后就会开始数据快照。但是数据快照对机器性能有影响,为了避免集群中所有机器统一时刻都在进行数据快照,ZooKeeper 在具体实现过程中,采用了“过半随机”策略。

  • 切换事务日志文件
    是指当前的事务日志已经写满了,需要重新创建一个新的事务日志。

  • 创建数据快照异步线程
    为了不影响主流程,需要创建一个单独的异步线程来进行数据快照

  • 获取全量数据和会话信息

  • 生成快照文件名

  • 数据序列化

logCount > (snapCount / 2 + randRoll) 当满足这个条件时,才进行数据快照。logCount 表示当前已经记录的事务日志数量。randRoll 为 1~snapCount / 2 之前的随机数。

重新写入一个事务日志文件之前,要清空流中的信息,进行覆盖操作
zks.getZKDatabase().rollLog();

数据快照生成过程,snapInProcess 正在运行时采用丢弃策略。否则生成一个异步线程进行数据快照

if (snapInProcess != null && snapInProcess.isAlive()) { LOG.warn("Too busy to snap, skipping"); } else { snapInProcess = new ZooKeeperThread("Snapshot Thread") { public void run() { try { zks.takeSnapshot(); } catch(Exception e) { LOG.warn("Unexpected exception", e); } } }; snapInProcess.start(); }

保存快照数据的主要逻辑如下。
生成文件名,创建文件、数据写入、序列化等。

public void save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, boolean syncSnap) throws IOException { long lastZxid = dataTree.lastProcessedZxid; File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid)); LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile); try { snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap); } catch (IOException e) { throw e; } }

4. 初始化

初始化的过程在之前一篇文章 ZooKeeper 源码分析(二)—服务端启动之集群模式中简单提到过,这篇文章详细看下。
先借用一张图来展示数据初始化的流程。

WX201904040851572x.png

4.1. 初始化 FileTxnSnapLog

项目启动时执行,在 runFromConfig

txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir); final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, config.listenBacklog, null);

4.2. 初始化 ZKDatabase

初始化 data 最终是要拿到最新的事务 ID,这里会判断如果已经初始化完成,则直接获取最新的事务 ID,否则走加载数据的过程获取最新事务 ID。

public void loadData() { if(zkDb.isInitialized()){ setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { setZxid(zkDb.loadDataBase()); } }

加载数据时,会先初始化一个 DataTree,保存了 zk 上所有的节点信息。还会创建 sessionsWithTimeouts 用于保存所有客户端会话超时时间的记录器。

public long loadDataBase() throws IOException { long startTime = Time.currentElapsedTime(); long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); initialized = true; long loadTime = Time.currentElapsedTime() - startTime; ServerMetrics.DB_INIT_TIME.add(loadTime); LOG.info("Snapshot loaded in " + loadTime + " ms"); return zxid; }

4.3. 创建 PlayBackListener 监听器

long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); 传入了监听器信息,用来接收事务应用过程中的回调,通过这个监听器来进行数据订正。

4.4. 处理快照文件

先反序列化快照文件中数据,判断是不是存在文件。
long deserializeResult = snapLog.deserialize(dt, sessions);

在文件路径下查找是否已经存在文件,如果存在则删除文件重新创建,然后把 trustEmptyDB 设置为 true。

File initFile = new File(dataDir.getParent(), "initialize"); if (Files.deleteIfExists(initFile.toPath())) { LOG.info("Initialize file found, an empty database will not block voting participation"); trustEmptyDB = true; } else { trustEmptyDB = autoCreateDB; }

如果序列化结果 deserializeResult = -1 ,表示没有找到任何快照,需要初始化一个空的 database。
否则走 fastForwardFromEdits 方法进行快照数据的解析,快速恢复内存数据。

if (-1L == deserializeResult) { /* this means that we couldn't find any snapshot, so we need to * initialize an empty database (reported in ZOOKEEPER-2325) */ if (txnLog.getLastLoggedZxid() != -1) { throw new IOException( "No snapshot found, but there are log entries. " + "Something is broken!"); } if (trustEmptyDB) { /* TODO: (br33d) we should either put a ConcurrentHashMap on restore() * or use Map on save() */ save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false); /* return a zxid of 0, since we know the database is empty */ return 0L; } else { /* return a zxid of -1, since we are possibly missing data */ LOG.warn("Unexpected empty data tree, setting zxid to -1"); dt.lastProcessedZxid = -1L; return -1L; } } return fastForwardFromEdits(dt, sessions, listener);

4.5. 解析快照文件

先根据 dataTree 从快照文件中找事务 ID 为 lastProcessedZxid + 1 的数据,如果解析出来的 TxnHeader 为空,则返回 lastProcessedZxid 作为最新的事务 ID,数据恢复过程结束。
如果 TxnHeader 不为空且 hdr.getZxid() < highestZxid,这是种异常情况,会打印异常日志
如果 TxnHeader 不为空且 hdr.getZxid() <= highestZxid,则 highestZxid = hdr.getZxid();

然后调用 processTransaction 方法处理事务日志。同时调用 listener.onTxnLoaded(hdr, itr.getTxn()); 进行已提交事务日志的保存。

public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; try { while (true) { // iterator points to // the first valid txn when initialized hdr = itr.getHeader(); if (hdr == null) { //empty logs 已经是最新的事务ID,数据恢复结果, return dt.lastProcessedZxid; } // 异常情况 if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error("{}(highestZxid) > {}(next log) for type {}", highestZxid, hdr.getZxid(), hdr.getType()); } else { // TxnHeader中的zxid作为最新的事务ID highestZxid = hdr.getZxid(); } try { // 事务日志的处理过程 processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("Failed to process transaction type: " + hdr.getType() + " error: " + e.getMessage(), e); } // 把当前快照数据当做已提交的事务日志保存 listener.onTxnLoaded(hdr, itr.getTxn()); if (!itr.next()) break; } } finally { if (itr != null) { itr.close(); } } return highestZxid; }

4.6. 处理事务日志

处理事务的逻辑在 processTransaction()中,先根据 TxnHeader 的 type 来判断是哪种日志类型

  • 如果是 createSession,则在 sessions 的 Map 中保存子节点的信息,然后进行日志处理
  • 如果是 closeSession,则在 sessions 的 Map 中移除子节点的信息,然后进行日志处理
  • 其他类型时,只进行日志处理

processTxn 处理过程是把快照数据解析成一个个 DataNode,然后保存到 dataTree 中。会有异步的流程持久化到磁盘。

public void processTransaction(TxnHeader hdr,DataTree dt, Map<Long, Integer> sessions, Record txn) throws KeeperException.NoNodeException { ProcessTxnResult rc; switch (hdr.getType()) { case OpCode.createSession: sessions.put(hdr.getClientId(), ((CreateSessionTxn) txn).getTimeOut()); // give dataTree a chance to sync its lastProcessedZxid rc = dt.processTxn(hdr, txn); break; case OpCode.closeSession: sessions.remove(hdr.getClientId()); rc = dt.processTxn(hdr, txn); break; default: rc = dt.processTxn(hdr, txn); } }

5. 数据同步

前几篇文章提到过,整个集群完成 leader 选举之后,Learner 会向 Leader 服务器进行注册,当注册完成后,就进入数据同步环节。数据同步就是 Leader 服务器将那些没有在 Learner 服务器上提交过的事务请求同步给 Learner 服务器。大体过程如下:

WX201904040841032x.png

主要逻辑在 LearnerHandler run() 方法中处理。

5.1. 获取 Leader 状态

在注册最后阶段,Learner 服务器会发送给 Leader 服务器一个 ACKEPOCH 数据包,leader 会从这个数据包中解析出该 Learner 的 currentEpoch 和 lastZxid。

long peerLastZxid; StateSummary ss = null; long zxid = qp.getZxid(); long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch); long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); peerLastZxid = ss.getLastZxid();

5.2. 数据同步初始化

在开始数据同步之前,Leader 服务器会进行数据同步初始化,首先从 ZooKeeper 的内存数据库中提取事务请求的对应的提议缓存队列:Proposals,同时完成对以下三个 ZXID 值的初始化。

  • peerLastZxid:该 Learner 服务器最后处理的 ZXID
  • minCommittedLog:Leader 服务器提议缓存队列 committedLog 的最小 ZXID
  • maxCommittedLog:Leader 服务器提议缓存队列 committedLog 的最大 ZXID

ZooKeeper 集群数据同步通常分为四类,分别是直接差异化同步(DIFF 同步)、先回滚再差异化同步(TRUNC+DIFF 同步)、仅回滚同步(TRUNC 同步)和全量同步(SNAP 同步),在初始化阶段,服务器优先以全量同步方式来同步数据。

5.3. 全量同步(SNAP)

全量同步的场景:

  • peerLastZxid < minCommittedLog 时进行全量同步。
  • Leader 服务器上没有提议缓存队列,peerLastZxid 不等于 lastProcessZxid。

全量同步就是 leader 服务器将本机上全量的内存数据都同步给 learner,Leader 服务器首先向 Learner 发送一个 SNAP 指令,通知 Learner 即将进行全量数据同步。随后,Leader 从内存数据库中获取到全量的数据节点和会话超时时间记录器,序列化后传输给 Learner。Learner 收到全量数据后,反序列化后载入内存数据库。

else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) { // Use txnlog and committedLog to sync // Calculate sizeLimit that we allow to retrieve txnlog from disk long sizeLimit = db.calculateTxnLogSizeLimit(); // This method can return empty iterator if the requested zxid // is older than on-disk txnlog Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog( peerLastZxid, sizeLimit); if (txnLogItr.hasNext()) { LOG.info("Use txnlog and committedLog for peer sid: " + getSid()); currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog); LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid)); Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator(); currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog); needSnap = false; } // closing the resources if (txnLogItr instanceof TxnLogProposalIterator) { TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr; txnProposalItr.close(); } }

5.4. 仅同步回滚(TRUNC)

如果(peerLastZxid > maxCommittedLog 时仅同步回滚。
是先回滚再差异化同步的简化模式,Leader 会要求 Learner 回滚到 ZXID 值为 maxCommittedLog 对应的事务操作。

else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) { // Newer than committedLog, send trunc and done LOG.debug("Sending TRUNC to follower zxidToSend=0x" + Long.toHexString(maxCommittedLog) + " for peer sid:" + getSid()); queueOpPacket(Leader.TRUNC, maxCommittedLog); currentZxid = maxCommittedLog; needOpPacket = false; needSnap = false; }

5.5. 先回滚再差异化同步(TRUNC+DIFF)

使用场景是:Leader 服务器已经将事务记录到了本地事务日志中,但是没有成功发起 Proposal 流程的时候挂掉。这时候 peerLastZxid 介于 minCommittedLog 和 maxCommittedLog 之间。这个特殊场景就使用先回滚再差异化同步。

5.6. 直接差异化同步(DIFF)

peerLastZxid 在 maxCommittedLog 和 minCommittedLog 之间时进行差异化同步。
Leader 会首先向这个 Learner 发送一个 DIFF 指令,通知 Learner 进入差异化数据同步阶段,Leader 服务器即将把一些 Proposal 同步给自己。后续... 省略

else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { // Follower is within commitLog range LOG.info("Using committedLog for peer sid: " + getSid()); Iterator<Proposal> itr = db.getCommittedLog().iterator(); currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog); needSnap = false; }

终于完了,一些细节还需要仔细推敲。。。

  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...