// 1、配置文件 (Part 1: configuration)
package config
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
// Shared Spark setup for the whole job: one SparkConf, one SparkSession and the
// SparkContext derived from it. A plain `object` is sufficient — `case object`
// added nothing for a configuration holder (no pattern matching / no need for
// the generated equals/toString).
object conf {
  // Run locally on all available cores; swap for a cluster master URL when deploying.
  private val master = "local[*]"

  val confs: SparkConf = new SparkConf().setMaster(master).setAppName("jobs")

  // Build the session first and take the context from it, instead of
  // `new SparkContext(confs)` followed by getOrCreate(): only one SparkContext
  // may exist per JVM, and getOrCreate() would silently reuse the earlier
  // context, ignoring any builder-level settings.
  val spark_session: SparkSession = SparkSession.builder()
    .appName("jobs")
    .config(confs)
    // Allow cartesian products — must be enabled explicitly in Spark 2.x.
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()

  val sc: SparkContext = spark_session.sparkContext
  sc.setLogLevel("ERROR")
}
// 2、处理脚本 (Part 2: processing script)
package sparkDataMange
import config.conf.{sc, spark_session}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Row, SaveMode}
import config.conf.spark_session.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
object irisDataFrame {
  /**
   * Loads the iris CSV (headerless, columns _c0.._c4), recodes the species
   * label `_c4` to a numeric code — Iris-setosa -> 1, Iris-versicolor -> 2,
   * anything else (i.e. Iris-virginica) -> 3 — casts every column to Double,
   * then prints the frame, its schema, and the elapsed wall-clock time.
   */
  def main(args: Array[String]): Unit = {
    val st: Long = System.currentTimeMillis()
    val path: String = "data/iris.data"

    // Species name -> numeric label; otherwise(...) is the catch-all for virginica.
    val label: Column =
      when(col("_c4").equalTo("Iris-setosa"), "1")
        .when(col("_c4").equalTo("Iris-versicolor"), "2")
        .otherwise("3")

    // Single immutable transformation pipeline instead of reassigning a `var`:
    // read -> recode label -> cast all columns to Double.
    val raw: DataFrame = spark_session.read.csv(path)
    val df: DataFrame = raw
      .withColumn("_c4", label)
      .select(raw.columns.map(c => col(c).cast(DoubleType)): _*)

    df.show()
    df.printSchema()
    spark_session.stop()
    println("执行时间为:" + (System.currentTimeMillis() - st) / 1000.toDouble + "s")
  }
}