Maintaining Kafka offsets in external storage with Spark Streaming

This post uses maintaining Kafka offsets in Redis as the example.

Redis storage format

The data structure used is a Redis string, where the key is topic:partition and the value is the offset.

For example, if the topic bobo has 3 partitions, the key-value layout looks like this:

bobo:0 holds offset x
bobo:1 holds offset y
bobo:2 holds offset z
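
To make the layout concrete, here is a minimal sketch of writing and reading such keys with a plain Jedis client; the host, port and offset values are made up for the example:

import redis.clients.jedis.Jedis

object OffsetKeyLayoutExample {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("localhost", 6379) // connection details are assumptions

    // one string key per partition: "<topic>:<partition>" -> offset as a string
    jedis.set("bobo:0", "100")
    jedis.set("bobo:1", "200")
    jedis.set("bobo:2", "300")

    // read a stored offset back
    val offset = jedis.get("bobo:0").toLong
    println(s"bobo:0 -> $offset") // bobo:0 -> 100

    jedis.close()
  }
}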

Specifying offsets when consuming

Two methods do the main work:

createKafkaStream() creates the Kafka stream
getOffsets() fetches the offsets from Redis

private val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  // note: auto.offset.reset is "none" here, because the starting offsets come from Redis
  "auto.offset.reset" -> "none",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// the `bobo` topic has 3 partitions
private val topicPartitions = Map[String, Int]("bobo" -> 3)
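
The getOffsets and consume methods below obtain connections through an InternalRedisClient helper that the post does not show. A minimal sketch, assuming a JedisPool-backed implementation (host, port and pool settings are assumptions):

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object InternalRedisClient {
  // pool settings are assumptions for this sketch
  private lazy val pool: JedisPool = {
    val config = new JedisPoolConfig()
    config.setMaxTotal(32)
    new JedisPool(config, "localhost", 6379)
  }

  // borrow a connection from the pool
  def getResource: Jedis = pool.getResource

  // return the connection to the pool (close() on a pooled Jedis hands it back)
  def returnResource(jedis: Jedis): Unit = {
    if (null != jedis) jedis.close()
  }
}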

// fetch the starting offsets from Redis
def getOffsets: Map[TopicPartition, Long] = {
  val jedis = InternalRedisClient.getResource

  // starting offset for every partition
  val offsets = mutable.Map[TopicPartition, Long]()

  topicPartitions.foreach { it =>
    val topic = it._1
    val partitions = it._2
    // iterate over the partitions and set the offset for each topic/partition pair
    for (partition <- 0 until partitions) {
      val topicPartitionKey = topic + ":" + partition
      var lastOffset = 0L
      val lastSavedOffset = jedis.get(topicPartitionKey)
      if (null != lastSavedOffset) {
        try {
          lastOffset = lastSavedOffset.toLong
        } catch {
          case e: Exception =>
            log.error("get lastSavedOffset error", e)
            System.exit(1)
        }
      }
      log.info("from redis topic: {}, partition: {}, lastOffset: {}", topic, partition, lastOffset)
      // record the starting offset for this partition
      offsets += (new TopicPartition(topic, partition) -> lastOffset)
    }
  }

  InternalRedisClient.returnResource(jedis)
  offsets.toMap
}

def createKafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
  val offsets = getOffsets

  // create the Kafka direct stream, starting from the offsets read from Redis
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)
  )
  stream
}

The core of this is the ConsumerStrategies.Assign method, which specifies the starting offset for each partition of the topic.
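
Assign pins the exact partitions to consume. For comparison, the 0-10 integration's ConsumerStrategies.Subscribe also accepts a starting-offset map while letting Kafka handle partition assignment; a minimal sketch reusing the fields above (an alternative, not what the original code does):

def createKafkaStreamBySubscribe(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
  val offsets = getOffsets
  KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    // subscribe by topic name; the offsets map still fixes the starting positions
    ConsumerStrategies.Subscribe[String, String](topicPartitions.keys.toList, kafkaParams, offsets)
  )
}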

Updating offsets in Redis

Finally, after processing each batch, persist the offset information back to Redis.

def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // grab the offset ranges for this batch
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // compute the metrics; here we simply count the records
    val total = rdd.count()

    val jedis = InternalRedisClient.getResource
    val pipeline = jedis.pipelined()
    // open a Redis transaction: queued commands are executed atomically on exec()
    pipeline.multi()

    // update the metrics
    pipeline.incrBy("totalRecords", total)

    // update the offsets
    offsetRanges.foreach { offsetRange =>
      log.info("save offsets, topic: {}, partition: {}, offset: {}",
        offsetRange.topic, offsetRange.partition, offsetRange.untilOffset)
      val topicPartitionKey = offsetRange.topic + ":" + offsetRange.partition
      pipeline.set(topicPartitionKey, offsetRange.untilOffset + "")
    }

    // execute the transaction, flush the pipeline and release the connection
    pipeline.exec()
    pipeline.sync()
    pipeline.close()
    InternalRedisClient.returnResource(jedis)
  }
}
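
For completeness, a minimal driver that wires these pieces together could look like the following; the app name and the 10-second batch interval are assumptions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("kafka-offsets-in-redis") // app name is an assumption
  val ssc = new StreamingContext(conf, Seconds(10))               // batch interval is an assumption

  // build the stream from the offsets stored in Redis, then process and save offsets per batch
  val stream = createKafkaStream(ssc)
  consume(stream)

  ssc.start()
  ssc.awaitTermination()
}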

Spark code

As an aside, I have also put together a collection of Spark-related code.

It mainly covers:

Basic RDD usage
SQL
JDBC (read, write)
Hive (read, write, dynamic partitions)
Streaming
  consuming Kafka (manual commit, manually managed offsets)
  writing to HBase
  writing to Hive