天天看點

SparkStreaming之foreachRDD

大家好,又見面了,我是你們的朋友全棧君。

首先我們來對官網的描述了解一下。

DStream中的foreachRDD是一個非常強大函數,它允許你把資料發送給外部系統。因為輸出操作實際上是允許外部系統消費轉換後的資料,它們觸發的實際操作是DStream轉換。是以要掌握它,對它要有深入了解。下面有一些常用的錯誤需要了解。經常寫資料到外部系統需要建立一個連接配接的object(eg:根據TCP協定連接配接到遠端的伺服器,我們連接配接外部資料庫需要自己的句柄)和發送資料到遠端的系統為此,開發者需要在Spark的driver建立一個object用于連接配接。

為了達到這個目的,開發人員可能不經意的在Spark驅動中建立一個連接配接對象,但是在Spark worker中 嘗試調用這個連接配接對象儲存記錄到RDD中,如下:

dstream.foreachRDD { 
    rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { 
    record =>
    connection.send(record) // executed at the worker
  }
}           

複制

這是不正确的,因為這需要先序列化連接配接對象,然後将它從driver發送到worker中。這樣的連接配接對象在機器之間不能

傳送。它可能表現為序列化錯誤(連接配接對象不可序列化)或者初始化錯誤(連接配接對象應該 在worker中初始化)等

等。正确的解決辦法是在worker中建立連接配接對象。

然而,這會造成另外一個常見的錯誤-為每一個記錄建立了一個連接配接對象。例如:

dstream.foreachRDD { 
    rdd =>
  rdd.foreach { 
    record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}           

複制

通常,建立一個連接配接對象有資源和時間的開支。是以,為每個記錄建立和銷毀連接配接對象會導緻非常高的開支,明顯

的減少系統的整體吞吐量。一個更好的解決辦法是利用rdd.foreachPartition方法。 為RDD的partition建立一個連接配接對

象,用這個兩件對象發送partition中的所有記錄。

dstream.foreachRDD { 
    rdd =>
  rdd.foreachPartition { 
    partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}           

複制

最後,可以通過在多個RDD或者批資料間重用連接配接對象做更進一步的優化。開發者可以保有一個靜态的連接配接對象

池,重複使用池中的對象将多批次的RDD推送到外部系統,以進一步節省開支

dstream.foreachRDD { 
    rdd =>
  rdd.foreachPartition { 
    partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}           

複制

需要注意的是,池中的連接配接對象應該根據需要延遲建立,并且在空閑一段時間後自動逾時。這樣就擷取了最有效的

方式發生資料到外部系統。

其它需要注意的地方:

(1)輸出操作通過懶執行的方式操作DStreams,正如RDD action通過懶執行的方式操作RDD。具體地看,RDD

actions和DStreams輸出操作接收資料的處理。是以,如果你的應用程式沒有任何輸出操作或者 用于輸出操作

dstream.foreachRDD(),但是沒有任何RDD action操作在dstream.foreachRDD()裡面,那麼什麼也不會執行。系統

僅僅會接收輸入,然後丢棄它們。

(2)預設情況下,DStreams輸出操作是分時執行的,它們按照應用程式的定義順序按序執行。

實驗1:把SparkStreaming的内部資料存入Mysql

(1)在mysql中建立一個表用于存放資料

mysql> create database sparkStreaming;
Query OK, 1 row affected (0.01 sec)
 
mysql> use sparkStreaming;
Database changed
mysql> show tables;
Empty set (0.01 sec)
 
mysql> create table searchKeyWord(insert_time date,keyword varchar(30),search_count integer);
Query OK, 0 rows affected (0.05 sec)           

複制

(2)用scala編寫連接配接Mysql的連接配接池

import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet

import org.apache.commons.dbcp.BasicDataSource
import org.apache.log4j.Logger

object scalaConnectPool {
  val  log = Logger.getLogger(scalaConnectPool.this.getClass)
  var ds:BasicDataSource = null
  def getDataSource={
    if(ds == null){
      ds = new BasicDataSource()
      ds.setUsername("root")
      ds.setPassword("iamhaoren")
      ds.setUrl("jdbc:mysql://localhost:3306/sparkStreaming")
      ds.setDriverClassName("com.mysql.jdbc.Driver")
      ds.setInitialSize(20)
      ds.setMaxActive(100)
      ds.setMinIdle(50)
      ds.setMaxIdle(100)
      ds.setMaxWait(1000)
      ds.setMinEvictableIdleTimeMillis(5*60*1000)
      ds.setTimeBetweenEvictionRunsMillis(10*60*1000)
      ds.setTestOnBorrow(true)
    }
    ds
  }

  def getConnection : Connection= {
    var connect:Connection = null
    try {
      if(ds != null){
        connect = ds.getConnection
      }else{
        connect = getDataSource.getConnection
      }
    }
    connect
  }

  def shutDownDataSource: Unit=if (ds !=null){ds.close()}

  def closeConnection(rs:ResultSet,ps:PreparedStatement,connect:Connection): Unit ={
    if(rs != null){rs.close}
    if(ps != null){ps.close}
    if(connect != null){connect.close}
  }

}           

複制

(3)編寫SparkStreaming程式

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object dataToMySQL {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("use the foreachRDD write data to mysql").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(10))

    val streamData = ssc.socketTextStream("master",9999)
    val wordCount = streamData.map(line =>(line.split(",")(0),1)).reduceByKeyAndWindow(_+_,Seconds(60))
    val hottestWord = wordCount.transform(itemRDD => {
      val top3 = itemRDD.map(pair => (pair._2, pair._1))
        .sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
      ssc.sparkContext.makeRDD(top3)
    })


    hottestWord.foreachRDD( rdd =>{
      rdd.foreachPartition(partitionOfRecords =>{
        val connect = scalaConnectPool.getConnection
        connect.setAutoCommit(false)
        val stmt = connect.createStatement()
        partitionOfRecords.foreach(record =>{
          stmt.addBatch("insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'"+record._1+"','"+record._2+"')")
        })
        stmt.executeBatch()
        connect.commit()
      }
      )
    }
    )



    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}           

複制

(4)編寫一個socket端的資料模拟器

import java.io.{PrintWriter}  
import java.net.ServerSocket  
import scala.io.Source  
  
  
object streamingSimulation {  
  def index(n: Int) = scala.util.Random.nextInt(n)  
  
  def main(args: Array[String]) {  
    // 調用該模拟器需要三個參數,分為為檔案路徑、端口号和間隔時間(機關:毫秒)  
    if (args.length != 3) {  
      System.err.println("Usage: <filename> <port> <millisecond>")  
      System.exit(1)  
    }  
  
    // 擷取指定檔案總的行數  
    val filename = args(0)  
    val lines = Source.fromFile(filename).getLines.toList  
    val filerow = lines.length  
  
    // 指定監聽某端口,當外部程式請求時建立連接配接  
    val listener = new ServerSocket(args(1).toInt)  
  
    while (true) {  
      val socket = listener.accept()  
      new Thread() {  
        override def run = {  
          println("Got client connected from: " + socket.getInetAddress)  
          val out = new PrintWriter(socket.getOutputStream(), true)  
          while (true) {  
            Thread.sleep(args(2).toLong)  
            // 當該端口接受請求時,随機擷取某行資料發送給對方  
            val content = lines(index(filerow))  
            println("-------------------------------------------")  
            println(s"Time: ${System.currentTimeMillis()}")  
            println("-------------------------------------------")  
            println(content)  
            out.write(content + '\n')  
            out.flush()  
          }  
          socket.close()  
        }  
      }.start()  
    }  
  }  
  
}             

複制

實驗資料為:

spark

Streaming

better

than

storm

you

need

it

yes

do

it

(5)實驗啟動

在用戶端啟動資料流模拟

對socket端的資料模拟器程式進行 jar檔案的打包,并放入到叢集目錄中

啟動程式如下:

java -cp DataSimulation.jar streamingSimulation /root/application/upload/Information 9999 1000           

複制

啟動SparkStreaming程式

結果如下:

SparkStreaming之foreachRDD

釋出者:全棧程式員棧長,轉載請注明出處:https://javaforall.cn/148544.html原文連結:https://javaforall.cn