Maintaining Kafka offsets in external storage with Spark Streaming
This post uses Redis as the external store for Kafka offsets.
Redis storage format
The data structure used is a plain string, where the key is topic:partition and the value is the offset.
For example, if the topic bobo has 3 partitions, the key-value pairs look like this:
bobo:0 holds offset x
bobo:1 holds offset y
bobo:2 holds offset z
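To make the layout concrete, here is a minimal sketch that seeds and reads one of these keys with Jedis (the Redis client used later in the post); the localhost address and the offset value 42 are illustrative assumptions:

import redis.clients.jedis.Jedis

object OffsetKeyDemo {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("localhost", 6379) // assumes a local Redis instance
    // seed an offset for partition 0 of topic `bobo`
    jedis.set("bobo:0", "42")
    // read it back; a missing key returns null, which the code below treats as offset 0
    println(jedis.get("bobo:0")) // prints 42
    jedis.close()
  }
}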
Specifying offsets when consuming
Two methods do the main work:
createKafkaStream() creates the Kafka stream
getOffsets() fetches the saved offsets from Redis
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.slf4j.LoggerFactory
import scala.collection.mutable

private val log = LoggerFactory.getLogger(getClass) // logger assumed by the snippets below

private val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  // note: `none`, since we always supply the starting offsets ourselves
  "auto.offset.reset" -> "none",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// the `bobo` topic has 3 partitions
private val topicPartitions = Map[String, Int]("bobo" -> 3)
// fetch the saved offsets from Redis
def getOffsets: Map[TopicPartition, Long] = {
  val jedis = InternalRedisClient.getResource

  // starting offset for each partition
  val offsets = mutable.Map[TopicPartition, Long]()

  topicPartitions.foreach { it =>
    val topic = it._1
    val partitions = it._2
    // walk the partitions and look up the offset saved for each one
    for (partition <- 0 until partitions) {
      val topicPartitionKey = topic + ":" + partition
      var lastOffset = 0L
      val lastSavedOffset = jedis.get(topicPartitionKey)
      if (null != lastSavedOffset) {
        try {
          lastOffset = lastSavedOffset.toLong
        } catch {
          case e: Exception =>
            log.error("get lastSavedOffset error", e)
            System.exit(1)
        }
      }
      log.info("from redis topic: {}, partition: {}, lastOffset: {}", topic, partition, lastOffset)

      // record the starting offset for this TopicPartition
      offsets += (new TopicPartition(topic, partition) -> lastOffset)
    }
  }

  InternalRedisClient.returnResource(jedis)

  offsets.toMap
}
def createKafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
  val offsets = getOffsets

  // create the Kafka stream, assigning each partition its saved offset
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)
  )
  stream
}
The core of this is the ConsumerStrategies.Assign method, which pins each partition of the topic to the offset loaded from Redis.
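Both getOffsets above and the consume method in the next section depend on InternalRedisClient, which the post does not show. A minimal sketch of such a helper, assuming a JedisPool-backed connection pool (the address and pool sizes are placeholders):

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object InternalRedisClient {
  private val config = new JedisPoolConfig
  config.setMaxTotal(100)
  config.setMaxIdle(10)
  // assumes Redis on localhost:6379; replace with your own address
  private lazy val pool = new JedisPool(config, "localhost", 6379)

  // borrow a connection from the pool
  def getResource: Jedis = pool.getResource

  // hand a connection back to the pool (close() returns a pooled Jedis)
  def returnResource(jedis: Jedis): Unit = if (jedis != null) jedis.close()
}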
Updating offsets in Redis
Finally, after processing each batch, write the offset information back to Redis.
def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // get the offset ranges covered by this batch
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // compute the metrics of interest; here we just count the records
    val total = rdd.count()

    val jedis = InternalRedisClient.getResource
    val pipeline = jedis.pipelined()
    // open a MULTI/EXEC transaction so the metric and offset updates are applied atomically
    pipeline.multi()

    // update the metrics
    pipeline.incrBy("totalRecords", total)

    // update the offsets
    offsetRanges.foreach { offsetRange =>
      log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetRange.topic, offsetRange.partition, offsetRange.untilOffset)
      val topicPartitionKey = offsetRange.topic + ":" + offsetRange.partition
      pipeline.set(topicPartitionKey, offsetRange.untilOffset + "")
    }

    // execute the transaction, flush the pipeline, and release the connection
    pipeline.exec()
    pipeline.sync()
    pipeline.close()
    InternalRedisClient.returnResource(jedis)
  }
}
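For completeness, here is one way a driver could wire these methods together; the app name and the 5-second batch interval are assumptions, not part of the original post:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("kafka-offsets-in-redis")
  val ssc = new StreamingContext(conf, Seconds(5))
  // resume from the offsets saved in Redis
  val stream = createKafkaStream(ssc)
  // process each batch and persist the new offsets back to Redis
  consume(stream)
  ssc.start()
  ssc.awaitTermination()
}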
Spark code
As an aside, here is the Spark-related code I have put together.
It mainly covers:
basic RDD usage
SQL
  JDBC (read, write)
  Hive (read, write, dynamic partitioning)
Streaming
  consuming Kafka (manual commit, manually maintained offsets)
  writing to HBase
  writing to Hive