天天看點

spark消費kafka的兩種方式

轉載:http://blog.csdn.net/wisgood/article/details/51815845

Spark-Streaming擷取kafka資料的兩種方式Receiver與Direct

一、基于Receiver的方式

這種方式使用Receiver來擷取資料。Receiver是使用Kafka的高層次Consumer API來實作的。receiver從Kafka中擷取的資料都是存儲在Spark Executor的記憶體中的(如果突然資料暴增,大量batch堆積,很容易出現記憶體溢出的問題),然後Spark Streaming啟動的job會去處理那些資料。

然而,在預設的配置下,這種方式可能會因為底層的失敗而丢失資料。如果要啟用高可靠機制,讓資料零丢失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地将接收到的Kafka資料寫入分布式檔案系統(比如HDFS)上的預寫日志中。是以,即使底層節點出現了失敗,也可以使用預寫日志中的資料進行恢複。

需要注意的要點

1、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關系的。是以,在KafkaUtils.createStream()中,提高partition的數量,隻會增加一個Receiver中,讀取partition的線程的數量。不會增加Spark處理資料的并行度。

2、可以建立多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver并行接收資料。

3、如果基于容錯的檔案系統,比如HDFS,啟用了預寫日志機制,接收到的資料都會被複制一份到預寫日志中。是以,在KafkaUtils.createStream()中,設定的持久化級别是StorageLevel.MEMORY_AND_DISK_SER。

二、基于Direct的方式

這種新的不基于Receiver的直接方式,是在Spark 1.3中引入的,進而能夠確定更加健壯的機制。替代掉使用Receiver來接收資料後,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的最新的offset,進而定義每個batch的offset的範圍。當處理資料的job啟動時,就會使用Kafka的簡單consumer api來擷取Kafka指定offset範圍的資料。

這種方式有如下優點:

1、簡化并行讀取:如果要讀取多個partition,不需要建立多個輸入DStream然後對它們進行union操作。Spark會建立跟Kafka partition一樣多的RDD partition,并且會并行從Kafka中讀取資料。是以在Kafka partition和RDD partition之間,有一個一對一的映射關系。

2、高性能:如果要保證零資料丢失,在基于receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為資料實際上被複制了兩份,Kafka自己本身就有高可靠的機制,會對資料複制一份,而這裡又會複制一份到WAL中。而基于direct的方式,不依賴Receiver,不需要開啟WAL機制,隻要Kafka中作了資料的複制,那麼就可以通過Kafka的副本進行恢複。

3、一次且僅一次的事務機制:

對比:

基于receiver的方式,是使用Kafka的高階API來在ZooKeeper中儲存消費過的offset的。這是消費Kafka資料的傳統方式。這種方式配合着WAL機制可以保證資料零丢失的高可靠性,但是卻無法保證資料被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。

基于direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,并儲存在checkpoint中。Spark自己一定是同步的,是以可以保證資料是消費一次且僅消費一次。

在實際生産環境中大都用Direct方式

maven依賴

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
           

完整測試代碼

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._
/**
  * Created by Administrator on 2017/6/16.
  */
object KafkaConsumer {

  val numThreads = 
  val topics = "mytest"
  val zkQuorum = "192.168.1.115:2181"
  val group = "consumer1"
  val brokers = "192.168.1.115:9092"



  def main(args: Array[String]): Unit = {
    createstream
  }

  /**
    *bin/kafka-console-producer.sh –broker-list localhost:9092 –topic mytest
    */
  def createstream()={
    val conf = new SparkConf().setAppName("kafka test").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds());

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" ")).map(x=>(x,))
    words.reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }


  def direct()={
    val conf = new SparkConf().setMaster("local[2]").setAppName("kafka test")
    val ssc = new StreamingContext(conf,Seconds())
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" ")).map(x=>(x,))

    words.reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
           

繼續閱讀