前言
为了让 Spark Streaming 消费 kafka 的数据不丢数据,可以创建 Kafka Direct DStream,由 Spark Streaming 自己管理 offset,并不是存到 zookeeper。启用 Spark Streaming 的 checkpoints 是存储偏移量的最简单方法,因为它可以在 Spark 的框架内轻松获得。 checkpoints 将应用程序的状态保存到 HDFS,以便在故障时可以恢复。如果发生故障,Spark Streaming 应用程序可以从 checkpoints 偏移范围读取消息。 但是,Spark Streaming checkpoints 在应用程序挂掉或者重启无法恢复,因此不是非常可靠,特别是如果您将此机制用于关键生产应用程序,另外,基于 zookeeper 的 offset 可视化工具将无法使用。我们不建议通过 Spark checkpoints 来管理偏移量。因此本文将手动存储 offset 到 zookeeper,完全自我掌控 offset。
从 ZK 获取 offset
- 创建 ZKClient,API 有好几个,最后用带序列化参数的,不然保存 offset 的时候容易出现乱码。
val zkClient = new ZkClient("192.168.1.225:2181", 60000, 60000, new ZkSerializer {
override def serialize(data: Object): Array[Byte] = {
try {
return data.toString().getBytes("UTF-8")
} catch {
case e: ZkMarshallingError => return null
}
}
override def deserialize(bytes: Array[Byte]): Object = {
try {
return new String(bytes, "UTF-8")
} catch {
case e: ZkMarshallingError => return null
}
}
})
- 查看该 groupId 在该 topic 下是否有消费记录,如果有,肯定在对应目录下会有分区数,children 大于 0 则有记录。
val topicDirs = new ZKGroupTopicDirs(groupId, topic)
val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
val topics = Set(topic)
val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")
在有记录的情况下,去拿具体的 offset
if (children > 0) {
var fromOffsets: Map[TopicAndPartition, Long] = Map()
//---get partition leader begin----
val topicList = List(topic)
val req = new TopicMetadataRequest(topicList, 0)
//得到该topic的一些信息,比如broker,partition分布情况
val getLeaderConsumer = new SimpleConsumer("192.168.1.225", 9092, 10000, 10000, "OffsetLookup")
// brokerList的host 、brokerList的port、过期时间、过期时间
val res = getLeaderConsumer.send(req)
//TopicMetadataRequest topic broker partition 的一些信息
val topicMetaOption = res.topicsMetadata.headOption
val partitions = topicMetaOption match {
case Some(tm) =>
tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]
case None =>
Map[Int, String]()
}
for (i <- 0 until children) {
val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
val tp = TopicAndPartition(topic, i)
//---additional begin-----
val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
// -2,1
val consumerMin = new SimpleConsumer(partitions(i), 9092, 10000, 10000, "getMinOffset")
val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
var nextOffset = partitionOffset.toLong
if (curOffsets.length > 0 && nextOffset < curOffsets.head) {
//如果下一个offset小于当前的offset
nextOffset = curOffsets.head
}
//---additional end-----
fromOffsets += (tp -> nextOffset) //将不同 partition 对应的 offset 增加到 fromOffsets 中
}//这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
- 注意:在 zookeeper 里存储的 offset 有可能在 kafka 里过期了,所以要拿 kafka 最小的 offset 和 zookeeper 里的 offset 比较一下。
创建 DStream
- 接下来就可以创建 Kafka Direct DStream 了,前者是从 zookeeper 拿的 offset,后者是直接从最新的开始(第一次消费)。
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
} else {
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
}
- 最后就是处理 RDD,保存 Offset。
kafkaStream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
doSomething....
saveOffset(path,edd)
}
})
private def saveOffset(path:String,rdd: RDD[(String, String)]) = {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges) {
ZkUtils.updatePersistentPath(zkClient, s"${path}/${o.partition}", String.valueOf(o.untilOffset))
}
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于