Spark 2.4
Elasticsearch 7.10.2
Scala 2.11
<!-- elasticsearch-hadoop / elasticsearch-spark: the native ES connector for Spark 2.x (Scala 2.11);
     keep both on the same version as the cluster -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>7.10.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>7.10.2</version>
</dependency>
<!-- Jest: the HTTP client used below for the bulk writes -->
<dependency>
    <groupId>io.searchbox</groupId>
    <artifactId>jest</artifactId>
    <version>6.3.1</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.10</version>
</dependency>
<!-- commons dependencies required by Jest -->
<dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.4</version>
</dependency>
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.0.1</version>
</dependency>
================
package com

import com.constant.PropConstants
import com.javaUtil.PropertieUtil
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.core.{Bulk, BulkResult, Index}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.text.SimpleDateFormat
import java.util
import java.util.Properties
object center_materiel_hotspot2es {
  def main(args: Array[String]): Unit = {
    val log: Logger = Logger.getRootLogger
    // read the cluster configuration file
    val prop: Properties = PropertieUtil.load("config.properties")
    // for local testing, read the file from the classpath instead
    // val prop: Properties = PropertieUtil.getProperties("/config.properties")
    // Kerberos authentication for Hive
    System.setProperty("java.security.krb5.conf", prop.getProperty(PropConstants.KRB5_CONF_PATH))
    System.setProperty("HADOOP_USER_NAME", prop.getProperty(PropConstants.HADOOP_USER_NAME))
    System.setProperty("user.name", prop.getProperty(PropConstants.USER_NAME))
    UserGroupInformation.loginUserFromKeytab(
      prop.getProperty(PropConstants.KEYTAB_NAME), prop.getProperty(PropConstants.KEYTAB_FILE_PATH)
    )
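    // A sample config.properties for the keys above (the key names are illustrative
    // assumptions -- match them to your actual PropConstants definitions):
    //   krb5.conf.path=/etc/krb5.conf
    //   hadoop.user.name=hive
    //   user.name=hive
    //   keytab.name=hive/host@EXAMPLE.COM        <- the Kerberos principal
    //   keytab.file.path=/etc/security/keytabs/hive.keytab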
    val session: SparkSession = SparkSession.builder()
      // local-mode settings for testing:
      // .master("local[9]")
      // .config("spark.testing.memory", "4718592000")
      // .appName("spark to es")
      // .config("spark.yarn.am.waitTime", "1000")
      .config("spark.hadoop.hive.exec.dynamic.partition", "true") // enable dynamic partitioning
      .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport() // Hive support
      .getOrCreate()
    import session.implicits._
    // read the source table from Hive
    val dataFrame: DataFrame = session.sql(
      """
        |select id, create_by, create_at, update_by, update_at, position_top, position_left,
        |       materiel_group_child_id, drawing_no, width, height, drawing_type
        |from dws.center_materiel_hotspot
        |""".stripMargin)
    val rdd = dataFrame.map(x => {
      // numeric columns may be null, so convert via Option instead of calling
      // toString on a possible null
      def asString(i: Int): String = Option(x.get(i)).map(_.toString).orNull
      val id: String = x.getAs(0)
      val create_by: String = x.getAs(1)
      val create_at: String = x.get(2).toString
      val update_by: String = x.getAs(3)
      val update_at: String = x.get(4).toString
      val position_top: String = asString(5)
      val position_left: String = asString(6)
      val materiel_group_child_id: String = x.getAs(7)
      val drawing_no: String = x.getAs(8)
      val width: String = asString(9)
      val height: String = asString(10)
      val drawing_type: String = x.getAs(11)
      // SimpleDateFormat is not thread-safe, so build one instance per row
      val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val createTime: Long = fmt.parse(create_at).getTime
      val updateTime: Long = fmt.parse(update_at).getTime
      center_materiel_hotspot_local(id, create_by, createTime, update_by, updateTime, position_top, position_left,
        materiel_group_child_id, drawing_no, width, height, drawing_type)
    })
    // rdd.show(2)
    rdd.foreachPartition(x => {
      val tuples: List[(String, center_materiel_hotspot_local)] = x.toList.map { data => (data.id, data) }
      // args: (id, document) tuples, index name, document type
      saveBulk(tuples, "center_materiel_hotspot_local", "_doc")
    })
    session.stop()
  }
  case class center_materiel_hotspot_local(id: String, createBy: String, createTime: Long, updateBy: String,
                                           updateTime: Long, positionTop: String, positionLeft: String,
                                           materielGroupChildId: String, drawingNo: String, width: String,
                                           height: String, drawingType: String)
  def saveBulk(dataList: List[(String, AnyRef)], indexName: String, typeName: String): Unit = {
    if (dataList != null && dataList.nonEmpty) {
      val client: JestClient = jestClient
      val builder = new Bulk.Builder
      builder.defaultIndex(indexName).defaultType(typeName)
      for ((id, data) <- dataList) {
        val index: Index = new Index.Builder(data).id(id).build()
        builder.addAction(index)
      }
      val bulk: Bulk = builder.build()
      // execute the bulk request and inspect the returned items
      val items: util.List[BulkResult#BulkResultItem] = client.execute(bulk).getItems
      println("Saved " + items.size() + " documents!")
      client.close()
    }
  }
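  // A minimal sketch of per-item failure checking (an assumed extension, not part
  // of the original job): Jest's BulkResult also exposes the failed items, so the
  // job can log what the cluster rejected instead of only counting responses.
  def saveBulkChecked(dataList: List[(String, AnyRef)], indexName: String, typeName: String): Unit = {
    import scala.collection.JavaConverters._
    if (dataList != null && dataList.nonEmpty) {
      val client: JestClient = jestClient
      try {
        val builder = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName)
        for ((id, data) <- dataList) builder.addAction(new Index.Builder(data).id(id).build())
        val result: BulkResult = client.execute(builder.build())
        // getFailedItems is empty when every action in the bulk succeeded
        result.getFailedItems.asScala.foreach { item =>
          println(s"bulk item failed: id=${item.id}, error=${item.error}")
        }
      } finally {
        client.close() // close even if execute throws
      }
    }
  }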
  def jestClient: JestClient = {
    val factory = new JestClientFactory
    // the ES endpoint is hardcoded here; it could also be read from config.properties
    factory.setHttpClientConfig(new HttpClientConfig.Builder(
      "http://es-cn-tl32b5q0t003a6n6g.public.elasticsearch.aliyuncs.com:9200")
      .multiThreaded(true)
      .defaultMaxTotalConnectionPerRoute(2)
      .maxTotalConnection(10)
      .build)
    factory.getObject
  }
}
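Since the pom already includes elasticsearch-spark-20_2.11, the same write can go through the native connector instead of Jest. A minimal sketch, assuming the DataFrame built above and a cluster without authentication (the es.nodes value is a placeholder):

import org.apache.spark.sql.DataFrame
import org.elasticsearch.spark.sql.EsSparkSQL

object SaveWithConnector {
  def save(df: DataFrame): Unit = {
    val cfg = Map(
      "es.nodes" -> "your-es-host",  // placeholder endpoint
      "es.port" -> "9200",
      "es.nodes.wan.only" -> "true", // needed for cloud-hosted clusters behind a gateway
      "es.mapping.id" -> "id"        // use the id column as the document _id
    )
    // each row is serialized to a JSON document and bulk-written by the connector
    EsSparkSQL.saveToEs(df, "center_materiel_hotspot_local", cfg)
  }
}

The connector handles batching and retries on its own, so the per-partition Jest plumbing above becomes unnecessary on this path.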