天天看点

Spark SQL 快速入门系列(六)Spark SQL 访问 JDBCJDBC

这里写目录标题

  • JDBC
    • 准备 MySQL 环境
    • 使用 SparkSQL 向 MySQL 中写入数据
    • 从 MySQL 中读取数据

JDBC

导读

1,通过 SQL 操作 MySQL 的表

2,将数据写入 MySQL 的表中

准备 MySQL 环境

在使用 SparkSQL 访问 MySQL 之前, 要对 MySQL 进行一些操作, 例如说创建用户, 表和库等

Step 1: 连接 MySQL 数据库

在 MySQL 所在的主机上执行如下命令

mysql -uroot -p
           

Step 2: 创建 Spark 使用的用户

登进 MySQL 后, 需要先创建用户

CREATE USER 'spark'@'%' IDENTIFIED BY 'Spark123!';
GRANT ALL ON spark_test.* TO 'spark'@'%';
           

Step 3: 创建库和表

CREATE DATABASE spark_test;

USE spark_test;

CREATE TABLE IF NOT EXISTS `student`(
`id` INT AUTO_INCREMENT,
`name` VARCHAR(100) NOT NULL,
`age` INT NOT NULL,
`gpa` FLOAT,
PRIMARY KEY ( `id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

           

使用 SparkSQL 向 MySQL 中写入数据

其实在使用 SparkSQL 访问 MySQL 是通过 JDBC, 那么其实所有支持 JDBC 的数据库理论上都可以通过这种方式进行访问

在使用 JDBC 访问关系型数据的时候, 其实也是使用 DataFrameReader, 对 DataFrameReader 提供一些配置, 就可以使用 Spark 访问 JDBC, 有如下几个配置可用

属性 含义
url 要连接的 JDBC URL
dbtable 要访问的表, 可以使用任何 SQL 语句中 from 子句支持的语法
fetchsize 数据抓取的大小(单位行), 适用于读的情况
batchsize 数据传输的大小(单位行), 适用于写的情况
isolationLevel 事务隔离级别, 是一个枚举, 取值 NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE, 默认为 READ_UNCOMMITTED

创建个文件名:(studenttab10k)

放入如下数据(由于笔者数据有1W个,但无法全部写入,这里只放200个)

ulysses thompson    64	1.90
katie carson	25	3.65
luke king	65	0.73
holly davidson	57	2.43
fred miller	55	3.77
holly white	43	0.24
luke steinbeck	51	1.14
nick underhill	31	2.46
holly davidson	59	1.26
calvin brown	56	0.72
rachel robinson	62	2.25
tom carson	35	0.56
tom johnson	72	0.99
irene garcia	54	1.06
oscar nixon	39	3.60
holly allen	32	2.58
oscar hernandez	19	0.05
alice ichabod	65	2.25
wendy thompson	30	2.39
priscilla hernandez	73	0.23
gabriella van buren	68	1.32
yuri thompson	42	3.65
yuri laertes	60	1.16
sarah young	23	2.76
zach white	32	0.20
nick van buren	68	1.75
xavier underhill	41	1.51
bob ichabod	56	2.81
zach steinbeck	61	2.22
alice garcia	42	2.03
jessica king	29	3.61
calvin nixon	37	0.30
fred polk	66	3.69
bob zipper	40	0.28
alice young	75	0.31
nick underhill	37	1.65
mike white	57	0.69
calvin ovid	41	3.02
fred steinbeck	47	3.57
sarah ovid	65	0.00
wendy nixon	63	0.62
gabriella zipper	77	1.51
david king	40	1.99
jessica white	30	3.82
alice robinson	37	3.69
zach nixon	74	2.75
irene davidson	27	1.22
priscilla xylophone	43	1.60
oscar zipper	25	2.43
fred falkner	38	2.23
ulysses polk	58	0.01
katie hernandez	47	3.80
zach steinbeck	55	0.68
fred laertes	69	3.62
quinn laertes	70	3.66
nick garcia	50	0.12
oscar young	55	2.22
bob underhill	47	0.24
calvin young	77	1.60
mike allen	65	2.95
david young	77	0.26
oscar garcia	69	1.59
ulysses ichabod	26	0.95
wendy laertes	76	1.13
sarah laertes	20	0.24
zach ichabod	60	1.60
tom robinson	62	0.78
zach steinbeck	69	1.01
quinn garcia	57	0.98
yuri van buren	32	1.97
luke carson	39	0.76
calvin ovid	73	0.82
luke ellison	27	0.56
oscar zipper	50	1.31
fred steinbeck	52	3.14
katie xylophone	76	1.38
luke king	54	2.30
ethan white	72	1.43
yuri ovid	37	3.64
jessica garcia	54	1.08
luke young	29	0.80
mike miller	39	3.35
fred hernandez	63	0.17
priscilla hernandez	52	0.35
ethan garcia	43	1.70
quinn hernandez	25	2.58
calvin nixon	33	1.01
yuri xylophone	47	1.36
ulysses steinbeck	63	1.05
jessica nixon	25	2.13
bob johnson	53	3.31
jessica ichabod	56	2.21
zach miller	63	3.87
priscilla white	66	2.82
ulysses allen	21	1.68
katie falkner	47	1.49
tom king	51	1.91
bob laertes	60	3.33
luke nixon	27	3.54
quinn johnson	42	2.24
wendy quirinius	71	0.10
victor polk	55	3.63
rachel robinson	32	1.11
sarah king	57	1.37
victor young	38	1.72
priscilla steinbeck	38	2.11
fred brown	19	2.72
xavier underhill	55	3.56
irene ovid	67	3.80
calvin brown	37	2.22
katie thompson	20	3.27
katie carson	66	3.55
tom miller	57	2.83
rachel brown	56	0.74
holly johnson	38	2.51
irene steinbeck	29	1.97
wendy falkner	37	0.14
ethan white	29	3.62
bob underhill	26	1.10
jessica king	64	0.69
luke steinbeck	19	1.16
luke laertes	70	3.58
rachel polk	74	0.92
calvin xylophone	52	0.58
luke white	57	3.86
calvin van buren	52	3.13
holly quirinius	59	1.70
mike brown	44	1.93
yuri ichabod	61	0.70
ulysses miller	56	3.53
victor hernandez	64	2.52
oscar young	34	0.34
luke ovid	36	3.17
quinn ellison	50	1.13
quinn xylophone	72	2.07
nick underhill	48	0.15
rachel miller	23	3.38
mike van buren	68	1.74
zach van buren	38	0.34
irene zipper	32	0.54
sarah garcia	31	3.87
rachel van buren	56	0.35
fred davidson	69	1.57
nick hernandez	19	2.11
irene polk	40	3.89
katie young	26	2.88
priscilla ovid	49	3.28
jessica hernandez	39	3.13
yuri allen	29	3.51
victor garcia	66	3.45
zach johnson	77	0.95
yuri zipper	48	3.44
alice falkner	28	3.72
gabriella allen	58	3.61
bob nixon	34	3.34
bob white	67	2.93
holly steinbeck	57	1.81
wendy van buren	40	1.09
calvin brown	61	2.08
irene young	25	2.66
holly van buren	40	2.37
katie underhill	30	0.63
quinn hernandez	73	0.31
fred nixon	53	1.76
luke ellison	59	1.10
quinn nixon	24	0.30
ethan underhill	68	2.25
fred underhill	28	3.88
jessica brown	59	3.66
katie falkner	49	3.96
calvin ellison	27	2.23
zach carson	59	0.46
priscilla polk	47	2.99
rachel zipper	49	3.26
holly king	73	1.23
zach carson	64	2.60
fred xylophone	61	3.15
gabriella miller	43	1.73
david laertes	56	3.43
tom garcia	63	2.78
ethan king	66	3.13
david hernandez	26	2.52
wendy ichabod	57	2.81
alice young	69	0.25
tom xylophone	50	2.78
ulysses carson	62	2.26
nick garcia	43	2.23
gabriella ellison	33	1.18
ethan miller	28	2.15
tom zipper	19	2.56
wendy white	19	1.12
luke ovid	31	1.68
wendy xylophone	75	2.58
quinn garcia	22	3.65
holly ellison	68	0.26
yuri hernandez	75	2.50
tom underhill	71	2.68
ulysses king	31	1.76
fred thompson	46	1.55
gabriella ichabod	26	1.59
           

读取数据集, 处理过后存往 MySQL 中的代码如下

package com.spark.jdbc

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{FloatType, IntegerType, StringType, StructField, StructType}

object MySQLWrite {
  /**
   * MySQL 的访问方式有两种 : 使用本地运行 , 提交到集群运行
   * 在写入MySQL数据时 , 使用本地运行 , 读取的时候使用集群运行
   */

  def main(args: Array[String]): Unit = {
    //1.创建SparkSession 对象
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[6]")
      .getOrCreate()


    //2.读取数据创建DataFrame
    //  1.拷贝文件
    //  2.读取
    val schema = StructType(
      List(
        StructField("name",StringType),
        StructField("age",IntegerType),
        StructField("gpa",FloatType)
      )
    )


    val studentDF = spark.read
      .schema(schema)
      .option("delimiter", "\t")
      .csv("E:\\Project\\Spark\\spark-sql\\input\\studenttab10k")


    //3.处理数据 (不处理了,直接写)

    //4.落地
    studentDF.write
      .format("jdbc")
      .option("url","jdbc:mysql://Bigdata01:3306/spark_test")
      .option("dbtable","student")
      .option("user","spark")
      .option("password","Spark123!")
      .mode(SaveMode.Overwrite)
      .save()

  }
}

           

运行程序

如果是在本地运行, 需要导入 Maven 依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
           

如果使用 Spark submit 或者 Spark shell 来运行任务, 需要通过 --jars 参数提交 MySQL 的 Jar 包, 或者指定 --packages 从 Maven 库中读取

bin/spark-shell \
--packages  mysql:mysql-connector-java:5.1.47 \
--repositories http://maven.aliyun.com/nexus/content/groups/public/
           

运行后查询结果如下:

mysql> select * from student limit 29;
+---------------------+------+------+
| name                | age  | gpa  |
+---------------------+------+------+
| katie carson        |   25 | 3.65 |
| luke king           |   65 | 0.73 |
| holly davidson      |   57 | 2.43 |
| fred miller         |   55 | 3.77 |
| holly white         |   43 | 0.24 |
| luke steinbeck      |   51 | 1.14 |
| nick underhill      |   31 | 2.46 |
| holly davidson      |   59 | 1.26 |
| calvin brown        |   56 | 0.72 |
| rachel robinson     |   62 | 2.25 |
| tom carson          |   35 | 0.56 |
| tom johnson         |   72 | 0.99 |
| irene garcia        |   54 | 1.06 |
| oscar nixon         |   39 |  3.6 |
| holly allen         |   32 | 2.58 |
| oscar hernandez     |   19 | 0.05 |
| alice ichabod       |   65 | 2.25 |
| wendy thompson      |   30 | 2.39 |
| priscilla hernandez |   73 | 0.23 |
| gabriella van buren |   68 | 1.32 |
| yuri thompson       |   42 | 3.65 |
| yuri laertes        |   60 | 1.16 |
| sarah young         |   23 | 2.76 |
| zach white          |   32 |  0.2 |
| nick van buren      |   68 | 1.75 |
| xavier underhill    |   41 | 1.51 |
| bob ichabod         |   56 | 2.81 |
| zach steinbeck      |   61 | 2.22 |
| alice garcia        |   42 | 2.03 |
+---------------------+------+------+
29 rows in set (0.00 sec)

           

从 MySQL 中读取数据

读取 MySQL 的方式也非常的简单, 只是使用 SparkSQL 的 DataFrameReader 加上参数配置即可访问

package com.spark.jdbc

import org.apache.spark.sql.SparkSession

object MySQLRead {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[6]")
      .getOrCreate()



    spark.read.format("jdbc")
      .option("url","jdbc:mysql://Bigdata01:3306/spark_test")
      .option("dbtable","student")
      .option("user","spark")
      .option("password","Spark123!")
      .load()
      .show()
  }
}
           

默认情况下读取 MySQL 表时, 从 MySQL 表中读取的数据放入了一个分区, 拉取后可以使用 DataFrame 重分区来保证并行计算和内存占用不会太高, 但是如果感觉 MySQL 中数据过多的时候, 读取时可能就会产生 OOM, 所以在数据量比较大的场景, 就需要在读取的时候就将其分发到不同的 RDD 分区

属性 含义
partitionColumn 指定按照哪一列进行分区, 只能设置类型为数字的列, 一般指定为 ID
lowerBound, upperBound 确定步长的参数, lowerBound - upperBound 之间的数据均分给每一个分区, 小于 lowerBound 的数据分给第一个分区, 大于 upperBound 的数据分给最后一个分区
numPartitions 分区数量
spark.read.format("jdbc")
  .option("url", "jdbc:mysql://node01:3306/spark_test")
  .option("dbtable", "student")
  .option("user", "spark")
  .option("password", "Spark123!")
  .option("partitionColumn", "age")
  .option("lowerBound", 1)
  .option("upperBound", 60)
  .option("numPartitions", 10)
  .load()
  .show()
           

有时候可能要使用非数字列来作为分区依据, Spark 也提供了针对任意类型的列作为分区依据的方法

val predicates = Array(
  "age < 20",
  "age >= 20, age < 30",
  "age >= 30"
)

val connectionProperties = new Properties()
connectionProperties.setProperty("user", "spark")
connectionProperties.setProperty("password", "Spark123!")

spark.read
  .jdbc(
    url = "jdbc:mysql://node01:3306/spark_test",
    table = "student",
    predicates = predicates,
    connectionProperties = connectionProperties
  )
  .show()

           

SparkSQL 中并没有直接提供按照 SQL 进行筛选读取数据的 API 和参数, 但是可以通过 dbtable 来曲线救国, dbtable 指定目标表的名称, 但是因为 dbtable 中可以编写 SQL, 所以使用子查询即可做到

spark.read.format("jdbc")
  .option("url", "jdbc:mysql://node01:3306/spark_test")
  .option("dbtable", "(select name, age from student where age > 10 and age < 20) as stu")
  .option("user", "spark")
  .option("password", "Spark123!")
  .option("partitionColumn", "age")
  .option("lowerBound", 1)
  .option("upperBound", 60)
  .option("numPartitions", 10)
  .load()
  .show()
           

继续阅读