定义生成Parquet文件的字段结构
相关依赖在《Spark练习测试——读写Parquet格式文件(一)》中的开头处。
定义生成数据的结构
定义一个表结构,在该类中有个
conversionToRow()
方法,这个方法将会在定义Parquet文件字段的时候用到。
import lombok.Data;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
/**
* 测试结构
* 属性为表字段
*/
@Data
public class TestEntity {
private int intValue;
private long longValue;
private double doubleValue;
private String stringValue;
private byte[] byteValue;
private byte[] byteNone;
/**
* 配置Parquet文件中的字段
* 字段顺序 intValue、longValue、doubleValue、stringValue、byteValue、byteNone
*
* @return
*/
public Row conversionToRow() {
return RowFactory.create(intValue, longValue, doubleValue, stringValue, byteValue, byteNone);
}
}
原结构序列化生成Parquet文件
写个测试生成一个Parquet文件,看看生成Parquet文件的
Schema
信息。
import cn.hutool.core.collection.ListUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.springframework.stereotype.Component;
@Component
public class TestComponent {
private static final String sparkDirPath = "C:\\Users\\Lenovo\\Desktop\\对比\\spark\\";
/**
* 生成文件测试
*
* @param testEntity
*/
public void sparkWriteToParquet(TestEntity testEntity) {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.sql.parquet.binaryAsString", "true");
sparkConf.set("spark.sql.parquet.writeLegacyFormat", "true");
sparkConf.setMaster("local").setAppName("convertor");
//写入数据
SparkSession sparkSession = SparkSession.builder().appName("write").config(sparkConf).getOrCreate();
Dataset<Row> dataFrame = sparkSession.createDataFrame(ListUtil.toList(testEntity),TestEntity.class);
dataFrame.coalesce(1).write().mode(SaveMode.Append).parquet(sparkDirPath);
dataFrame.unpersist(true);
sparkSession.close();//关闭连接
}
/**
* 读取文件测试
*
* @param path
*/
public void readParquet(String path) {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.sql.parquet.binaryAsString", "true");
sparkConf.set("spark.sql.parquet.writeLegacyFormat", "true");
sparkConf.setMaster("local").setAppName("convertor");
SparkSession sparkSession = SparkSession.builder()
.appName("convertor").config(sparkConf).getOrCreate();
Dataset<Row> dataset = sparkSession.read().parquet(path);
Dataset<Row> select = dataset.select("*");
select.printSchema();//打印结构
select.show();//打印内容
}
}
为了方便此处使用SpringBootTest编写测试用例。
import com.lyan.parquet_convert.test.TestComponent;
import com.lyan.parquet_convert.test.TestEntity;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ParquetConvertApplication.class)
public class TestConvertTest {
@Resource
private TestComponent testComponent;
/**
* 生成Parquet文件测试
*/
@Test
public void writeDataToParquetTest(){
TestEntity testEntity = new TestEntity();
testEntity.setIntValue(100);
testEntity.setLongValue(200);
testEntity.setDoubleValue(300);
testEntity.setStringValue("测试");
testEntity.setByteValue("不为空的值".getBytes(StandardCharsets.UTF_8));
testComponent.sparkWriteToParquet(testEntity);
}
/**
* 读取Parquet文件测试
*/
@Test
public void readParquetTest() {
testComponent.readParquet("生成的文件目录");
}
}
执行
writeDataToParquetTest()
方法生成文件。结果如下图所示:
执行
readParquetTest
方法查看生成文件的结构和文件内容(路径设为生成文件的路径例:“C:\Users\Lenovo\Desktop\对比\spark\part-00000-0abbc6a3-d406-4467-b01e-0449742b5c0c-c000.snappy.parquet”),截取内容如下:
# Parquet文件结构信息
root
|-- byteNone: binary (nullable = true)
|-- byteValue: binary (nullable = true)
|-- doubleValue: double (nullable = true)
|-- intValue: integer (nullable = true)
|-- longValue: long (nullable = true)
|-- stringValue: string (nullable = true)
# Parquet文件内容信息
+--------+--------------------+-----------+--------+---------+-----------+
|byteNone| byteValue|doubleValue|intValue|longValue|stringValue|
+--------+--------------------+-----------+--------+---------+-----------+
| null|[E4 B8 8D E4 B8 B...| 300.0| 100| 200| 测试|
+--------+--------------------+-----------+--------+---------+-----------+
查看结果可以看到字段顺序已经打乱(至少看着不舒服)。还有就是如果有的字段需要更改怎么办,主要是不想更改类的属性名称。想想属性名称一变
Get()
、
Set()
啥的都得改一遍,太费劲了!(主要是太LOW了)
定义Parquet文件的字段结构和字段顺序
定义结构并不难,因为Spark提供了通过
StructType
生成Parqeut文件。通过它可以方便的完成字段名称的设置。接下来写个配置定一下要生成的文件结构和字段名称,这里将之前驼峰命名都改成下划线的:
import org.apache.spark.sql.types.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
@Configuration
public class TestConfiguration {
@Bean("testStructType")
public StructType testStructType(){
List<StructField> structFields = new ArrayList<>();
structFields.add(new StructField("int_value", IntegerType$.MODULE$, true, Metadata.empty()));
structFields.add(new StructField("long_value", LongType$.MODULE$, true, Metadata.empty()));
structFields.add(new StructField("double_value", DoubleType$.MODULE$, true, Metadata.empty()));
structFields.add(new StructField("string_value", StringType$.MODULE$, true, Metadata.empty()));
structFields.add(new StructField("byte_value", BinaryType$.MODULE$, true, Metadata.empty()));
structFields.add(new StructField("byte_none", BinaryType$.MODULE$, true, Metadata.empty()));
return new StructType(structFields.toArray(new StructField[0]));
}
}
接下来修改下TestComponent的
sparkWriteToParquet()
方法,注意最后字段的顺序是由
Row
(相当于表里的行)完成的,所以之前在TestEntity类中添加了方法进行转换。修改之后代码如下:
@Autowired
@Qualifier("testStructType")
private StructType testStructType;
public void sparkWriteToParquet(TestEntity testEntity) {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.sql.parquet.binaryAsString", "true");
sparkConf.set("spark.sql.parquet.writeLegacyFormat", "true");
sparkConf.setMaster("local").setAppName("convertor");
//转换对象
Stream<Row> rowStream = ListUtil.toList(testEntity).stream().map(TestEntity::conversionToRow);
//写入数据
SparkSession sparkSession = SparkSession.builder().appName("write").config(sparkConf).getOrCreate();
Dataset<Row> dataFrame = sparkSession.createDataFrame(rowStream.collect(Collectors.toList()), testStructType);
dataFrame.coalesce(1).write().mode(SaveMode.Append).parquet(sparkDirPath);
dataFrame.unpersist(true);
sparkSession.close();//关闭连接
}
省略生成文件的过程,直接查看文件结构和文件内容。只要查看字段名称和字段顺序。
# Parquet文件结构信息
root
|-- int_value: integer (nullable = true)
|-- long_value: long (nullable = true)
|-- double_value: double (nullable = true)
|-- string_value: string (nullable = true)
|-- byte_value: binary (nullable = true)
|-- byte_none: binary (nullable = true)
# Parquet文件内容信息
+---------+----------+------------+------------+--------------------+---------+
|int_value|long_value|double_value|string_value| byte_value|byte_none|
+---------+----------+------------+------------+--------------------+---------+
| 100| 200| 300.0| 测试|[E4 B8 8D E4 B8 B...| null|
+---------+----------+------------+------------+--------------------+---------+
结果还算不打脸,字段名称已改变,字段顺序也符合预期。