
Using Redis to manage Kafka offsets in Spark Streaming

The approach: disable Kafka's auto-commit, read the last saved offsets from Redis when the stream starts, and after each batch write the new end offsets back to Redis inside a transaction.

- RedisUtils.scala

```
import java.io.FileInputStream
import java.util.Properties

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtils {
  private val properties = new Properties()
  // locate jedis.properties on the classpath
  val path: String = Thread.currentThread().getContextClassLoader.getResource("jedis.properties").getPath
  properties.load(new FileInputStream(path))
  val host: String = properties.getProperty("redis.host")
  val auth: String = properties.getProperty("redis.auth")
  val port: Int = properties.getProperty("redis.port").toInt
  val config = new JedisPoolConfig
  config.setMaxTotal(properties.getProperty("redis.maxConn").toInt)
  config.setMaxIdle(properties.getProperty("redis.maxIdle").toInt)
  // if Redis requires a password, use the constructor that also takes `auth`:
  // val pool: JedisPool = new JedisPool(config, host, port, 10000, auth)
  val pool: JedisPool = new JedisPool(config, host, port, 10000)
  def getConnections(): Jedis = {
    pool.getResource
  }
}
```
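
For reference, a minimal jedis.properties matching the keys read above might look like this (the host, port, and pool sizes are placeholder values, not from the original post):

```
redis.host=linux01
redis.port=6379
redis.auth=
redis.maxConn=20
redis.maxIdle=10
```

The file must sit on the classpath (e.g. under src/main/resources), since it is resolved through the context class loader.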
- OffsetKafkaRedis.scala

```
import java.util

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.{Jedis, Pipeline}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try

object OffsetKafkaRedis {

  private val logger: Logger = Logger.getLogger(this.getClass)

  // read the last committed offset of every partition of the given topics from Redis
  def getOffset(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long] = {
    val fromOffset = mutable.Map[TopicPartition, Long]()
    val jedis: Jedis = RedisUtils.getConnections()
    topics.foreach(topic => {
      // keys follow the pattern kafka_offset:<groupId>:<topic>:<partition>
      val keys: util.Set[String] = jedis.keys(s"kafka_offset:${groupId}:${topic}:*")
      if (!keys.isEmpty) {
        keys.asScala.foreach(key => {
          val offset: String = jedis.get(key)
          // the partition id is whatever follows the key prefix
          val partition: String = Try(key.split(s"kafka_offset:${groupId}:${topic}:").apply(1)).getOrElse("0")
          println(s"[INFO] topic: ${topic}, partition: ${partition}, offset: ${offset}")
          fromOffset.put(new TopicPartition(topic, partition.toInt), offset.toLong)
        })
      }
    })
    jedis.close()
    fromOffset
  }

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("redisOffsetDemo").setMaster("local[2]")
    val context = new SparkContext(conf)
    context.setLogLevel("WARN")
    val ssc: StreamingContext = new StreamingContext(context, Seconds(10))
    val topics = Array("offsetDemo")
    val groupId = "g1"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      // offsets are committed to Redis by hand, so Kafka auto-commit must stay off
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val offsets: mutable.Map[TopicPartition, Long] = getOffset(topics, groupId)
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )
    kafkaDStream.foreachRDD(rdd => {
      // grab the offset ranges carried by this batch
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // connect to Redis
      val jedis: Jedis = RedisUtils.getConnections()
      // open a transaction on a pipeline
      val pipeline: Pipeline = jedis.pipelined()
      pipeline.multi()
      // alternatively: val transaction: Transaction = jedis.multi()
      try {
        // business logic: a word count over the batch
        val result: RDD[(String, Int)] = rdd.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        result.foreach(println)
        // queue the end offset of every partition inside the transaction
        offsetRanges.foreach(iter => {
          val key: String = s"kafka_offset:${groupId}:${iter.topic}:${iter.partition}"
          val value: Long = iter.untilOffset
          println(s"[INFO] key: ${key}, new offset: ${value}")
          pipeline.set(key, value.toString)
        })
        // commit the transaction, then flush the pipeline
        pipeline.exec()
        pipeline.sync()
      } catch {
        case e: Exception =>
          logger.error("[ERROR]", e)
          pipeline.discard()
      } finally {
        pipeline.close()
        jedis.close()
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
```

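Because the word-count output is produced before the offsets are committed, a failure between the two steps replays the batch on restart, so the job is at-least-once; the MULTI/EXEC on the pipeline only makes the per-batch offset writes atomic. To spot-check what has landed in Redis, here is a minimal sketch reusing RedisUtils from above (OffsetInspector is a hypothetical helper; g1 and offsetDemo are just the demo values from main):

```
import scala.collection.JavaConverters._

// Spot-check: print every offset key the streaming job has written to Redis.
object OffsetInspector {
  def main(args: Array[String]): Unit = {
    val jedis = RedisUtils.getConnections()
    try {
      // same key layout the job uses: kafka_offset:<groupId>:<topic>:<partition>
      jedis.keys("kafka_offset:g1:offsetDemo:*").asScala.foreach { key =>
        println(s"$key -> ${jedis.get(key)}")
      }
    } finally {
      jedis.close()
    }
  }
}
```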