天天看点

Spark练习测试(二)——定义Parquet文件的字段结构定义生成Parquet文件的字段结构

定义生成Parquet文件的字段结构

  相关依赖在《Spark练习测试——读写Parquet格式文件(一)》中的开头处。

定义生成数据的结构

  定义一个表结构,在该类中有个

conversionToRow()

方法,这个方法将会在定义Parquet文件字段的时候用到。

import lombok.Data;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

/**
 * 测试结构
 * 属性为表字段
 */
@Data
public class TestEntity {
    private int intValue;
    private long longValue;
    private double doubleValue;
    private String stringValue;
    private byte[] byteValue;
    private byte[] byteNone;

    /**
     * 配置Parquet文件中的字段
     * 字段顺序 intValue、longValue、doubleValue、stringValue、byteValue、byteNone
     *
     * @return
     */
    public Row conversionToRow() {
        return RowFactory.create(intValue, longValue, doubleValue, stringValue, byteValue, byteNone);
    }

}
           

原结构序列化生成Parquet文件

  写个测试生成一个Parquet文件,看看生成Parquet文件的

Schema

信息。

import cn.hutool.core.collection.ListUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import org.springframework.stereotype.Component;

@Component
public class TestComponent {

    private static final String sparkDirPath = "C:\\Users\\Lenovo\\Desktop\\对比\\spark\\";

    /**
     * 生成文件测试
     *  
     * @param testEntity
     */
    public void sparkWriteToParquet(TestEntity testEntity) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.sql.parquet.binaryAsString", "true");
        sparkConf.set("spark.sql.parquet.writeLegacyFormat", "true");
        sparkConf.setMaster("local").setAppName("convertor");
        //写入数据
        SparkSession sparkSession = SparkSession.builder().appName("write").config(sparkConf).getOrCreate();
        Dataset<Row> dataFrame = sparkSession.createDataFrame(ListUtil.toList(testEntity),TestEntity.class);
        dataFrame.coalesce(1).write().mode(SaveMode.Append).parquet(sparkDirPath);
        dataFrame.unpersist(true);
        sparkSession.close();//关闭连接
    }
    /**
     * 读取文件测试
     *
     * @param path
     */
    public void readParquet(String path) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.sql.parquet.binaryAsString", "true");
        sparkConf.set("spark.sql.parquet.writeLegacyFormat", "true");
        sparkConf.setMaster("local").setAppName("convertor");
        SparkSession sparkSession = SparkSession.builder()
        	.appName("convertor").config(sparkConf).getOrCreate();
        Dataset<Row> dataset = sparkSession.read().parquet(path);
        Dataset<Row> select = dataset.select("*");
        select.printSchema();//打印结构
        select.show();//打印内容
    }
    
}    
           

  为了方便此处使用SpringBootTest编写测试用例。

import com.lyan.parquet_convert.test.TestComponent;
import com.lyan.parquet_convert.test.TestEntity;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ParquetConvertApplication.class)
public class TestConvertTest {
    @Resource
    private TestComponent testComponent;
    
    /**
     * 生成Parquet文件测试
     */
    @Test
    public void writeDataToParquetTest(){
        TestEntity testEntity = new TestEntity();
        testEntity.setIntValue(100);
        testEntity.setLongValue(200);
        testEntity.setDoubleValue(300);
        testEntity.setStringValue("测试");
        testEntity.setByteValue("不为空的值".getBytes(StandardCharsets.UTF_8));
        testComponent.sparkWriteToParquet(testEntity);
    }
    
    /**
     * 读取Parquet文件测试
     */
    @Test
    public void readParquetTest() {
        testComponent.readParquet("生成的文件目录");
    }
    
}    
           

  执行

writeDataToParquetTest()

方法生成文件。结果如下图所示:

Spark练习测试(二)——定义Parquet文件的字段结构定义生成Parquet文件的字段结构

  执行

readParquetTest

方法查看生成文件的结构和文件内容(路径设为生成文件的路径例:“C:\Users\Lenovo\Desktop\对比\spark\part-00000-0abbc6a3-d406-4467-b01e-0449742b5c0c-c000.snappy.parquet”),截取内容如下:

# Parquet文件结构信息
root
 |-- byteNone: binary (nullable = true)
 |-- byteValue: binary (nullable = true)
 |-- doubleValue: double (nullable = true)
 |-- intValue: integer (nullable = true)
 |-- longValue: long (nullable = true)
 |-- stringValue: string (nullable = true)
           
# Parquet文件内容信息
+--------+--------------------+-----------+--------+---------+-----------+
|byteNone|           byteValue|doubleValue|intValue|longValue|stringValue|
+--------+--------------------+-----------+--------+---------+-----------+
|    null|[E4 B8 8D E4 B8 B...|      300.0|     100|      200|       测试|
+--------+--------------------+-----------+--------+---------+-----------+
           

  查看结果可以看到字段顺序已经打乱(至少看着不舒服)。还有就是如果有的字段需要更改怎么办,主要是不想更改类的属性名称。想想属性名称一变

Get()

Set()

啥的都得改一遍,太费劲了!(主要是太LOW了)

定义Parquet文件的字段结构和字段顺序

  定义结构并不难,因为Spark提供了通过

StructType

生成Parqeut文件。通过它可以方便的完成字段名称的设置。接下来写个配置定一下要生成的文件结构和字段名称,这里将之前驼峰命名都改成下划线的:

import org.apache.spark.sql.types.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;
@Configuration
public class TestConfiguration {

    @Bean("testStructType")
    public StructType testStructType(){
        List<StructField> structFields = new ArrayList<>();
        structFields.add(new StructField("int_value", IntegerType$.MODULE$, true, Metadata.empty()));
        structFields.add(new StructField("long_value", LongType$.MODULE$, true, Metadata.empty()));
        structFields.add(new StructField("double_value", DoubleType$.MODULE$, true, Metadata.empty()));
        structFields.add(new StructField("string_value", StringType$.MODULE$, true, Metadata.empty()));
        structFields.add(new StructField("byte_value", BinaryType$.MODULE$, true, Metadata.empty()));
        structFields.add(new StructField("byte_none", BinaryType$.MODULE$, true, Metadata.empty()));
        return new StructType(structFields.toArray(new StructField[0]));
    }
    
}    
           

  接下来修改下TestComponent的

sparkWriteToParquet()

方法,注意最后字段的顺序是由

Row

(相当于表里的行)完成的,所以之前在TestEntity类中添加了方法进行转换。修改之后代码如下:

@Autowired
    @Qualifier("testStructType")
    private StructType testStructType;

    public void sparkWriteToParquet(TestEntity testEntity) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.sql.parquet.binaryAsString", "true");
        sparkConf.set("spark.sql.parquet.writeLegacyFormat", "true");
        sparkConf.setMaster("local").setAppName("convertor");
        //转换对象
        Stream<Row> rowStream = ListUtil.toList(testEntity).stream().map(TestEntity::conversionToRow);
        //写入数据
        SparkSession sparkSession = SparkSession.builder().appName("write").config(sparkConf).getOrCreate();
        Dataset<Row> dataFrame = sparkSession.createDataFrame(rowStream.collect(Collectors.toList()), testStructType);
        dataFrame.coalesce(1).write().mode(SaveMode.Append).parquet(sparkDirPath);
        dataFrame.unpersist(true);
        sparkSession.close();//关闭连接
    }
           

  省略生成文件的过程,直接查看文件结构和文件内容。只要查看字段名称和字段顺序。

# Parquet文件结构信息
root
 |-- int_value: integer (nullable = true)
 |-- long_value: long (nullable = true)
 |-- double_value: double (nullable = true)
 |-- string_value: string (nullable = true)
 |-- byte_value: binary (nullable = true)
 |-- byte_none: binary (nullable = true)
           
# Parquet文件内容信息
+---------+----------+------------+------------+--------------------+---------+
|int_value|long_value|double_value|string_value|          byte_value|byte_none|
+---------+----------+------------+------------+--------------------+---------+
|      100|       200|       300.0|        测试|[E4 B8 8D E4 B8 B...|     null|
+---------+----------+------------+------------+--------------------+---------+
           

  结果还算不打脸,字段名称已改变,字段顺序也符合预期。

继续阅读