这篇文章我们来看下 zk 底层数据存储的技术细节,在 ZooKeeper 中,数据存储分为内存数据存储和磁盘数据存储
1. 内存数据
ZooKeeper 的数据模型是一棵树,可以类比为一个内存数据库,存储了整棵树的内容,包括节点路径,节点数据及其 ACL 信息等,ZooKeeper 会定时将这个数据存储到磁盘上。接下来看一下几个关键的数据模型。
1.1. DataTree
ZooKeeper 内存数据存储的核心组件,代表了内存中一份完整的数据 。
DataTree 的数据结构如下:
1.2. DataNode
DataNode 是数据存储的最小单元,包含着对父节点的引用。还有其他几个属性
byte [] data : 保存节点的数据
Long acl: acl map长度
StatPersisted stat :节点状态
Set<String> children : 当前节点的子节点列表
还提供了几个操作接口:
添加子节点:往 set 集合中添加子节点信息
public synchronized boolean addChild(String child) {
if (children == null) {
// let's be conservative on the typical number of children
children = new HashSet<String>(8);
}
return children.add(child);
}
删除子节点:从 set 集合中移除子节点信息
public synchronized boolean removeChild(String child) {
if (children == null) {
return false;
}
return children.remove(child);
}
查询子节点:当前 node 的所有子节点
public synchronized Set<String> getChildren() {
if (children == null) {
return EMPTY_SET;
}
return Collections.unmodifiableSet(children);
}
1.3. nodes
DataTree 存储所有 ZooKeeper 节点的路径、数据内容及其 ACL 信息,底层的数据结构就是一个典型的 ConcurrentHashMap
结构
private final ConcurrentHashMap<String, DataNode> nodes =
new ConcurrentHashMap<String, DataNode>();
对于 ZooKeeper 数据的所有操作,底层都是对这个 map 结构的操作,nodes 以数据节点的 path 作为 key,value 则是节点的数据内容 DataNode。
对于所有的临时节点,为了方便实时访问跟清理,DataTree 单独将临时节点保存起来:
private final Map<Long, HashSet<String>> ephemerals =
new ConcurrentHashMap<Long, HashSet<String>>();
1.4. ZKDatabase
ZooKeeper 的内存数据库,负责管理 ZooKeeper 所有会话、datatree 存储和事务日志,zkdatabase 会定时向磁盘 dump 快照数据,同时在 ZooKeeper 服务器启动时,通过磁盘恢复内存数据。
写入事务的触发入口在 SyncRequestProcessor
中,前面讲过,zk 请求的处理是采用责任链的方式来处理,这就是其中一个链条。主要逻辑在 run()中处理,看一段处理过程的代码:
// track the number of records written to the log
if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
}
通过 getZKDatabase().append(si)
来写入日志。
2. 事务日志
在了解事务写入过程之前,我们看看日志格式,事务日志是在 append(si)
方法里进行日志写入的。
如果 logStream==null
时,创建文件:logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
,这里使用 Util.makeLogName
来生成文件内容。我们看下具体格式
2.3. 日志写入
日志写入的过程为:
-
确定是否有事务日志可写。
ZooKeeper 会首先判断 FileTxnLog 是否关联上一个可写的事务日志文件,如果没有,则使用与该事务操作关联的 ZXID 作为后置创建一个事务日志文件,同时构建文件头信息(包含魔数 magic,事务格式版本 version 和 dbid),并立即写入到事务日志文件中去。同时将该文件流放入一个集合 streamsToFlush
。
-
确定事务日志是否需要扩容
检测当前事务日志文件剩余空间不足 4KB 时,就会开始文件空间扩容
-
事务序列化
包括对事务体(Record)和事务头(TxnHeader)的序列化
-
生成 CheckSum
此步骤是为了保证日志文件的完整性数据的准确性。会根据序列化前的字节数组大小来计算 Checksum。
-
事务日志写入文件流
将序列化后的事务头,事务体及 Checksum 值写入到文件流中去
-
事务日志刷入磁盘
上一步骤已经把事务操作写入了文件流,但是由于缓存的原因,无法实时的写入磁盘文件,需要强制刷入磁盘。
没有事务日志文件可写时,创建一个事务日志文件。
if (logStream==null) {
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
}
生成 File 流
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
构建文件头信息
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
文件头写入事务日志
logStream.flush();
filePadding.setCurrentSize(fos.getChannel().position());
streamsToFlush.add(fos);
判断是否需要扩容
public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
// If preAllocSize is positive and we are within 4KB of the known end of the file calculate a new file size
if (preAllocSize > 0 && position + 4096 >= fileSize) {
// If we have written more than we have previously preallocated we need to make sure the new
// file size is larger than what we already have
if (position > fileSize) {
fileSize = position + preAllocSize;
fileSize -= fileSize % preAllocSize;
} else {
fileSize += preAllocSize;
}
}
return fileSize;
}
文件写入
long padFile(FileChannel fileChannel) throws IOException {
long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
if (currentSize != newFileSize) {
fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
currentSize = newFileSize;
}
return currentSize;
}
序列化事务头和事务体
public static byte[] marshallTxnEntry(TxnHeader hdr, Record txn)
throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputArchive boa = BinaryOutputArchive.getArchive(baos);
hdr.serialize(boa, "hdr");
if (txn != null) {
txn.serialize(boa, "txn");
}
return baos.toByteArray();
}
生成 checkSum
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
事务日志写入流并且刷入磁盘文件
oa.writeLong(crc.getValue(), "txnEntryCRC");
Util.writeTxnBytes(oa, buf);
2.4. 日志截断
ZooKeeper 运行过程中,可能会出现这样的情况,非 Leader 上记录的事务 ID 大于 Leader 服务器,这是一个非法的运行状态,在 zk 中只要存在 leader,所有机器的数据必须和 leader 保持一致。
如果出现上述情况,Leader 就会发送 TRUNC 命令给这个机器,要求其进行日志截断。
这段逻辑在 LearnerHandler
run 方法 syncFollower
中处理:
boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
中
实现逻辑比较复杂,就不贴代码,有兴趣可以自己看。
3. 数据快照
用来记录 ZooKeeper 服务器上某一个时刻的全量内存数据内容,并将其写入到指定的磁盘文件中。
数据快照也是使用磁盘目录进行存储,可以通过 dataDir 属性进行配置。命名方式跟实物文件命名类似:snapshot.${ZXID},ZXID 被转换为十六进制的数。
下面看一下生成数据快照的过程:
-
确定是否需要进行数据快照
每进行一次实物日志记录之后,ZooKeeper 都会检测当前是否需要进行数据快照。理论上进行 snapCount 次事务操作后就会开始数据快照。但是数据快照对机器性能有影响,为了避免集群中所有机器统一时刻都在进行数据快照,ZooKeeper 在具体实现过程中,采用了“过半随机”策略。
-
切换事务日志文件
是指当前的事务日志已经写满了,需要重新创建一个新的事务日志。
-
创建数据快照异步线程
为了不影响主流程,需要创建一个单独的异步线程来进行数据快照
-
获取全量数据和会话信息
-
生成快照文件名
-
数据序列化
logCount > (snapCount / 2 + randRoll)
当满足这个条件时,才进行数据快照。logCount 表示当前已经记录的事务日志数量。randRoll 为 1~snapCount / 2 之前的随机数。
重新写入一个事务日志文件之前,要清空流中的信息,进行覆盖操作
zks.getZKDatabase().rollLog();
数据快照生成过程,snapInProcess 正在运行时采用丢弃策略。否则生成一个异步线程进行数据快照
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
保存快照数据的主要逻辑如下。
生成文件名,创建文件、数据写入、序列化等。
public void save(DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
boolean syncSnap)
throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
snapshotFile);
try {
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
} catch (IOException e) {
throw e;
}
}
4. 初始化
初始化的过程在之前一篇文章 ZooKeeper 源码分析(二)—服务端启动之集群模式中简单提到过,这篇文章详细看下。
先借用一张图来展示数据初始化的流程。
4.1. 初始化 FileTxnSnapLog
项目启动时执行,在 runFromConfig
中
txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
config.tickTime, config.minSessionTimeout, config.maxSessionTimeout,
config.listenBacklog, null);
4.2. 初始化 ZKDatabase
初始化 data 最终是要拿到最新的事务 ID,这里会判断如果已经初始化完成,则直接获取最新的事务 ID,否则走加载数据的过程获取最新事务 ID。
public void loadData() {
if(zkDb.isInitialized()){
setZxid(zkDb.getDataTreeLastProcessedZxid());
}
else {
setZxid(zkDb.loadDataBase());
}
}
加载数据时,会先初始化一个 DataTree,保存了 zk 上所有的节点信息。还会创建 sessionsWithTimeouts
用于保存所有客户端会话超时时间的记录器。
public long loadDataBase() throws IOException {
long startTime = Time.currentElapsedTime();
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
long loadTime = Time.currentElapsedTime() - startTime;
ServerMetrics.DB_INIT_TIME.add(loadTime);
LOG.info("Snapshot loaded in " + loadTime + " ms");
return zxid;
}
4.3. 创建 PlayBackListener 监听器
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
传入了监听器信息,用来接收事务应用过程中的回调,通过这个监听器来进行数据订正。
4.4. 处理快照文件
先反序列化快照文件中数据,判断是不是存在文件。
long deserializeResult = snapLog.deserialize(dt, sessions);
在文件路径下查找是否已经存在文件,如果存在则删除文件重新创建,然后把 trustEmptyDB
设置为 true。
File initFile = new File(dataDir.getParent(), "initialize");
if (Files.deleteIfExists(initFile.toPath())) {
LOG.info("Initialize file found, an empty database will not block voting participation");
trustEmptyDB = true;
} else {
trustEmptyDB = autoCreateDB;
}
如果序列化结果 deserializeResult = -1 ,表示没有找到任何快照,需要初始化一个空的 database。
否则走 fastForwardFromEdits
方法进行快照数据的解析,快速恢复内存数据。
if (-1L == deserializeResult) {
/* this means that we couldn't find any snapshot, so we need to
* initialize an empty database (reported in ZOOKEEPER-2325) */
if (txnLog.getLastLoggedZxid() != -1) {
throw new IOException(
"No snapshot found, but there are log entries. " +
"Something is broken!");
}
if (trustEmptyDB) {
/* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
* or use Map on save() */
save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);
/* return a zxid of 0, since we know the database is empty */
return 0L;
} else {
/* return a zxid of -1, since we are possibly missing data */
LOG.warn("Unexpected empty data tree, setting zxid to -1");
dt.lastProcessedZxid = -1L;
return -1L;
}
}
return fastForwardFromEdits(dt, sessions, listener);
4.5. 解析快照文件
先根据 dataTree 从快照文件中找事务 ID 为 lastProcessedZxid + 1
的数据,如果解析出来的 TxnHeader
为空,则返回 lastProcessedZxid
作为最新的事务 ID,数据恢复过程结束。
如果 TxnHeader
不为空且 hdr.getZxid() < highestZxid
,这是种异常情况,会打印异常日志
如果 TxnHeader
不为空且 hdr.getZxid() <= highestZxid
,则 highestZxid = hdr.getZxid();
然后调用 processTransaction
方法处理事务日志。同时调用 listener.onTxnLoaded(hdr, itr.getTxn());
进行已提交事务日志的保存。
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
try {
while (true) {
// iterator points to
// the first valid txn when initialized
hdr = itr.getHeader();
if (hdr == null) {
//empty logs 已经是最新的事务ID,数据恢复结果,
return dt.lastProcessedZxid;
}
// 异常情况
if (hdr.getZxid() < highestZxid && highestZxid != 0) {
LOG.error("{}(highestZxid) > {}(next log) for type {}",
highestZxid, hdr.getZxid(), hdr.getType());
} else {
// TxnHeader中的zxid作为最新的事务ID
highestZxid = hdr.getZxid();
}
try {
// 事务日志的处理过程
processTransaction(hdr,dt,sessions, itr.getTxn());
} catch(KeeperException.NoNodeException e) {
throw new IOException("Failed to process transaction type: " +
hdr.getType() + " error: " + e.getMessage(), e);
}
// 把当前快照数据当做已提交的事务日志保存
listener.onTxnLoaded(hdr, itr.getTxn());
if (!itr.next())
break;
}
} finally {
if (itr != null) {
itr.close();
}
}
return highestZxid;
}
4.6. 处理事务日志
处理事务的逻辑在 processTransaction()中,先根据 TxnHeader 的 type 来判断是哪种日志类型
- 如果是 createSession,则在 sessions 的 Map 中保存子节点的信息,然后进行日志处理
- 如果是 closeSession,则在 sessions 的 Map 中移除子节点的信息,然后进行日志处理
- 其他类型时,只进行日志处理
processTxn 处理过程是把快照数据解析成一个个 DataNode,然后保存到 dataTree 中。会有异步的流程持久化到磁盘。
public void processTransaction(TxnHeader hdr,DataTree dt,
Map<Long, Integer> sessions, Record txn)
throws KeeperException.NoNodeException {
ProcessTxnResult rc;
switch (hdr.getType()) {
case OpCode.createSession:
sessions.put(hdr.getClientId(),
((CreateSessionTxn) txn).getTimeOut());
// give dataTree a chance to sync its lastProcessedZxid
rc = dt.processTxn(hdr, txn);
break;
case OpCode.closeSession:
sessions.remove(hdr.getClientId());
rc = dt.processTxn(hdr, txn);
break;
default:
rc = dt.processTxn(hdr, txn);
}
}
5. 数据同步
前几篇文章提到过,整个集群完成 leader 选举之后,Learner 会向 Leader 服务器进行注册,当注册完成后,就进入数据同步环节。数据同步就是 Leader 服务器将那些没有在 Learner 服务器上提交过的事务请求同步给 Learner 服务器。大体过程如下:
主要逻辑在 LearnerHandler run()
方法中处理。
5.1. 获取 Leader 状态
在注册最后阶段,Learner 服务器会发送给 Leader 服务器一个 ACKEPOCH 数据包,leader 会从这个数据包中解析出该 Learner 的 currentEpoch 和 lastZxid。
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
peerLastZxid = ss.getLastZxid();
5.2. 数据同步初始化
在开始数据同步之前,Leader 服务器会进行数据同步初始化,首先从 ZooKeeper 的内存数据库中提取事务请求的对应的提议缓存队列:Proposals,同时完成对以下三个 ZXID 值的初始化。
- peerLastZxid:该 Learner 服务器最后处理的 ZXID
- minCommittedLog:Leader 服务器提议缓存队列 committedLog 的最小 ZXID
- maxCommittedLog:Leader 服务器提议缓存队列 committedLog 的最大 ZXID
ZooKeeper 集群数据同步通常分为四类,分别是直接差异化同步(DIFF 同步)、先回滚再差异化同步(TRUNC+DIFF 同步)、仅回滚同步(TRUNC 同步)和全量同步(SNAP 同步),在初始化阶段,服务器优先以全量同步方式来同步数据。
5.3. 全量同步(SNAP)
全量同步的场景:
- peerLastZxid < minCommittedLog 时进行全量同步。
- Leader 服务器上没有提议缓存队列,peerLastZxid 不等于 lastProcessZxid。
全量同步就是 leader 服务器将本机上全量的内存数据都同步给 learner,Leader 服务器首先向 Learner 发送一个 SNAP 指令,通知 Learner 即将进行全量数据同步。随后,Leader 从内存数据库中获取到全量的数据节点和会话超时时间记录器,序列化后传输给 Learner。Learner 收到全量数据后,反序列化后载入内存数据库。
else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
// Use txnlog and committedLog to sync
// Calculate sizeLimit that we allow to retrieve txnlog from disk
long sizeLimit = db.calculateTxnLogSizeLimit();
// This method can return empty iterator if the requested zxid
// is older than on-disk txnlog
Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
peerLastZxid, sizeLimit);
if (txnLogItr.hasNext()) {
LOG.info("Use txnlog and committedLog for peer sid: " + getSid());
currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
minCommittedLog, maxCommittedLog);
LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
null, maxCommittedLog);
needSnap = false;
}
// closing the resources
if (txnLogItr instanceof TxnLogProposalIterator) {
TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
txnProposalItr.close();
}
}
5.4. 仅同步回滚(TRUNC)
如果(peerLastZxid > maxCommittedLog 时仅同步回滚。
是先回滚再差异化同步的简化模式,Leader 会要求 Learner 回滚到 ZXID 值为 maxCommittedLog 对应的事务操作。
else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
// Newer than committedLog, send trunc and done
LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
Long.toHexString(maxCommittedLog) +
" for peer sid:" + getSid());
queueOpPacket(Leader.TRUNC, maxCommittedLog);
currentZxid = maxCommittedLog;
needOpPacket = false;
needSnap = false;
}
5.5. 先回滚再差异化同步(TRUNC+DIFF)
使用场景是:Leader 服务器已经将事务记录到了本地事务日志中,但是没有成功发起 Proposal 流程的时候挂掉。这时候 peerLastZxid 介于 minCommittedLog 和 maxCommittedLog 之间。这个特殊场景就使用先回滚再差异化同步。
5.6. 直接差异化同步(DIFF)
peerLastZxid 在 maxCommittedLog 和 minCommittedLog 之间时进行差异化同步。
Leader 会首先向这个 Learner 发送一个 DIFF 指令,通知 Learner 进入差异化数据同步阶段,Leader 服务器即将把一些 Proposal 同步给自己。后续... 省略
else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
// Follower is within commitLog range
LOG.info("Using committedLog for peer sid: " + getSid());
Iterator<Proposal> itr = db.getCommittedLog().iterator();
currentZxid = queueCommittedProposals(itr, peerLastZxid,
null, maxCommittedLog);
needSnap = false;
}
终于完了,一些细节还需要仔细推敲。。。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于