天天看点

spark 写入 es

spark 2.4

es 7.10.2

Scala 2.11

<dependency>

            <groupId>org.elasticsearch</groupId>

            <artifactId>elasticsearch-hadoop</artifactId>

            <version>7.10.2</version>

        </dependency>

        <dependency>

            <groupId>org.elasticsearch</groupId>

            <artifactId>elasticsearch-spark-20_2.11</artifactId>

            <version>7.10.2</version>

        </dependency>

        <dependency>

            <groupId>io.searchbox</groupId>

            <artifactId>jest</artifactId>

            <version>6.3.1</version>

        </dependency>

        <dependency>

            <groupId>org.projectlombok</groupId>

            <artifactId>lombok</artifactId>

            <version>1.16.10</version>

        </dependency>

        <dependency>

            <groupId>commons-logging</groupId>

            <artifactId>commons-logging</artifactId>

            <version>1.1.1</version>

        </dependency>

        <dependency>

            <groupId>commons-codec</groupId>

            <artifactId>commons-codec</artifactId>

            <version>1.4</version>

        </dependency>

        <dependency>

            <groupId>commons-httpclient</groupId>

            <artifactId>commons-httpclient</artifactId>

            <version>3.0.1</version>

        </dependency>

================

package com

import com.constant.PropConstants

import com.javaUtil.PropertieUtil

import io.searchbox.client.JestClient

import io.searchbox.core.{Bulk, BulkResult, Index}

import org.apache.hadoop.security.UserGroupInformation

import org.apache.log4j.Logger

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.functions.lit

import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

import org.elasticsearch.spark.sql.EsSparkSQL

import java.sql.Timestamp

import java.text.SimpleDateFormat

import java.util

import java.util.{Date, Properties}

object center_materiel_hotspot2es {

  /**
   * Reads `dws.center_materiel_hotspot` from Hive (Kerberos-authenticated) and
   * bulk-writes every row as a document into the Elasticsearch index
   * `center_materiel_hotspot_local` through the Jest HTTP client.
   */
  def main(args: Array[String]): Unit = {

    val log: Logger = Logger.getRootLogger

    // Load cluster configuration (Kerberos paths, principal/keytab names, ...).
    val prop: Properties = PropertieUtil.load("config.properties")
    // Local-testing variant: read the file from the classpath instead.
//    val prop: Properties = PropertieUtil.getProperties("/config.properties")

    // Kerberos authentication required for Hive access.
    System.setProperty("java.security.krb5.conf", prop.getProperty(PropConstants.KRB5_CONF_PATH))
    System.setProperty("HADOOP_USER_NAME", prop.getProperty(PropConstants.HADOOP_USER_NAME))
    System.setProperty("user.name", prop.getProperty(PropConstants.USER_NAME))
    UserGroupInformation.loginUserFromKeytab(
      prop.getProperty(PropConstants.KEYTAB_NAME), prop.getProperty(PropConstants.KEYTAB_FILE_PATH)
    )

    val session: SparkSession = SparkSession.builder()
//      .master("local[9]")
//      .config("spark.testing.memory","4718592000")
//      .appName("spark to es")
//      .config("spark.yarn.am.waitTime", "1000")
      .config("spark.hadoop.hive.exec.dynamic.partition", "true") // enable dynamic partitions
      .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport() // Hive support
      .getOrCreate()

    import session.implicits._

    val dataFrame: DataFrame = session.sql(
      """
        |select id,create_by,create_at,update_by,update_at,position_top,position_left,materiel_group_child_id,
        |drawing_no,width,height,drawing_type from
        | dws.center_materiel_hotspot
        |""".stripMargin)

    // Map each row to the ES document case class.
    // NOTE: getAs/get may return null for nullable Hive columns. The original
    // code guarded only some fields and NPE'd on a null create_at/update_at
    // (via .toString); Option(...) now guards every field uniformly.
    val docs: Dataset[center_materiel_hotspot_local] = dataFrame.map { row =>
      // SimpleDateFormat is not thread-safe, so build it inside the closure.
      val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      // Null-safe string extraction: null column -> null field (as before).
      def str(i: Int): String = Option(row.get(i)).map(_.toString).orNull
      // Null-safe timestamp parse; a null column yields epoch 0L instead of an NPE.
      def millis(i: Int): Long =
        Option(row.get(i)).map(v => fmt.parse(v.toString).getTime).getOrElse(0L)

      center_materiel_hotspot_local(
        id = str(0),
        createBy = str(1),
        createTime = millis(2),
        updateBy = str(3),
        updateTime = millis(4),
        positionTop = str(5),
        positionLeft = str(6),
        materielGroupChildId = str(7),
        drawingNo = str(8),
        width = str(9),
        height = str(10),
        drawingType = str(11)
      )
    }

//    docs.show(2)

    // One bulk request per partition; document id = business id (idempotent upsert).
    docs.foreachPartition { (part: Iterator[center_materiel_hotspot_local]) =>
      val tuples: List[(String, center_materiel_hotspot_local)] =
        part.toList.map(data => (data.id, data))
      // args: data, target index name, mapping type ("_doc" on ES 7.x)
      savaBulk(tuples, "center_materiel_hotspot_local", "_doc")
    }

    session.stop()
  }

  /** Shape of one Elasticsearch document in index center_materiel_hotspot_local. */
  case class center_materiel_hotspot_local(id: String, createBy: String, createTime: Long, updateBy: String,
                                           updateTime: Long, positionTop: String, positionLeft: String,
                                           materielGroupChildId: String, drawingNo: String, width: String,
                                           height: String, drawingType: String)

  /**
   * Bulk-indexes (id, document) pairs into Elasticsearch via Jest.
   *
   * @param dataList  (document id, document) pairs; null or empty lists are skipped
   * @param indexName target ES index
   * @param typeName  target mapping type (always "_doc" on ES 7.x)
   */
  def savaBulk(dataList: List[(String, AnyRef)], indexName: String, typeName: String): Unit = {
    if (dataList != null && dataList.nonEmpty) {
      val client: JestClient = jestClient
      try {
        val builder = new Bulk.Builder
        builder.defaultIndex(indexName).defaultType(typeName)
        for ((id, data) <- dataList) {
          builder.addAction(new Index.Builder(data).id(id).build())
        }
        // Execute the bulk request and report how many items came back.
        val items: util.List[BulkResult#BulkResultItem] = client.execute(builder.build()).getItems
        println("已保存:" + items.size() + "条数据!")
      } finally {
        // Always release the HTTP connection, even when execute() throws
        // (the original leaked the client on failure).
        client.close()
      }
    }
  }

  import io.searchbox.client.JestClientFactory
  import io.searchbox.client.config.HttpClientConfig

  /**
   * Builds a fresh Jest HTTP client for the ES cluster.
   * Caller owns the client and must close it.
   * TODO(review): the endpoint is hard-coded; consider reading it from
   * config.properties like the Kerberos settings above.
   */
  def jestClient: JestClient = {
    val factory = new JestClientFactory
    factory.setHttpClientConfig(new HttpClientConfig.Builder(
      "http://es-cn-tl32b5q0t003a6n6g.public.elasticsearch.aliyuncs.com:9200")
      .multiThreaded(true)
      .defaultMaxTotalConnectionPerRoute(2)
      .maxTotalConnection(10)
      .build)
    factory.getObject
  }

}

继续阅读