Spark DataFrame data processing: CRUD operations (with Spark SQL and MySQL)

1. Configuration file

package config

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

case object conf {

  private val master = "local[*]"

  val confs: SparkConf = new SparkConf().setMaster(master).setAppName("jobs")

  // One shared SparkContext, with log noise turned down.
  val sc = new SparkContext(confs)
  sc.setLogLevel("ERROR")

  val spark_session: SparkSession = SparkSession.builder()
    .appName("jobs")
    .config(confs)
    .getOrCreate()

  // Enable Cartesian products (cross joins), which Spark 2.x disables by default.
  spark_session.conf.set("spark.sql.crossJoin.enabled", value = true)
}
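
Since the end goal is CRUD against MySQL, the same session can also read a MySQL table over JDBC. Below is a minimal sketch, assuming a local database named test, a table named iris, and the MySQL Connector/J driver on the classpath; the URL, credentials, and table name are placeholders for your own environment, not part of the original project:

import java.util.Properties
import org.apache.spark.sql.DataFrame
import config.conf.spark_session

object mysqlRead {

  def main(args: Array[String]): Unit = {
    // Placeholder connection details; adjust for your environment.
    val url = "jdbc:mysql://localhost:3306/test?useSSL=false"
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.cj.jdbc.Driver")

    // Read (查): load the whole table as a DataFrame.
    val df: DataFrame = spark_session.read.jdbc(url, "iris", props)
    df.show()
  }
}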

2. Processing script

package sparkDataMange

import config.conf.spark_session
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType

object irisDataFrame {

  def main(args: Array[String]): Unit = {
    val st: Long = System.currentTimeMillis()

    // Load the iris data set; with no schema given, the columns arrive as strings _c0 ... _c4.
    val path: String = "data/iris.data"
    var df: DataFrame = spark_session.read.csv(path)

    // Update (改): recode the species label in _c4 as a numeric string.
    val ls: Column = when(col("_c4").equalTo("Iris-setosa"), "1")
      .when(col("_c4").equalTo("Iris-versicolor"), "2")
      .otherwise("3")
    df = df.withColumn("_c4", ls)

    // Cast every column to Double so the frame is fully numeric.
    df = df.select(df.columns.map(f => df(f).cast(DoubleType)): _*)

    df.show()
    df.printSchema()

    spark_session.stop()
    println("Execution time: " + (System.currentTimeMillis() - st) / 1000.0 + "s")
  }
}
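
The script above covers reading (查) and an in-frame update (改), but never writes anything back. To complete 增删改查 against MySQL, each operation can be expressed as a DataFrame transformation followed by a JDBC write. The following is a minimal sketch using the same placeholder connection settings as above: Spark's JDBC writer only appends or overwrites whole tables, so row-level update and delete are emulated by rewriting the table, and the source frame must be materialized first because JDBC reads are lazy.

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions._
import config.conf.spark_session
import config.conf.spark_session.implicits._

object irisCrud {

  def main(args: Array[String]): Unit = {
    // Placeholder connection details, as in the read example above.
    val url = "jdbc:mysql://localhost:3306/test?useSSL=false"
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.cj.jdbc.Driver")

    val df: DataFrame = spark_session.read.jdbc(url, "iris", props)

    // JDBC reads are lazy, so force the frame into memory before
    // overwriting the table it came from.
    df.cache()
    df.count()

    // Create (增): append new rows; column names must match the target table.
    val newRows = Seq((5.1, 3.5, 1.4, 0.2, 1.0))
      .toDF("_c0", "_c1", "_c2", "_c3", "_c4")
    newRows.write.mode(SaveMode.Append).jdbc(url, "iris", props)

    // Update (改): transform a column, then overwrite the table with the result.
    val updated = df.withColumn("_c4", col("_c4") + 1)
    updated.write.mode(SaveMode.Overwrite).jdbc(url, "iris", props)

    // Delete (删): keep only the rows that should survive, then overwrite.
    val remaining = df.filter(col("_c4") =!= 3.0)
    remaining.write.mode(SaveMode.Overwrite).jdbc(url, "iris", props)

    spark_session.stop()
  }
}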