大家好,又見面了,我是你們的朋友全棧君。
首先我們來對官網的描述了解一下。
DStream中的foreachRDD是一個非常強大函數,它允許你把資料發送給外部系統。因為輸出操作實際上是允許外部系統消費轉換後的資料,它們觸發的實際操作是DStream轉換。是以要掌握它,對它要有深入了解。下面有一些常用的錯誤需要了解。經常寫資料到外部系統需要建立一個連接配接的object(eg:根據TCP協定連接配接到遠端的伺服器,我們連接配接外部資料庫需要自己的句柄)和發送資料到遠端的系統為此,開發者需要在Spark的driver建立一個object用于連接配接。
為了達到這個目的,開發人員可能不經意的在Spark驅動中建立一個連接配接對象,但是在Spark worker中 嘗試調用這個連接配接對象儲存記錄到RDD中,如下:
dstream.foreachRDD {
rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach {
record =>
connection.send(record) // executed at the worker
}
}
複制
這是不正确的,因為這需要先序列化連接配接對象,然後将它從driver發送到worker中。這樣的連接配接對象在機器之間不能
傳送。它可能表現為序列化錯誤(連接配接對象不可序列化)或者初始化錯誤(連接配接對象應該 在worker中初始化)等
等。正确的解決辦法是在worker中建立連接配接對象。
然而,這會造成另外一個常見的錯誤-為每一個記錄建立了一個連接配接對象。例如:
dstream.foreachRDD {
rdd =>
rdd.foreach {
record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
複制
通常,建立一個連接配接對象有資源和時間的開支。是以,為每個記錄建立和銷毀連接配接對象會導緻非常高的開支,明顯
的減少系統的整體吞吐量。一個更好的解決辦法是利用rdd.foreachPartition方法。 為RDD的partition建立一個連接配接對
象,用這個兩件對象發送partition中的所有記錄。
dstream.foreachRDD {
rdd =>
rdd.foreachPartition {
partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
複制
最後,可以通過在多個RDD或者批資料間重用連接配接對象做更進一步的優化。開發者可以保有一個靜态的連接配接對象
池,重複使用池中的對象将多批次的RDD推送到外部系統,以進一步節省開支
dstream.foreachRDD {
rdd =>
rdd.foreachPartition {
partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
複制
需要注意的是,池中的連接配接對象應該根據需要延遲建立,并且在空閑一段時間後自動逾時。這樣就擷取了最有效的
方式發生資料到外部系統。
其它需要注意的地方:
(1)輸出操作通過懶執行的方式操作DStreams,正如RDD action通過懶執行的方式操作RDD。具體地看,RDD
actions和DStreams輸出操作接收資料的處理。是以,如果你的應用程式沒有任何輸出操作或者 用于輸出操作
dstream.foreachRDD(),但是沒有任何RDD action操作在dstream.foreachRDD()裡面,那麼什麼也不會執行。系統
僅僅會接收輸入,然後丢棄它們。
(2)預設情況下,DStreams輸出操作是分時執行的,它們按照應用程式的定義順序按序執行。
實驗1:把SparkStreaming的内部資料存入Mysql
(1)在mysql中建立一個表用于存放資料
mysql> create database sparkStreaming;
Query OK, 1 row affected (0.01 sec)
mysql> use sparkStreaming;
Database changed
mysql> show tables;
Empty set (0.01 sec)
mysql> create table searchKeyWord(insert_time date,keyword varchar(30),search_count integer);
Query OK, 0 rows affected (0.05 sec)
複制
(2)用scala編寫連接配接Mysql的連接配接池
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import org.apache.commons.dbcp.BasicDataSource
import org.apache.log4j.Logger
object scalaConnectPool {
val log = Logger.getLogger(scalaConnectPool.this.getClass)
var ds:BasicDataSource = null
def getDataSource={
if(ds == null){
ds = new BasicDataSource()
ds.setUsername("root")
ds.setPassword("iamhaoren")
ds.setUrl("jdbc:mysql://localhost:3306/sparkStreaming")
ds.setDriverClassName("com.mysql.jdbc.Driver")
ds.setInitialSize(20)
ds.setMaxActive(100)
ds.setMinIdle(50)
ds.setMaxIdle(100)
ds.setMaxWait(1000)
ds.setMinEvictableIdleTimeMillis(5*60*1000)
ds.setTimeBetweenEvictionRunsMillis(10*60*1000)
ds.setTestOnBorrow(true)
}
ds
}
def getConnection : Connection= {
var connect:Connection = null
try {
if(ds != null){
connect = ds.getConnection
}else{
connect = getDataSource.getConnection
}
}
connect
}
def shutDownDataSource: Unit=if (ds !=null){ds.close()}
def closeConnection(rs:ResultSet,ps:PreparedStatement,connect:Connection): Unit ={
if(rs != null){rs.close}
if(ps != null){ps.close}
if(connect != null){connect.close}
}
}
複制
(3)編寫SparkStreaming程式
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object dataToMySQL {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("use the foreachRDD write data to mysql").setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds(10))
val streamData = ssc.socketTextStream("master",9999)
val wordCount = streamData.map(line =>(line.split(",")(0),1)).reduceByKeyAndWindow(_+_,Seconds(60))
val hottestWord = wordCount.transform(itemRDD => {
val top3 = itemRDD.map(pair => (pair._2, pair._1))
.sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
ssc.sparkContext.makeRDD(top3)
})
hottestWord.foreachRDD( rdd =>{
rdd.foreachPartition(partitionOfRecords =>{
val connect = scalaConnectPool.getConnection
connect.setAutoCommit(false)
val stmt = connect.createStatement()
partitionOfRecords.foreach(record =>{
stmt.addBatch("insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'"+record._1+"','"+record._2+"')")
})
stmt.executeBatch()
connect.commit()
}
)
}
)
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
複制
(4)編寫一個socket端的資料模拟器
import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source
object streamingSimulation {
def index(n: Int) = scala.util.Random.nextInt(n)
def main(args: Array[String]) {
// 調用該模拟器需要三個參數,分為為檔案路徑、端口号和間隔時間(機關:毫秒)
if (args.length != 3) {
System.err.println("Usage: <filename> <port> <millisecond>")
System.exit(1)
}
// 擷取指定檔案總的行數
val filename = args(0)
val lines = Source.fromFile(filename).getLines.toList
val filerow = lines.length
// 指定監聽某端口,當外部程式請求時建立連接配接
val listener = new ServerSocket(args(1).toInt)
while (true) {
val socket = listener.accept()
new Thread() {
override def run = {
println("Got client connected from: " + socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)
while (true) {
Thread.sleep(args(2).toLong)
// 當該端口接受請求時,随機擷取某行資料發送給對方
val content = lines(index(filerow))
println("-------------------------------------------")
println(s"Time: ${System.currentTimeMillis()}")
println("-------------------------------------------")
println(content)
out.write(content + '\n')
out.flush()
}
socket.close()
}
}.start()
}
}
}
複制
實驗資料為:
spark
Streaming
better
than
storm
you
need
it
yes
do
it
(5)實驗啟動
在用戶端啟動資料流模拟
對socket端的資料模拟器程式進行 jar檔案的打包,并放入到叢集目錄中
啟動程式如下:
java -cp DataSimulation.jar streamingSimulation /root/application/upload/Information 9999 1000
複制
啟動SparkStreaming程式
結果如下:
釋出者:全棧程式員棧長,轉載請注明出處:https://javaforall.cn/148544.html原文連結:https://javaforall.cn