Spark Streaming 管理 Kafka 偏移量

本贴最后更新于 2395 天前,其中的信息可能已经时移世异

前言

为了让 Spark Streaming 消费 kafka 的数据不丢数据,可以创建 Kafka Direct DStream,由 Spark Streaming 自己管理 offset,并不是存到 zookeeper。启用 Spark Streaming 的 checkpoints 是存储偏移量的最简单方法,因为它可以在 Spark 的框架内轻松获得。 checkpoints 将应用程序的状态保存到 HDFS,以便在故障时可以恢复。如果发生故障,Spark Streaming 应用程序可以从 checkpoints 偏移范围读取消息。 但是,Spark Streaming checkpoints 在应用程序挂掉或者重启无法恢复,因此不是非常可靠,特别是如果您将此机制用于关键生产应用程序,另外,基于 zookeeper 的 offset 可视化工具将无法使用。我们不建议通过 Spark checkpoints 来管理偏移量。因此本文将手动存储 offset 到 zookeeper,完全自我掌控 offset。

从 ZK 获取 offset

  • 创建 ZKClient,API 有好几个,最后用带序列化参数的,不然保存 offset 的时候容易出现乱码。
  val zkClient = new ZkClient("192.168.1.225:2181", 60000, 60000, new ZkSerializer {
	  override def serialize(data: Object): Array[Byte] = {
		try {
		  return data.toString().getBytes("UTF-8")
		} catch {
		  case e: ZkMarshallingError => return null
		}
	  }
	  override def deserialize(bytes: Array[Byte]): Object = {
		try {
		  return new String(bytes, "UTF-8")
		} catch {
		  case e: ZkMarshallingError => return null
		}
	  }
	})

  • 查看该 groupId 在该 topic 下是否有消费记录,如果有,肯定在对应目录下会有分区数,children 大于 0 则有记录。
    val topicDirs = new ZKGroupTopicDirs(groupId, topic)
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
    val topics = Set(topic)
    val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")

在有记录的情况下,去拿具体的 offset

 if (children > 0) {
      var fromOffsets: Map[TopicAndPartition, Long] = Map()
      //---get partition leader begin----
      val topicList = List(topic)
      val req = new TopicMetadataRequest(topicList, 0)
      //得到该topic的一些信息,比如broker,partition分布情况
      val getLeaderConsumer = new SimpleConsumer("192.168.1.225", 9092, 10000, 10000, "OffsetLookup")
      // brokerList的host 、brokerList的port、过期时间、过期时间
      val res = getLeaderConsumer.send(req)
      //TopicMetadataRequest   topic broker partition 的一些信息
      val topicMetaOption = res.topicsMetadata.headOption
      val partitions = topicMetaOption match {
        case Some(tm) =>
          tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]
        case None =>
          Map[Int, String]()
      }
      for (i <- 0 until children) {
        val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
        val tp = TopicAndPartition(topic, i)
        //---additional begin-----
        val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
        // -2,1
        val consumerMin = new SimpleConsumer(partitions(i), 9092, 10000, 10000, "getMinOffset")
        val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
        var nextOffset = partitionOffset.toLong
        if (curOffsets.length > 0 && nextOffset < curOffsets.head) {
          //如果下一个offset小于当前的offset
          nextOffset = curOffsets.head
        }
        //---additional end-----
        fromOffsets += (tp -> nextOffset) //将不同 partition 对应的 offset 增加到 fromOffsets 中
      }//这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message()) 
	   
  • 注意:在 zookeeper 里存储的 offset 有可能在 kafka 里过期了,所以要拿 kafka 最小的 offset 和 zookeeper 里的 offset 比较一下。

创建 DStream

  • 接下来就可以创建 Kafka Direct DStream 了,前者是从 zookeeper 拿的 offset,后者是直接从最新的开始(第一次消费)。
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }
  • 最后就是处理 RDD,保存 Offset。
    kafkaStream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {
        doSomething....
		   saveOffset(path,edd)
      }
    })
  private def saveOffset(path:String,rdd: RDD[(String, String)]) = {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (o <- offsetRanges) {
      ZkUtils.updatePersistentPath(zkClient, s"${path}/${o.partition}", String.valueOf(o.untilOffset))
    }
  }

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 548 关注
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    35 引用 • 35 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...