轉載:http://blog.csdn.net/wisgood/article/details/51815845
Spark-Streaming擷取kafka資料的兩種方式Receiver與Direct
一、基于Receiver的方式
這種方式使用Receiver來擷取資料。Receiver是使用Kafka的高層次Consumer API來實作的。receiver從Kafka中擷取的資料都是存儲在Spark Executor的記憶體中的(如果突然資料暴增,大量batch堆積,很容易出現記憶體溢出的問題),然後Spark Streaming啟動的job會去處理那些資料。
然而,在預設的配置下,這種方式可能會因為底層的失敗而丢失資料。如果要啟用高可靠機制,讓資料零丢失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地将接收到的Kafka資料寫入分布式檔案系統(比如HDFS)上的預寫日志中。是以,即使底層節點出現了失敗,也可以使用預寫日志中的資料進行恢複。
需要注意的要點
1、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關系的。是以,在KafkaUtils.createStream()中,提高partition的數量,隻會增加一個Receiver中,讀取partition的線程的數量。不會增加Spark處理資料的并行度。
2、可以建立多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver并行接收資料。
3、如果基于容錯的檔案系統,比如HDFS,啟用了預寫日志機制,接收到的資料都會被複制一份到預寫日志中。是以,在KafkaUtils.createStream()中,設定的持久化級别是StorageLevel.MEMORY_AND_DISK_SER。
二、基于Direct的方式
這種新的不基于Receiver的直接方式,是在Spark 1.3中引入的,進而能夠確定更加健壯的機制。替代掉使用Receiver來接收資料後,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的最新的offset,進而定義每個batch的offset的範圍。當處理資料的job啟動時,就會使用Kafka的簡單consumer api來擷取Kafka指定offset範圍的資料。
這種方式有如下優點:
1、簡化并行讀取:如果要讀取多個partition,不需要建立多個輸入DStream然後對它們進行union操作。Spark會建立跟Kafka partition一樣多的RDD partition,并且會并行從Kafka中讀取資料。是以在Kafka partition和RDD partition之間,有一個一對一的映射關系。
2、高性能:如果要保證零資料丢失,在基于receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為資料實際上被複制了兩份,Kafka自己本身就有高可靠的機制,會對資料複制一份,而這裡又會複制一份到WAL中。而基于direct的方式,不依賴Receiver,不需要開啟WAL機制,隻要Kafka中作了資料的複制,那麼就可以通過Kafka的副本進行恢複。
3、一次且僅一次的事務機制:
對比:
基于receiver的方式,是使用Kafka的高階API來在ZooKeeper中儲存消費過的offset的。這是消費Kafka資料的傳統方式。這種方式配合着WAL機制可以保證資料零丢失的高可靠性,但是卻無法保證資料被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。
基于direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,并儲存在checkpoint中。Spark自己一定是同步的,是以可以保證資料是消費一次且僅消費一次。
在實際生産環境中大都用Direct方式
maven依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.1</version>
</dependency>
完整測試代碼
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._
/**
* Created by Administrator on 2017/6/16.
*/
object KafkaConsumer {
val numThreads =
val topics = "mytest"
val zkQuorum = "192.168.1.115:2181"
val group = "consumer1"
val brokers = "192.168.1.115:9092"
def main(args: Array[String]): Unit = {
createstream
}
/**
*bin/kafka-console-producer.sh –broker-list localhost:9092 –topic mytest
*/
def createstream()={
val conf = new SparkConf().setAppName("kafka test").setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds());
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" ")).map(x=>(x,))
words.reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
}
def direct()={
val conf = new SparkConf().setMaster("local[2]").setAppName("kafka test")
val ssc = new StreamingContext(conf,Seconds())
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" ")).map(x=>(x,))
words.reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
}
}