Spark Streaming: managing Kafka offsets with Redis
- RedisUtils.scala
```scala
import java.io.FileInputStream
import java.util.Properties

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtils {
  private val properties = new Properties()

  // Load jedis.properties from the classpath
  val path: String = Thread.currentThread().getContextClassLoader.getResource("jedis.properties").getPath
  properties.load(new FileInputStream(path))

  val host: String = properties.getProperty("redis.host")
  val auth: String = properties.getProperty("redis.auth")
  val port: Int = properties.getProperty("redis.port").toInt

  val config = new JedisPoolConfig
  config.setMaxTotal(properties.getProperty("redis.maxConn").toInt)
  config.setMaxIdle(properties.getProperty("redis.maxIdle").toInt)

  // Use this constructor instead if the Redis instance requires authentication
  // val pool: JedisPool = new JedisPool(config, host, port, 10000, auth)
  val pool: JedisPool = new JedisPool(config, host, port, 10000)

  def getConnections(): Jedis = {
    pool.getResource
  }
}
```
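For reference, a minimal `jedis.properties` covering the keys read above could look like the following; the host, port, password, and pool sizes are placeholder values for a local setup, not taken from the original environment:
```properties
redis.host=localhost
redis.port=6379
redis.auth=yourRedisPassword
redis.maxConn=100
redis.maxIdle=10
```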
- OffsetKafkaRedis.scala
```scala
import java.util
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.collection.mutable
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.{Jedis, Pipeline}
import org.apache.log4j.Logger
import scala.util.Try
import scala.collection.JavaConverters._
object OffsetKafkaRedis {
  private val logger: Logger = Logger.getLogger(this.getClass)

  // Read the offsets previously saved in Redis for each topic/partition of this consumer group
  def getOffset(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long] = {
    val fromOffset = mutable.Map[TopicPartition, Long]()
    val jedis: Jedis = RedisUtils.getConnections()
    topics.foreach(topic => {
      // Keys are stored as kafka_offset:<groupId>:<topic>:<partition>
      val keys: util.Set[String] = jedis.keys(s"kafka_offset:${groupId}:${topic}:*")
      if (!keys.isEmpty) {
        keys.asScala.foreach(key => {
          val offset: String = jedis.get(key)
          val partition: String = Try(key.split(s"kafka_offset:${groupId}:${topic}:").apply(1)).getOrElse("0")
          println(s"[INFO] topic: ${topic}, partition: ${partition}, offset: ${offset}")
          fromOffset.put(new TopicPartition(topic, partition.toInt), offset.toLong)
        })
      }
    })
    jedis.close()
    fromOffset
  }
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("redisOffsetDemo").setMaster("local[2]")
    val context = new SparkContext(conf)
    context.setLogLevel("WARN")
    val ssc: StreamingContext = new StreamingContext(context, Seconds(10))

    val topics = Array("offsetDemo")
    val groupId = "g1"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Resume from the offsets saved in Redis (empty map on the first run)
    val offsets: mutable.Map[TopicPartition, Long] = getOffset(topics, groupId)
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )

    kafkaDStream.foreachRDD(
      rdd => {
        // Get the offset ranges of this batch
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // Get a Redis connection
        val jedis: Jedis = RedisUtils.getConnections()
        // Start a transaction on a pipeline
        val pipeline: Pipeline = jedis.pipelined()
        pipeline.multi()
        // val transaction: Transaction = jedis.multi()
        try {
          // Business logic: word count over this batch
          val result: RDD[(String, Int)] = rdd.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
          result.foreach(println)
          // Write the end offset of each partition back to Redis
          offsetRanges.foreach(
            iter => {
              val key: String = s"kafka_offset:${groupId}:${iter.topic}:${iter.partition}"
              val value: Long = iter.untilOffset
              println(s"[INFO] key: ${key}, updated offset: ${value}")
              pipeline.set(key, value.toString)
            }
          )
          // Commit the transaction
          pipeline.exec()
          // Flush the pipeline
          pipeline.sync()
        } catch {
          case e: Exception =>
            logger.error("[ERROR]", e)
            // Roll back the transaction so no partial offsets are written
            pipeline.discard()
        } finally {
          pipeline.close()
          jedis.close()
        }
      }
    )

    ssc.start()
    ssc.awaitTermination()
  }
}
```
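Once the job has processed a few batches, the saved offsets can be inspected straight from Redis. The sketch below is a minimal, hypothetical checker (not part of the original code) that reuses `RedisUtils` and the demo's group `g1` and topic `offsetDemo`:
```scala
import redis.clients.jedis.Jedis
import scala.collection.JavaConverters._

// Lists every offset stored under kafka_offset:<groupId>:<topic>:<partition>
object OffsetCheck {
  def main(args: Array[String]): Unit = {
    val jedis: Jedis = RedisUtils.getConnections()
    val keys = jedis.keys("kafka_offset:g1:offsetDemo:*").asScala
    if (keys.isEmpty) println("no offsets saved yet")
    keys.foreach(key => println(s"$key -> ${jedis.get(key)}"))
    jedis.close()
  }
}
```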