天天看點

Parquet檔案測試(二)——Java方式對Parquet檔案進行檔案生成和解析Java方式對Parquet檔案進行檔案生成和解析

Java方式對Parquet檔案進行檔案生成和解析

  此處屬于對Parquet檔案測試(一)——使用Java方式生成Parqeut格式檔案并直接入庫的Hive中的補充,因為之前隻是寫了生成,并沒有寫如何解析,其次就是弄懂結構定義的問題。最終目的是生成正确的Parquet檔案,使用Spark可以正常的讀取檔案内容(可參考Spark練習測試(二)——定義Parquet檔案的字段結構)。

測試準備

  首先定義一個結構,到時候生成的Parquet檔案會儲存如下結構的内容:

import lombok.Data;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

/**
 * 測試結構
 * 屬性為表字段
 */
@Data
public class TestEntity {
    private int intValue;
    private long longValue;
    private double doubleValue;
    private String stringValue;
    private byte[] byteValue;
    private byte[] byteNone;
}
           

  生成Parquet檔案的測試用例代碼如下:

import lombok.RequiredArgsConstructor;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@RequiredArgsConstructor
public class TestComponent {

    @Autowired
    @Qualifier("testMessageType")
    private MessageType testMessageType;

    private static final String javaDirPath = "C:\\Users\\Lenovo\\Desktop\\對比\\java\\";

    /**
     * 檔案寫入parquet
     */
    public void javaWriteToParquet(TestEntity testEntity) throws IOException {
        String filePath = javaDirPath + System.currentTimeMillis() + ".parquet";
        ParquetWriter<Group> parquetWriter = ExampleParquetWriter.builder(new Path(filePath))
                .withWriteMode(ParquetFileWriter.Mode.CREATE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withType(testMessageType).build();
        //寫入資料
        SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(testMessageType);
        Group group = simpleGroupFactory.newGroup();
        group.add("intValue", testEntity.getIntValue());
        group.add("longValue", testEntity.getLongValue());
        group.add("doubleValue", testEntity.getDoubleValue());
        group.add("stringValue", testEntity.getStringValue());
        group.add("byteValue", Binary.fromConstantByteArray(testEntity.getByteValue()));
        group.add("byteNone", Binary.EMPTY);
        parquetWriter.write(group);
        parquetWriter.close();
    }
}
           

  ※在配置字段結構的時候會有個問題如何配置字段的重複性。因為代碼都是粘過來了一開始讓人十分困惑,這玩意有啥作用。先來看下它們的經典說明(百度來的還是很靠譜的):

方式 說明
REQUIRED 出現 1 次
OPTIONAL 出現 0 次或者 1 次
REPEATED 出現 0 次或者多次

  ※當然最讓人困惑的就是什麼TMD是1次、什麼TMD是0次、什麼TMD是多次。

定義結構配置字段屬性使用REQUIRED

  此處還是使用SpringBootTest建立測試用例。接下來配置個Parquet的結構:

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;

@Configuration
public class TestConfiguration {

    @Bean("testMessageType")
    public MessageType testMessageType() {
        Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
        messageTypeBuilder.required(INT32).named("intValue");
        messageTypeBuilder.required(INT64).named("longValue");
        messageTypeBuilder.required(DOUBLE).named("doubleValue");
        messageTypeBuilder.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("stringValue");
        messageTypeBuilder.required(BINARY).as(LogicalTypeAnnotation.bsonType()).named("byteValue");
        messageTypeBuilder.required(BINARY).as(LogicalTypeAnnotation.bsonType()).named("byteNone");
        return messageTypeBuilder.named("test");
    }

}
           

  接下來執行測試方法生成Parquet檔案。

import com.lyan.parquet_convert.test.TestComponent;
import com.lyan.parquet_convert.test.TestEntity;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ParquetConvertApplication.class)
public class TestConvertTest {

    @Resource
    private TestComponent testComponent;

    @Test
    public void startTest() throws IOException {
        TestEntity testEntity = new TestEntity();
        testEntity.setIntValue(100);
        testEntity.setLongValue(200);
        testEntity.setDoubleValue(300);
        testEntity.setStringValue("測試");
        testEntity.setByteValue("不為空的值".getBytes(StandardCharsets.UTF_8));
        testComponent.javaWriteToParquet(testEntity);
    }
           

  生成Parquet檔案的部分日志内容如下。

# Parquet檔案結構資訊
root
 |-- intValue: integer (nullable = true)
 |-- longValue: long (nullable = true)
 |-- doubleValue: double (nullable = true)
 |-- stringValue: string (nullable = true)
 |-- byteValue: binary (nullable = true)
 |-- byteNone: binary (nullable = true)
           
# Parquet檔案内容資訊
+--------+---------+-----------+-----------+--------------------+--------+
|intValue|longValue|doubleValue|stringValue|           byteValue|byteNone|
+--------+---------+-----------+-----------+--------------------+--------+
|     100|      200|      300.0|       測試|[E4 B8 8D E4 B8 B...|      []|
+--------+---------+-----------+-----------+--------------------+--------+
           

  這裡有個疑問,比如byteNone字段就是個空值,怎麼能讓讓展示的是null,當然

TestEntity.setByteNone()

。是肯定不行的。指派隻能在

Group.add()

的上入手。那好,幹脆就不填這個值了,直接注釋:

Parquet檔案測試(二)——Java方式對Parquet檔案進行檔案生成和解析Java方式對Parquet檔案進行檔案生成和解析

  運作後結果十分令人滿意那就是報錯。報錯會大緻出現兩種,具體原因就不分析了。要不是字段順序的問題,要不就是字段類型的原因導緻有的報錯是在檔案生成時報錯,有的是在解析時報錯。但已經不重要了。重要的是說明目前定義的結構必須得有内容,也就是REQUIRED修飾的字段内容不能為空。

定義結構配置字段屬性使用OPTIONAL

  有了之前的測試結果,這塊就好測試了。接着上頭繼續修改,直接修改結果定義的内容。修改

byteNone

字段的定義為OPTIONAL。

Parquet檔案測試(二)——Java方式對Parquet檔案進行檔案生成和解析Java方式對Parquet檔案進行檔案生成和解析

  執行測試方法生成檔案,部分日志内容如下:

# Parquet檔案内容資訊
+--------+---------+-----------+-----------+--------------------+--------+
|intValue|longValue|doubleValue|stringValue|           byteValue|byteNone|
+--------+---------+-----------+-----------+--------------------+--------+
|     100|      200|      300.0|       測試|[E4 B8 8D E4 B8 B...|    null|
+--------+---------+-----------+-----------+--------------------+--------+
           

  此處明顯可以看到

byteNone

字段的内容由[ ]變為了null.。是以OPTIONAL修飾的字段内容就是可以為null,也就是0次或多次。

定義結構配置字段屬性使用REPEATED

  這個就更容易看出差別了。此處多改幾個字段使用REPEATED方式:

Parquet檔案測試(二)——Java方式對Parquet檔案進行檔案生成和解析Java方式對Parquet檔案進行檔案生成和解析

  生成檔案檢視檔案結構資訊和檔案内容資訊:

# Parquet檔案結構資訊
root
 |-- intValue: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- longValue: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- doubleValue: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- stringValue: string (nullable = true)
 |-- byteValue: binary (nullable = true)
 |-- byteNone: binary (nullable = true)
           
# Parquet檔案内容資訊
+--------+---------+-----------+-----------+--------------------+--------+
|intValue|longValue|doubleValue|stringValue|           byteValue|byteNone|
+--------+---------+-----------+-----------+--------------------+--------+
|   [100]|    [200]|    [300.0]|       測試|[E4 B8 8D E4 B8 B...|    null|
+--------+---------+-----------+-----------+--------------------+--------+
           

  好了到這裡就十厘清楚了,這就是數組嘛!REPEATED就是數組也就是0次或多次。

解析Parquet檔案内容

  解析Parquet檔案十分簡單代碼不多:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
           
public void javaReadParquet(String path) throws IOException {
        GroupReadSupport readSupport = new GroupReadSupport();
        ParquetReader.Builder<Group> reader = ParquetReader.builder(readSupport, new Path(path));
        ParquetReader<Group> build = reader.build();
        Group line;
        while ((line = build.read()) != null) {
            System.out.println(line.getInteger("intValue",0));
            System.out.println(line.getLong("longValue",0));
            System.out.println(line.getDouble("doubleValue",0));
            System.out.println(line.getString("stringValue",0));
            System.out.println(new String(line.getBinary("byteValue",0).getBytes()));
            System.out.println(new String(line.getBinary("byteNone",0).getBytes()));
        }
        build.close();
    }
           

  配置上之前的檔案全路徑,解析結果如下:

2021-05-25 14:54:21.721  INFO 4860 --- [           main] o.a.p.h.InternalParquetRecordReader      : RecordReader initialized will read a total of 1 records.
2021-05-25 14:54:21.722  INFO 4860 --- [           main] o.a.p.h.InternalParquetRecordReader      : at row 0. reading next block
2021-05-25 14:54:21.739  INFO 4860 --- [           main] org.apache.hadoop.io.compress.CodecPool  : Got brand-new decompressor [.snappy]
2021-05-25 14:54:21.743  INFO 4860 --- [           main] o.a.p.h.InternalParquetRecordReader      : block read in memory in 21 ms. row count = 1
100
200
300.0
測試
不為空的值
           

繼續閱讀