弗兰兹·卡夫卡,生活于奥匈帝国统治下的捷克德语小说家,本职为保险业职员。主要作品有小说《审判》、《城堡》、《变形记》等。
another
Kafka 是由 Apache 软件基金会开发的一个开源
流处理
平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量
的分布式``发布订阅``消息系统
,它可以处理消费者规模的网站中的所有动作流数据。
两个 kafka 有一个共同特别: 很会写
消息系统
低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步 RPC 的主要手段之一。
什么是消息系统
producer 发送消息给 broker,broker 持有数据,在合适的时机发送给 consumer,consumer 确认后,broker 删除消息数据。
优化点无外乎吞吐量、性能、可靠性、事务。
扩展概念
- 消息队列模型 sender -> queue -> receiver p2p
- 发布/订阅模型 publish -> topic -> subscribe
- push
- pull
什么时候使用消息系统
适合场景
- 业务解耦,领域更清晰。区分业务核心系统
- 最终一致性(反之,强一致性,需要接收方回调确认,同步 RPC 更合适)
- 广播,1 VS N,稳定上游服务
- 错峰流控,拉平峰值,避免木桶
- 日志同步
不适合场景
- 强事务保证
- 延迟敏感,实时响应
kafka 好在哪
吞吐量/延时
- 吞吐量: 每秒能够处理的消息 or 字节数。
- 延时: 客户端发送请求、服务端处理请求并发送相应给客户端。
延时越低,吞吐越高?
通常情况下,我们认为延时越低,单位时间可以处理的请求变多,所以吞吐量增加。但是两者并不是正相关关系。
e.g. kafka 处理一条消息需要花费 2ms,吞吐量为 1000/2=500。如果通过 batch,批量发送,每 8ms 发送一次 600 条,延时=2ms+8ms=10ms,600*(1000/10)=60000。
消息持久化
保存在硬盘,不会丢失,可以重放。and 性能很高!!! 后面聊原因。
负载均衡和故障转移
多副本、多分区,保障高可用。
伸缩性
自身无状态,方便扩展。
名词解析
- message -> 消息
- broker -> kafka 服务器
- topic -> 主题,逻辑概念,一类消息,一个消息内容体
- partition -> 分区,消息实际存储的物理位置。有序队列,维护 offset。
- 同一个 topic 可以在不同 broker 上维护不同的分区(负载均衡)
- 同一个 topic 可以在不同 broker 上维护同一个分区(冗余机制,故障转移)
- replica -> 副本(partition)。分为 leader replica 和 follower replica。和 Master-Slave 不同,follower 只从 leader 同步数据,不提供读写。只有在 leader 挂了之后,才会选举 follower 作为 leader 提供服务。kafka 保障同一个 partition 的 replica 在不同的 broker,否则无法提供故障转移。同一个 topic 可以有不同的 leader,同一个 topic+partition 只有一个 leader。
- ISR(is-sync replica) -> 同步副本集合。如果 follower 延迟过大,会被踢出集合,追赶上数据之后,重新向 leader 申请,加入 ISR 集合。并不是所有的 follower 都可以成为 leader,ISR 集合中的 follower 可以竞选 leader。通过 replica.lag.time.max.ms(默认 10s)设置 follower 同步时间,通过 RetchRequest(offset)同步 leader 信息。
- offset -> 位移、偏移量
- 上次提交的位移:group 确认的 offset
- 当前位置:读取后,未提交
- HW:ISR 确认已同步后,leader 增加 HW。
- LEO: leader 接收到的最新一条 producer 发送的数据
- consumer 只能消费到 HW,未同步给所有 ISR 成员的消息无法消费
- leader 保存 LEO、HW 和 remote LEO, min(LEO, remote LEO) 更新 HW
- follower 轮询 leader,purgatory 暂存请求,500ms
- 新版本 epoch 保存 leader 变更版本,维护 kv (epoch, offset)
-
producer -> 生产者
-
consumer -> 消费者
-
group -> 组。通过维护各 group 的 offset,每条消息只会被发送到同一个 group 下的一个 consumer,实现不同模型。
- 一个 group 有一或多个 consumer
- 一个消息可以发送给多个 group
-
controller -> 控制器。选举 broker 作为 controller,管理和协调 kafka 集群,和 zookeeper 通信。
-
coordinator -> 协调者。用于实现成员管理、消费分配方案制定(rebalance)以及提交位移等,每个 group 选举 consumer 作为协调者。
kafka 高性能的秘密
顺序写?
网上的教程经常看到介绍,写入耗时主要集中在磁头寻道和盘片旋转,而数据传输速度很快。kafka 采用了顺序写,所以效率高。不免有些疑问:
- 顺序写性能高,为什么还有随机写?
- 磁盘不会被占用,每次写入都需要寻道、旋转,那么顺序写的优势在哪?
原因
- 因为写入的是不同的文件,占用连续的 page。顺序写,不能修改。
- 增加前提:一次写入一个文件且文件足够大。
所以本质原因在于追加写,"每个 partition 是一个文件"。
读取时,识别顺序写,会进行预读。
PageCache
- Kafka 不会每次都写磁盘,而是写入分页存储 PageCache 就认为 producer 成功。
- 操作系统决定什么时候将 PageCache 写入磁盘(flush)。增加 flush 时间间隔,可以提升吞吐
- flush 时为顺序写入,不会有额外的性能损耗。
- 读取时,优先读取 PageCache。
PageCache 为缓存,数据会不会丢失?
因为是操作系统管理,所以 kafka 进程挂了,数据不会丢失。如果操作系统掉电。。。依靠副本
Zero Copy
java FileChannel.transferTO
linux sendfile
partition
- leader 针对 partition 而不是 broker
- partition 不是一个文件而是一个文件夹
- partition 是我们能操作的最小概念
如果一直追加会导致文件过大,不便于使用(读写)和维护(删除旧数据),kafka 为此采用了几种措施
- 区分 segment
- 增加索引,包括 index 和 timeindex
segment
segment=log+index+timeindex
- 命名规则为 segment 文件最后一条消息的 offset 值。
- log.segment.bytes 日志切割(默认 1G)
index 和 timeindex
- index,位移索引,间隔创建索引指向物理偏移地址。间隔通过 log.index.interval.bytes 设置,默认 4MB。
- timeindex,时间索引,为满足时序型统计需求。
def append(largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit = {
if (records.sizeInBytes > 0) {
trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0)
rollingBasedTimestamp = Some(largestTimestamp)
ensureOffsetInRange(largestOffset)
// append the messages
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
// Update the in memory max timestamp and corresponding offset.
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
}
// append an entry to the index (if needed)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex.append(largestOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
bytesSinceLastIndexEntry = 0
}
bytesSinceLastIndexEntry += records.sizeInBytes
}
}
索引文件预分配空间,切分时裁剪。
p.s. producer 发送消息时,可以指定时间戳。如果机器时区不同,或者 retry、网络延时等导致时间混乱,按照时间索引进行查询时,导致查询不到消息。?? 时间会在发送时获取本机时间
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
producer
producer 配置
通过配置文件了解细节
- bootstrap.servers 指定其中一个,会自动找到 leader,但是如果指定的机器挂了,无法切换
- acks 0, 1, all|-1。 0 表示无需确认,1 表示 leader 确认,-1 表示所有 ISR 确认。
- buffer.memory 缓存消息的缓冲区大小 32MB,过小会影响吞吐。写入速度超过发送速度,停止&等待 IO 发送,still 追不上会报错。
- compression.type 开启压缩,提升 IO 吞吐,增加 CPU 压力。需要看服务器是 IO 密集型 or 计算密集型。 属性 0: 无压缩,1: GZIP,2: Snappy,3: LZ4
- retries 重试,屏蔽网络抖动 or leader 选举 or NotController,导致消息重复发送。详细参见
RetriableException
- retry.backoff.ms 重试间隔
- batch.size 批量发送大小,默认 16KB,增加可提升吞吐
- linger.ms 发送时间,默认为 0,立即发送,不判断 batch.size 大小。
- max.request.size 消息大小,因为存在 header 等,实际大小大于消息本身
- request.timeout.ms 超时时间,默认 30s。broker 给 producer 反馈
- partitioner.class
- key.serializer & value.serializer
- interceptor.classes 自定义拦截器
自定义 serializer
public class FastJsonSerializer implements Serializer {
@Override
public void configure(Map configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Object data) {
return JSONObject.toJSONBytes(data);
}
@Override
public void close() {
}
}
自定义 partitioner
public class AbsPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (Long.parseLong(String.valueOf(key)) > 0) {
return 0;
} else {
return 1;
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
自定义分区策略 机器人发送到同一个 partition,为了快速响应真实用户。如果只是为了均匀分布,不需要指定 key(和旧版本不同)。
如果未指定 key,会通过轮询,避免 skewed
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
producer 拦截器
public class ProducerLogInterceptor implements ProducerInterceptor<String, Object> {
@Override
public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
System.out.println("send topic: " + record.topic());
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("record metadata");
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
100% 送达配置 | 无消息丢失配置
通过配置替换 send().get()
- max.block.ms=999999
- acks=-1
- retries=999 不会重试非 RetriableException 异常
- max.in.flight.requests.per.connection=1 发送未响应请求的数量
- KafkaProducer.send(record,callback)
- clonse(0)
消息内容
CRC | 版本号 | 属性 | 时间戳 | key 长度 | key 内容 | value 长度 | value 内容 |
---|---|---|---|---|---|---|---|
4B | 1B | 1B | 8B | 4B | n | 4B | n |
Consumer
group 保存位移 offset,替换 zookeeper 保存(/consumers/groupid/offsets/topic/partition
节点)。checkpointing 定期从 consumer 到 broker 对 offset 进行持久化。(log.flush.offset.checkpoint.interval.ms 默认 60s)
offset 格式=map(groupId+topic+partition, offset)
为什么不用 zookeeper 保存?
- zookeeper 不擅长频繁写(强一致性)
为什么不用 broker 保存?
- 增加应答机制,确认消费成功,影响吞吐
- 保存多个 consumer 的 offset,数据结构复杂
/**
* Start the background threads to flush logs and do log cleanup
*/
def startup() {
/* Schedule the cleanup task to delete old logs */
if (scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs _,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs _,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointLogRecoveryOffsets _,
delay = InitialTaskDelayMs,
period = flushRecoveryOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-log-start-offset-checkpoint",
checkpointLogStartOffsets _,
delay = InitialTaskDelayMs,
period = flushStartOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
deleteLogs _,
delay = InitialTaskDelayMs,
unit = TimeUnit.MILLISECONDS)
}
if (cleanerConfig.enableCleaner)
cleaner.startup()
}
Consumer 配置
session.timeout.ms
协调者(coordinator)检测失败的时间,踢出 consumer rebalanceheartbeat.interval.ms
如果需要 rebalance,会在心跳线程的 response 中 set rebalance_in_progress,心跳线程间隔。必须小于 session.timeout.msmax.poll.interval.ms
consumer 处理逻辑最大时间 & consumer 启动选举 coordinator 时间auto.offset.reset
earliest|lastest 更换 group 后,重新消费。 默认 lastestenable.auto.commit
false 手动提交位移auto.commit.interval.ms
自动提交位移时间间隔fetch.max.bytes
如果消息很大,需要手动设置 50 * 1024 * 1024max.poll.records
单次调用返回的消息数 500connections.max.idle.ms
默认 9 分钟,推荐-1。不关闭空闲连接,周期性请求处理时间增加。partition.assignment.strategy
partition 分配策略, 默认 RangeAssignor
partition 分配策略
每个 partition 分配给一个 consumer。
e.g. 如果一个 group 订阅一个 topic,一个 topic 有 100 个 partition,一个 group 有 5 个 consumer。则每个 consumer 消费 20 个 partition
partition 分配策略,继承 AbstractPartitionAssignor 自定义策略规则,加权重等。自带分配规则:
- range 分区顺序排列、分组、分配给 consumer
- round-robin 分区顺序排列, 轮询 consumer,读取分区
- sticky 基于历史分配方案,避免数据倾斜
public class RangeAssignor extends AbstractPartitionAssignor {
@Override
public String name() {
return "range";
}
private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
Map<String, List<String>> res = new HashMap<>();
for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
String consumerId = subscriptionEntry.getKey();
for (String topic : subscriptionEntry.getValue().topics())
put(res, topic, consumerId);
}
return res;
}
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<TopicPartition>());
for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
List<String> consumersForTopic = topicEntry.getValue();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
Collections.sort(consumersForTopic);
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
}
误解
使用过程中对 kafka consumer 的一些误解
误解 1
poll(long timeout)
和max.poll.records
按照时间或者消息记录数,控制每次获取消息。
poll 表示轮询,使用 poll 而不是 pull,并不需要 wakeup。所以可以使用 poll(Long.MAX_VALUE)
,每次数据流准备好后,会返回并进行业务处理。
误解 2
"consumer 只能订阅一个 topic。"
consumer.subscribe(Pattern.compile("kafka.*"))
误解 3
"commitSync 同步提交,阻塞消费。commitAsync 异步提交,不阻塞消费。"
commitSync 和 commitAsync 都会阻塞 poll,因为在 poll 执行时轮询时会判断 commit 状态。commitAsync 不阻塞业务处理后续方法执行。
void invokeCompletedOffsetCommitCallbacks() {
while (true) {
OffsetCommitCompletion completion = completedOffsetCommits.poll();
if (completion == null)
break;
completion.invoke();
}
}
EOS(Exactly-once Semantics)
- at most once 最多一次,消息可能丢失,但不会被重复处理。获取消息后,先 commit,然后业务处理。
- at least once 最少一次 消息不会丢失,但可能被处理多次。获取消息后,先业务处理,然后 commit。
- exactly once 会被处理且只会被处理一次
消费指定 partition
自定义分配策略?不需要,可以通过 assign 指定 topic partition
consumer.assign(Collections.singletonList(new TopicPartition(TOPIC, partition)));
- assign + subscribe 冲突错误
java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
- assign + assign 后一个生效
- 2 个 consumer assign 同一个 partition 消费两次
- 一个 consumer assign 一个 consumer subscribe, rebalance 踢出 assign
控制提交分区 offset
Map<TopicPartition, OffsetAndMetadata> offsets 控制提交分区 offset,细粒度
consumer.commitSync(Collections.singletonMap(tp, offset));
rebalance
状态机
触发条件
- consumer 加入、退出、崩溃
- topic 发生变更,如正则匹配,增加 topic
- 分区发生变动
- 消费处理超时
rebalance 开销大,合理设置 request.timeout.ms、max.poll.records 和 max.poll.interval.ms 减少 rebalance 次数
rebalance generation 标识 rebalance,每次 +1, 延迟提交 offset 会被 group 拒绝 ILLEGAL_GENERATION
协议
- joinCroup 请求
- SyncGroup 请求,group leader 同步分配方案
- Heartbeat 请求 向 coordinator 汇报心跳
- LeaveGroup 请求
- DescribeGroup 查看组信息
确认 所在 broker
Math.abs(groupId.hashCode) % offsets.topic.num.partitions 确认分区,此分区所在的 leader broker
- 收集 join consumer,选取 leader,同步给 coordinator。 leader 负责分配
- 同步更新分配方案,发送 SyncGroup 请求给 coordinator,每个 consumer 都发送,coordinator 接受 leader 的方案,分配,返回 response
def handleJoinGroupRequest(request: RequestChannel.Request) {
val joinGroupRequest = request.body[JoinGroupRequest]
// the callback for sending a join-group response
def sendResponseCallback(joinResult: JoinGroupResult) {
val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val responseBody = new JoinGroupResponse(requestThrottleMs, joinResult.error, joinResult.generationId,
joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
trace("Sending join group response %s for correlation id %d to client %s."
.format(responseBody, request.header.correlationId, request.header.clientId))
responseBody
}
sendResponseMaybeThrottle(request, createResponse)
}
if (!authorize(request.session, Read, Resource(Group, joinGroupRequest.groupId(), LITERAL))) {
sendResponseMaybeThrottle(request, requestThrottleMs =>
new JoinGroupResponse(
requestThrottleMs,
Errors.GROUP_AUTHORIZATION_FAILED,
JoinGroupResponse.UNKNOWN_GENERATION_ID,
JoinGroupResponse.UNKNOWN_PROTOCOL,
JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
Collections.emptyMap())
)
} else {
// let the coordinator handle join-group
val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
(protocol.name, Utils.toArray(protocol.metadata))).toList
groupCoordinator.handleJoinGroup(
joinGroupRequest.groupId,
joinGroupRequest.memberId,
request.header.clientId,
request.session.clientAddress.toString,
joinGroupRequest.rebalanceTimeout,
joinGroupRequest.sessionTimeout,
joinGroupRequest.protocolType,
protocols,
sendResponseCallback)
}
}
多线程消费
自己实现缓存区,批量执行及确认 consumer.commitSync
- 多 consumer thread 效率高,单独 offset。 缺点:受限于 topic 分区数,broker 压力大,rebalance 可能性大
- 单 consumer 多 handler thread ,获取和处理节藕,伸缩性好。难于管理分区内消息顺序,位移提交困难,处理不当导致数据丢失。
其他
日志留存
- log.retemtopm.{hours|minutes|ms}
- log.retention.bytes 字节 默认-1
- 当前日志段不会清除
- 和日志最近修改时间比较、比较记录时间戳
暂停 consumer 消费
e.g. 消费逻辑为调用三方接口,如果三方接口不稳定,需要关闭一段时间。
- 暂停
consumer.pause(consumer.assignment());
- 启动
consumer.resume(consumer.assignment());
compaction
- 订阅 binlog see canal
- 高可用日志化
manager 指定 lisnterners
因为 kafka 内部使用全称域名,不统一,导致无法获取元数据
生产环境
- 优雅的启动和关闭(Spring 生命周期)
- offset 跳过与重放
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于