Spark3.1.2與Iceberg0.12.1整合
Spark可以操作Iceberg資料湖,這裡使用的Iceberg的版本為0.12.1,此版本與Spark2.4版本之上相容。由于在Spark2.4版本中在操作Iceberg時不支援DDL、增加分區及增加分區轉換、Iceberg中繼資料查詢、insert into/overwrite等操作,建議使用Spark3.x版本來整合Iceberg0.12.1版本,這裡我們使用的Spark版本是3.1.2版本。
一、向pom檔案導入依賴
在Idea中建立Maven項目,在pom檔案中導入以下關鍵依賴:
<!-- 配置以下可以解決 在jdk1.8環境下打包時報錯 “-source 1.5 中不支援 lambda 表達式” -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Spark與Iceberg整合的依賴包-->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark3</artifactId>
<version>0.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark3-runtime</artifactId>
<version>0.12.1</version>
</dependency>
<!-- avro格式 依賴包 -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<!-- parquet格式 依賴包 -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.12.0</version>
</dependency>
<!-- SparkSQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- SparkSQL ON Hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!--<!–mysql依賴的jar包–>-->
<!--<dependency>-->
<!--<groupId>mysql</groupId>-->
<!--<artifactId>mysql-connector-java</artifactId>-->
<!--<version>5.1.47</version>-->
<!--</dependency>-->
<!--SparkStreaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- SparkStreaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!--<!– 向kafka 生産資料需要包 –>-->
<!--<dependency>-->
<!--<groupId>org.apache.kafka</groupId>-->
<!--<artifactId>kafka-clients</artifactId>-->
<!--<version>0.10.0.0</version>-->
<!--<!– 編譯和測試使用jar包,沒有傳遞性 –>-->
<!--<!–<scope>provided</scope>–>-->
<!--</dependency>-->
<!-- StructStreaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- Scala 包-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.12.14</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.12</version>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
複制
二、SparkSQL設定catalog配置
以下操作主要是SparkSQL操作Iceberg,同樣Spark中支援兩種Catalog的設定:hive和hadoop,Hive Catalog就是iceberg表存儲使用Hive預設的資料路徑,Hadoop Catalog需要指定Iceberg格式表存儲路徑。
在SparkSQL代碼中通過以下方式來指定使用的Catalog:
val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
//指定hive catalog, catalog名稱為hive_prod
.config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hive_prod.type", "hive")
.config("spark.sql.catalog.hive_prod.uri", "thrift://node1:9083")
.config("iceberg.engine.hive.enabled", "true")
//指定hadoop catalog,catalog名稱為hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
.getOrCreate()
複制
三、使用Hive Catalog管理Iceberg表
使用Hive Catalog管理Iceberg表預設資料存儲在Hive對應的Warehouse目錄下,在Hive中會自動建立對應的Iceberg表,SparkSQL 相當于是Hive用戶端,需要額外設定“iceberg.engine.hive.enabled”屬性為true,否則在Hive對應的Iceberg格式表中查詢不到資料。
1、建立表
//建立表 ,hive_pord:指定catalog名稱。default:指定Hive中存在的庫。test:建立的iceberg表名。
spark.sql(
"""
| create table if not exists hive_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin)
複制
注意:
1)建立表時,表名稱為:${catalog名稱}.${Hive中庫名}.${建立的Iceberg格式表名}
2)表建立之後,可以在Hive中查詢到對應的test表,建立的是Hive外表,在對應的Hive warehouse 目錄下可以看到對應的資料目錄。
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwIjNx8CX39CXy8CXycXZpZVZnFWbp9zZuBnLzMzYhRzM0UjNzEWM2MjZkNWY0MzMhFzMjFjZ3czNygzLclTMwkTNxEzLcVmdhNXLwRHdo9CXt92YucWbpRWdvx2Yx5yazF2Lc9CX6MHc0RHaiojIsJye.png)
2、插入資料
//插入資料
spark.sql(
"""
|insert into hive_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
複制
3、查詢資料
//查詢資料
spark.sql(
"""
|select * from hive_prod.default.test
""".stripMargin).show()
複制
結果如下:
在Hive對應的test表中也能查詢到資料:
4、删除表
//删除表,删除表對應的資料不會被删除
spark.sql(
"""
|drop table hive_prod.default.test
""".stripMargin)
複制
注意:删除表後,資料會被删除,但是表目錄還是存在,如果徹底删除資料,需要把對應的表目錄删除。
四、用Hadoop Catalog管理Iceberg表
使用Hadoop Catalog管理表,需要指定對應Iceberg存儲資料的目錄。
1、建立表
//建立表 ,hadoop_prod:指定Hadoop catalog名稱。default:指定庫名稱。test:建立的iceberg表名。
spark.sql(
"""
| create table if not exists hadoop_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin)
複制
注意:
1)建立表名稱為:${Hadoop Catalog名稱}.${随意定義的庫名}.${Iceberg格式表名}
2)建立表後,會在hadoop_prod名稱對應的目錄下建立該表
2、插入資料
//插入資料
spark.sql(
"""
|insert into hadoop_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
複制
3、查詢資料
spark.sql(
"""
|select * from hadoop_prod.default.test
""".stripMargin).show()
複制
4、建立對應的Hive表映射資料
在Hive表中執行如下建表語句:
CREATE TABLE hdfs_iceberg (
id int,
name string,
age int
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/sparkoperateiceberg/default/test'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
複制
在Hive中查詢“hdfs_iceberg”表資料如下:
5、删除表
spark.sql(
"""
|drop table hadoop_prod.default.test
""".stripMargin)
複制
注意:删除iceberg表後,資料被删除,對應的庫目錄存在。