这里写目录标题
- JDBC
-
- 准备 MySQL 环境
- 使用 SparkSQL 向 MySQL 中写入数据
- 从 MySQL 中读取数据
JDBC
导读
1,通过 SQL 操作 MySQL 的表
2,将数据写入 MySQL 的表中
准备 MySQL 环境
在使用 SparkSQL 访问 MySQL 之前, 要对 MySQL 进行一些操作, 例如说创建用户, 表和库等
Step 1: 连接 MySQL 数据库
在 MySQL 所在的主机上执行如下命令
mysql -uroot -p
Step 2: 创建 Spark 使用的用户
登进 MySQL 后, 需要先创建用户
CREATE USER 'spark'@'%' IDENTIFIED BY 'Spark123!';
GRANT ALL ON spark_test.* TO 'spark'@'%';
Step 3: 创建库和表
CREATE DATABASE spark_test;
USE spark_test;
CREATE TABLE IF NOT EXISTS `student`(
`id` INT AUTO_INCREMENT,
`name` VARCHAR(100) NOT NULL,
`age` INT NOT NULL,
`gpa` FLOAT,
PRIMARY KEY ( `id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
使用 SparkSQL 向 MySQL 中写入数据
其实在使用 SparkSQL 访问 MySQL 是通过 JDBC, 那么其实所有支持 JDBC 的数据库理论上都可以通过这种方式进行访问
在使用 JDBC 访问关系型数据的时候, 其实也是使用 DataFrameReader, 对 DataFrameReader 提供一些配置, 就可以使用 Spark 访问 JDBC, 有如下几个配置可用
属性 | 含义 |
---|---|
url | 要连接的 JDBC URL |
dbtable | 要访问的表, 可以使用任何 SQL 语句中 from 子句支持的语法 |
fetchsize | 数据抓取的大小(单位行), 适用于读的情况 |
batchsize | 数据传输的大小(单位行), 适用于写的情况 |
isolationLevel | 事务隔离级别, 是一个枚举, 取值 NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE, 默认为 READ_UNCOMMITTED |
创建个文件名:(studenttab10k)
放入如下数据(由于笔者数据有1W个,但无法全部写入,这里只放200个)
ulysses thompson 64 1.90
katie carson 25 3.65
luke king 65 0.73
holly davidson 57 2.43
fred miller 55 3.77
holly white 43 0.24
luke steinbeck 51 1.14
nick underhill 31 2.46
holly davidson 59 1.26
calvin brown 56 0.72
rachel robinson 62 2.25
tom carson 35 0.56
tom johnson 72 0.99
irene garcia 54 1.06
oscar nixon 39 3.60
holly allen 32 2.58
oscar hernandez 19 0.05
alice ichabod 65 2.25
wendy thompson 30 2.39
priscilla hernandez 73 0.23
gabriella van buren 68 1.32
yuri thompson 42 3.65
yuri laertes 60 1.16
sarah young 23 2.76
zach white 32 0.20
nick van buren 68 1.75
xavier underhill 41 1.51
bob ichabod 56 2.81
zach steinbeck 61 2.22
alice garcia 42 2.03
jessica king 29 3.61
calvin nixon 37 0.30
fred polk 66 3.69
bob zipper 40 0.28
alice young 75 0.31
nick underhill 37 1.65
mike white 57 0.69
calvin ovid 41 3.02
fred steinbeck 47 3.57
sarah ovid 65 0.00
wendy nixon 63 0.62
gabriella zipper 77 1.51
david king 40 1.99
jessica white 30 3.82
alice robinson 37 3.69
zach nixon 74 2.75
irene davidson 27 1.22
priscilla xylophone 43 1.60
oscar zipper 25 2.43
fred falkner 38 2.23
ulysses polk 58 0.01
katie hernandez 47 3.80
zach steinbeck 55 0.68
fred laertes 69 3.62
quinn laertes 70 3.66
nick garcia 50 0.12
oscar young 55 2.22
bob underhill 47 0.24
calvin young 77 1.60
mike allen 65 2.95
david young 77 0.26
oscar garcia 69 1.59
ulysses ichabod 26 0.95
wendy laertes 76 1.13
sarah laertes 20 0.24
zach ichabod 60 1.60
tom robinson 62 0.78
zach steinbeck 69 1.01
quinn garcia 57 0.98
yuri van buren 32 1.97
luke carson 39 0.76
calvin ovid 73 0.82
luke ellison 27 0.56
oscar zipper 50 1.31
fred steinbeck 52 3.14
katie xylophone 76 1.38
luke king 54 2.30
ethan white 72 1.43
yuri ovid 37 3.64
jessica garcia 54 1.08
luke young 29 0.80
mike miller 39 3.35
fred hernandez 63 0.17
priscilla hernandez 52 0.35
ethan garcia 43 1.70
quinn hernandez 25 2.58
calvin nixon 33 1.01
yuri xylophone 47 1.36
ulysses steinbeck 63 1.05
jessica nixon 25 2.13
bob johnson 53 3.31
jessica ichabod 56 2.21
zach miller 63 3.87
priscilla white 66 2.82
ulysses allen 21 1.68
katie falkner 47 1.49
tom king 51 1.91
bob laertes 60 3.33
luke nixon 27 3.54
quinn johnson 42 2.24
wendy quirinius 71 0.10
victor polk 55 3.63
rachel robinson 32 1.11
sarah king 57 1.37
victor young 38 1.72
priscilla steinbeck 38 2.11
fred brown 19 2.72
xavier underhill 55 3.56
irene ovid 67 3.80
calvin brown 37 2.22
katie thompson 20 3.27
katie carson 66 3.55
tom miller 57 2.83
rachel brown 56 0.74
holly johnson 38 2.51
irene steinbeck 29 1.97
wendy falkner 37 0.14
ethan white 29 3.62
bob underhill 26 1.10
jessica king 64 0.69
luke steinbeck 19 1.16
luke laertes 70 3.58
rachel polk 74 0.92
calvin xylophone 52 0.58
luke white 57 3.86
calvin van buren 52 3.13
holly quirinius 59 1.70
mike brown 44 1.93
yuri ichabod 61 0.70
ulysses miller 56 3.53
victor hernandez 64 2.52
oscar young 34 0.34
luke ovid 36 3.17
quinn ellison 50 1.13
quinn xylophone 72 2.07
nick underhill 48 0.15
rachel miller 23 3.38
mike van buren 68 1.74
zach van buren 38 0.34
irene zipper 32 0.54
sarah garcia 31 3.87
rachel van buren 56 0.35
fred davidson 69 1.57
nick hernandez 19 2.11
irene polk 40 3.89
katie young 26 2.88
priscilla ovid 49 3.28
jessica hernandez 39 3.13
yuri allen 29 3.51
victor garcia 66 3.45
zach johnson 77 0.95
yuri zipper 48 3.44
alice falkner 28 3.72
gabriella allen 58 3.61
bob nixon 34 3.34
bob white 67 2.93
holly steinbeck 57 1.81
wendy van buren 40 1.09
calvin brown 61 2.08
irene young 25 2.66
holly van buren 40 2.37
katie underhill 30 0.63
quinn hernandez 73 0.31
fred nixon 53 1.76
luke ellison 59 1.10
quinn nixon 24 0.30
ethan underhill 68 2.25
fred underhill 28 3.88
jessica brown 59 3.66
katie falkner 49 3.96
calvin ellison 27 2.23
zach carson 59 0.46
priscilla polk 47 2.99
rachel zipper 49 3.26
holly king 73 1.23
zach carson 64 2.60
fred xylophone 61 3.15
gabriella miller 43 1.73
david laertes 56 3.43
tom garcia 63 2.78
ethan king 66 3.13
david hernandez 26 2.52
wendy ichabod 57 2.81
alice young 69 0.25
tom xylophone 50 2.78
ulysses carson 62 2.26
nick garcia 43 2.23
gabriella ellison 33 1.18
ethan miller 28 2.15
tom zipper 19 2.56
wendy white 19 1.12
luke ovid 31 1.68
wendy xylophone 75 2.58
quinn garcia 22 3.65
holly ellison 68 0.26
yuri hernandez 75 2.50
tom underhill 71 2.68
ulysses king 31 1.76
fred thompson 46 1.55
gabriella ichabod 26 1.59
读取数据集, 处理过后存往 MySQL 中的代码如下
package com.spark.jdbc
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType}
object MySQLWrite {
/**
* MySQL 的访问方式有两种 : 使用本地运行 , 提交到集群运行
* 在写入MySQL数据时 , 使用本地运行 , 读取的时候使用集群运行
*/
def main(args: Array[String]): Unit = {
//1.创建SparkSession 对象
val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local[6]")
.getOrCreate()
//2.读取数据创建DataFrame
// 1.拷贝文件
// 2.读取
val schema = StructType(
List(
StructField("name",StringType),
StructField("age",IntegerType),
StructField("gpa",FloatType)
)
)
val studentDF = spark.read
.schema(schema)
.option("delimiter", "\t")
.csv("E:\\Project\\Spark\\spark-sql\\input\\studenttab10k")
//3.处理数据 (不处理了,直接写)
//4.落地
studentDF.write
.format("jdbc")
.option("url","jdbc:mysql://Bigdata01:3306/spark_test")
.option("dbtable","student")
.option("user","spark")
.option("password","Spark123!")
.mode(SaveMode.Overwrite)
.save()
}
}
运行程序
如果是在本地运行, 需要导入 Maven 依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
如果使用 Spark submit 或者 Spark shell 来运行任务, 需要通过 --jars 参数提交 MySQL 的 Jar 包, 或者指定 --packages 从 Maven 库中读取
bin/spark-shell \
--packages mysql:mysql-connector-java:5.1.47 \
--repositories http://maven.aliyun.com/nexus/content/groups/public/
运行后查询结果如下:
mysql> select * from student limit 29;
+---------------------+------+------+
| name | age | gpa |
+---------------------+------+------+
| katie carson | 25 | 3.65 |
| luke king | 65 | 0.73 |
| holly davidson | 57 | 2.43 |
| fred miller | 55 | 3.77 |
| holly white | 43 | 0.24 |
| luke steinbeck | 51 | 1.14 |
| nick underhill | 31 | 2.46 |
| holly davidson | 59 | 1.26 |
| calvin brown | 56 | 0.72 |
| rachel robinson | 62 | 2.25 |
| tom carson | 35 | 0.56 |
| tom johnson | 72 | 0.99 |
| irene garcia | 54 | 1.06 |
| oscar nixon | 39 | 3.6 |
| holly allen | 32 | 2.58 |
| oscar hernandez | 19 | 0.05 |
| alice ichabod | 65 | 2.25 |
| wendy thompson | 30 | 2.39 |
| priscilla hernandez | 73 | 0.23 |
| gabriella van buren | 68 | 1.32 |
| yuri thompson | 42 | 3.65 |
| yuri laertes | 60 | 1.16 |
| sarah young | 23 | 2.76 |
| zach white | 32 | 0.2 |
| nick van buren | 68 | 1.75 |
| xavier underhill | 41 | 1.51 |
| bob ichabod | 56 | 2.81 |
| zach steinbeck | 61 | 2.22 |
| alice garcia | 42 | 2.03 |
+---------------------+------+------+
29 rows in set (0.00 sec)
从 MySQL 中读取数据
读取 MySQL 的方式也非常的简单, 只是使用 SparkSQL 的 DataFrameReader 加上参数配置即可访问
package com.spark.jdbc
import org.apache.spark.sql.SparkSession
object MySQLRead {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local[6]")
.getOrCreate()
spark.read.format("jdbc")
.option("url","jdbc:mysql://Bigdata01:3306/spark_test")
.option("dbtable","student")
.option("user","spark")
.option("password","Spark123!")
.load()
.show()
}
}
默认情况下读取 MySQL 表时, 从 MySQL 表中读取的数据放入了一个分区, 拉取后可以使用 DataFrame 重分区来保证并行计算和内存占用不会太高, 但是如果感觉 MySQL 中数据过多的时候, 读取时可能就会产生 OOM, 所以在数据量比较大的场景, 就需要在读取的时候就将其分发到不同的 RDD 分区
属性 | 含义 |
---|---|
partitionColumn | 指定按照哪一列进行分区, 只能设置类型为数字的列, 一般指定为 ID |
lowerBound, upperBound | 确定步长的参数, lowerBound - upperBound 之间的数据均分给每一个分区, 小于 lowerBound 的数据分给第一个分区, 大于 upperBound 的数据分给最后一个分区 |
numPartitions | 分区数量 |
spark.read.format("jdbc")
.option("url", "jdbc:mysql://node01:3306/spark_test")
.option("dbtable", "student")
.option("user", "spark")
.option("password", "Spark123!")
.option("partitionColumn", "age")
.option("lowerBound", 1)
.option("upperBound", 60)
.option("numPartitions", 10)
.load()
.show()
有时候可能要使用非数字列来作为分区依据, Spark 也提供了针对任意类型的列作为分区依据的方法
val predicates = Array(
"age < 20",
"age >= 20, age < 30",
"age >= 30"
)
val connectionProperties = new Properties()
connectionProperties.setProperty("user", "spark")
connectionProperties.setProperty("password", "Spark123!")
spark.read
.jdbc(
url = "jdbc:mysql://node01:3306/spark_test",
table = "student",
predicates = predicates,
connectionProperties = connectionProperties
)
.show()
SparkSQL 中并没有直接提供按照 SQL 进行筛选读取数据的 API 和参数, 但是可以通过 dbtable 来曲线救国, dbtable 指定目标表的名称, 但是因为 dbtable 中可以编写 SQL, 所以使用子查询即可做到
spark.read.format("jdbc")
.option("url", "jdbc:mysql://node01:3306/spark_test")
.option("dbtable", "(select name, age from student where age > 10 and age < 20) as stu")
.option("user", "spark")
.option("password", "Spark123!")
.option("partitionColumn", "age")
.option("lowerBound", 1)
.option("upperBound", 60)
.option("numPartitions", 10)
.load()
.show()