天天看點

Spark 踩坑記:資料庫(Hbase+Mysql)

在使用Spark Streaming的過程中對于計算産生結果的進行持久化時,我們往往需要操作資料庫,去統計或者改變一些值。

最近一個實時消費者處理任務,在使用spark streaming進行實時的資料流處理時,我需要将計算好的資料更新到hbase和mysql中,是以本文對spark操作hbase和mysql的内容進行總結,并且對自己踩到的一些坑進行記錄。

print:列印driver結點上每個Dstream中的前10個batch元素,常用于開發和調試

<code>saveAsTextFiles(prefix, [suffix])</code>:将目前Dstream儲存為檔案,每個interval batch的檔案名命名規則基于 prefix 和 suffix :<code>”prefix-TIME_IN_MS[.suffix]”</code>.

<code>saveAsObjectFiles(prefix, [suffix])</code>:将目前的Dstream内容作為Java可序列化對象的序列化檔案進行儲存,每個interval batch的檔案命名規則基于prefix和suffix:<code>: “prefix-TIME_IN_MS[.suffix]”</code>.

<code>saveAsHadoopFiles(prefix, [suffix])</code>:将Dstream以hadoop檔案的形式進行儲存,每個interval batch的檔案命名規則基于prefix和suffix:<code>: “prefix-TIME_IN_MS[.suffix]”</code>.

foreachRDD(func):最通用的輸出操作,可以對從資料流中産生的每一個RDD應用函數fun。通常fun會将每個RDD中的資料儲存到外部系統,如:将RDD儲存到檔案,或者通過網絡連接配接儲存到資料庫。值得注意的是:fun執行在跑應用的driver程序中,并且通常會包含RDD action以促使資料流RDD開始計算。

<code>dstream.foreachRDD</code>對于開發而言提供了很大的靈活性,但在使用時也要避免很多常見的坑。我們通常将資料儲存到外部系統中的流程是:建立遠端連接配接-&gt;通過連接配接傳輸資料到遠端系統-&gt;關閉連接配接。針對這個流程我們很直接的想到了下面的程式代碼:

在上一篇文章《spark踩坑記——初試》中,對spark的worker和driver進行了整理,我們知道在叢集模式下,上述代碼中的connection需要通過序列化對象的形式從driver發送到worker,但是connection是無法在機器之間傳遞的,即connection是無法序列化的,這樣可能會引起<code>Cserialization errors (connection object not serializable)</code>的錯誤。為了避免這種錯誤,我們将conenction在worker當中建立,代碼如下:

似乎這樣問題解決了?但是細想下,我們在每個rdd的每條記錄當中都進行了connection的建立和關閉,這會導緻不必要的高負荷并且降低整個系統的吞吐量。

是以一個更好的方式是使用<code>rdd.foreachPartition</code>即對于每一個rdd的partition建立唯一的連接配接(注:每個partition是内的rdd是運作在同一worker之上的),代碼如下:

這樣我們降低了頻繁建立連接配接的負載,通常我們在連接配接資料庫時會使用連接配接池,把連接配接池的概念引入,代碼優化如下:

通過持有一個靜态連接配接池對象,我們可以重複利用connection而進一步優化了連接配接建立的開銷,進而降低了負載。另外值得注意的是,同資料庫的連接配接池類似,我們這裡所說的連接配接池同樣應該是lazy的按需建立連接配接,并且及時的收回逾時的連接配接。

另外值得注意的是:

如果在spark streaming中使用了多次foreachRDD,它們之間是按照程式順序向下執行的

Dstream對于輸出操作的執行政策是lazy的,是以如果我們在foreachRDD中不添加任何RDD action,那麼系統僅僅會接收資料然後将資料丢棄。

上面我們闡述了将spark streaming的Dstream輸出到外部系統的基本設計模式,這裡我們闡述如何将Dstream輸出到Hbase叢集。

Scala連接配接Hbase是通過zookeeper擷取資訊,是以在配置時需要提供zookeeper的相關資訊,如下:

根據網上資料,Hbase的連接配接的特殊性我們并沒有使用連接配接池

我們以put操作為例,示範将上述設計模式應用到Hbase輸出操作當中:

關于Hbase的其他操作可以參考Spark 下操作 HBase(1.0.0 新 API)

重點記錄在連接配接Hbase過程中配置<code>HConstants.ZOOKEEPER_QUORUM</code>的問題:

由于Hbase的連接配接不能直接使用ip位址進行通路,往往需要配置hosts,例如我在上述代碼段中<code>127-0-0-1(任意)</code>,我們在hosts中需要配置

在單機情況下,我們隻需要配置一台zookeeper所在Hbase的hosts即可,但是當切換到Hbase叢集是遇到一個詭異的bug

問題描述:在foreachRDD中将Dstream儲存到Hbase時會卡住,并且沒有任何錯誤資訊爆出(沒錯!它就是卡住,沒反應)

問題分析:由于Hbase叢集有多台機器,而我們隻配置了一台Hbase機器的hosts,這樣導緻Spark叢集在通路Hbase時不斷的去尋找但卻找不到就卡在那裡

解決方式:對每個worker上的hosts配置了所有hbase的節點ip,問題解決

同通路Hbase類似,我們也需要有一個可序列化的類來建立Mysql連接配接,這裡我們利用了Mysql的C3P0連接配接池

我們利用c3p0建立Mysql連接配接池,然後通路的時候每次從連接配接池中取出連接配接用于資料傳輸。

同樣利用之前的foreachRDD設計模式,将Dstream輸出到mysql的代碼如下:

值得注意的是:

我們在送出Mysql的操作的時候,并不是每條記錄送出一次,而是采用了批量送出的形式,是以需要将<code>conn.setAutoCommit(false)</code>,這樣可以進一步提高mysql的效率。

如果我們更新Mysql中帶索引的字段時,會導緻更新速度較慢,這種情況應想辦法避免,如果不可避免,那就硬上吧(T^T)

提供一下Spark連接配接Mysql和Hbase所需要的jar包的maven配置:

參考文獻:

Spark Streaming Programming Guide

HBase介紹

Spark 下操作 HBase(1.0.0 新 API)

Spark開發快速入門

kafka-&gt;spark-&gt;streaming-&gt;mysql(scala)實時資料處理示例

Spark Streaming 中使用c3p0連接配接池操作mysql資料庫