Java方式對Parquet檔案進行檔案生成和解析
此處屬于對Parquet檔案測試(一)——使用Java方式生成Parqeut格式檔案并直接入庫的Hive中的補充,因為之前隻是寫了生成,并沒有寫如何解析,其次就是弄懂結構定義的問題。最終目的是生成正确的Parquet檔案,使用Spark可以正常的讀取檔案内容(可參考Spark練習測試(二)——定義Parquet檔案的字段結構)。
測試準備
首先定義一個結構,到時候生成的Parquet檔案會儲存如下結構的内容:
import lombok.Data;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
/**
* 測試結構
* 屬性為表字段
*/
@Data
public class TestEntity {
private int intValue;
private long longValue;
private double doubleValue;
private String stringValue;
private byte[] byteValue;
private byte[] byteNone;
}
生成Parquet檔案的測試用例代碼如下:
import lombok.RequiredArgsConstructor;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RequiredArgsConstructor
public class TestComponent {
@Autowired
@Qualifier("testMessageType")
private MessageType testMessageType;
private static final String javaDirPath = "C:\\Users\\Lenovo\\Desktop\\對比\\java\\";
/**
* 檔案寫入parquet
*/
public void javaWriteToParquet(TestEntity testEntity) throws IOException {
String filePath = javaDirPath + System.currentTimeMillis() + ".parquet";
ParquetWriter<Group> parquetWriter = ExampleParquetWriter.builder(new Path(filePath))
.withWriteMode(ParquetFileWriter.Mode.CREATE)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withType(testMessageType).build();
//寫入資料
SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(testMessageType);
Group group = simpleGroupFactory.newGroup();
group.add("intValue", testEntity.getIntValue());
group.add("longValue", testEntity.getLongValue());
group.add("doubleValue", testEntity.getDoubleValue());
group.add("stringValue", testEntity.getStringValue());
group.add("byteValue", Binary.fromConstantByteArray(testEntity.getByteValue()));
group.add("byteNone", Binary.EMPTY);
parquetWriter.write(group);
parquetWriter.close();
}
}
※在配置字段結構的時候會有個問題如何配置字段的重複性。因為代碼都是粘過來了一開始讓人十分困惑,這玩意有啥作用。先來看下它們的經典說明(百度來的還是很靠譜的):
方式 | 說明 |
---|---|
REQUIRED | 出現 1 次 |
OPTIONAL | 出現 0 次或者 1 次 |
REPEATED | 出現 0 次或者多次 |
※當然最讓人困惑的就是什麼TMD是1次、什麼TMD是0次、什麼TMD是多次。
定義結構配置字段屬性使用REQUIRED
此處還是使用SpringBootTest建立測試用例。接下來配置個Parquet的結構:
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
@Configuration
public class TestConfiguration {
@Bean("testMessageType")
public MessageType testMessageType() {
Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
messageTypeBuilder.required(INT32).named("intValue");
messageTypeBuilder.required(INT64).named("longValue");
messageTypeBuilder.required(DOUBLE).named("doubleValue");
messageTypeBuilder.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("stringValue");
messageTypeBuilder.required(BINARY).as(LogicalTypeAnnotation.bsonType()).named("byteValue");
messageTypeBuilder.required(BINARY).as(LogicalTypeAnnotation.bsonType()).named("byteNone");
return messageTypeBuilder.named("test");
}
}
接下來執行測試方法生成Parquet檔案。
import com.lyan.parquet_convert.test.TestComponent;
import com.lyan.parquet_convert.test.TestEntity;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ParquetConvertApplication.class)
public class TestConvertTest {
@Resource
private TestComponent testComponent;
@Test
public void startTest() throws IOException {
TestEntity testEntity = new TestEntity();
testEntity.setIntValue(100);
testEntity.setLongValue(200);
testEntity.setDoubleValue(300);
testEntity.setStringValue("測試");
testEntity.setByteValue("不為空的值".getBytes(StandardCharsets.UTF_8));
testComponent.javaWriteToParquet(testEntity);
}
生成Parquet檔案的部分日志内容如下。
# Parquet檔案結構資訊
root
|-- intValue: integer (nullable = true)
|-- longValue: long (nullable = true)
|-- doubleValue: double (nullable = true)
|-- stringValue: string (nullable = true)
|-- byteValue: binary (nullable = true)
|-- byteNone: binary (nullable = true)
# Parquet檔案内容資訊
+--------+---------+-----------+-----------+--------------------+--------+
|intValue|longValue|doubleValue|stringValue| byteValue|byteNone|
+--------+---------+-----------+-----------+--------------------+--------+
| 100| 200| 300.0| 測試|[E4 B8 8D E4 B8 B...| []|
+--------+---------+-----------+-----------+--------------------+--------+
這裡有個疑問,比如byteNone字段就是個空值,怎麼能讓讓展示的是null,當然
TestEntity.setByteNone()
。是肯定不行的。指派隻能在
Group.add()
的上入手。那好,幹脆就不填這個值了,直接注釋:
運作後結果十分令人滿意那就是報錯。報錯會大緻出現兩種,具體原因就不分析了。要不是字段順序的問題,要不就是字段類型的原因導緻有的報錯是在檔案生成時報錯,有的是在解析時報錯。但已經不重要了。重要的是說明目前定義的結構必須得有内容,也就是REQUIRED修飾的字段内容不能為空。
定義結構配置字段屬性使用OPTIONAL
有了之前的測試結果,這塊就好測試了。接着上頭繼續修改,直接修改結果定義的内容。修改
byteNone
字段的定義為OPTIONAL。
執行測試方法生成檔案,部分日志内容如下:
# Parquet檔案内容資訊
+--------+---------+-----------+-----------+--------------------+--------+
|intValue|longValue|doubleValue|stringValue| byteValue|byteNone|
+--------+---------+-----------+-----------+--------------------+--------+
| 100| 200| 300.0| 測試|[E4 B8 8D E4 B8 B...| null|
+--------+---------+-----------+-----------+--------------------+--------+
此處明顯可以看到
byteNone
字段的内容由[ ]變為了null.。是以OPTIONAL修飾的字段内容就是可以為null,也就是0次或多次。
定義結構配置字段屬性使用REPEATED
這個就更容易看出差別了。此處多改幾個字段使用REPEATED方式:
生成檔案檢視檔案結構資訊和檔案内容資訊:
# Parquet檔案結構資訊
root
|-- intValue: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- longValue: array (nullable = true)
| |-- element: long (containsNull = true)
|-- doubleValue: array (nullable = true)
| |-- element: double (containsNull = true)
|-- stringValue: string (nullable = true)
|-- byteValue: binary (nullable = true)
|-- byteNone: binary (nullable = true)
# Parquet檔案内容資訊
+--------+---------+-----------+-----------+--------------------+--------+
|intValue|longValue|doubleValue|stringValue| byteValue|byteNone|
+--------+---------+-----------+-----------+--------------------+--------+
| [100]| [200]| [300.0]| 測試|[E4 B8 8D E4 B8 B...| null|
+--------+---------+-----------+-----------+--------------------+--------+
好了到這裡就十厘清楚了,這就是數組嘛!REPEATED就是數組也就是0次或多次。
解析Parquet檔案内容
解析Parquet檔案十分簡單代碼不多:
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
public void javaReadParquet(String path) throws IOException {
GroupReadSupport readSupport = new GroupReadSupport();
ParquetReader.Builder<Group> reader = ParquetReader.builder(readSupport, new Path(path));
ParquetReader<Group> build = reader.build();
Group line;
while ((line = build.read()) != null) {
System.out.println(line.getInteger("intValue",0));
System.out.println(line.getLong("longValue",0));
System.out.println(line.getDouble("doubleValue",0));
System.out.println(line.getString("stringValue",0));
System.out.println(new String(line.getBinary("byteValue",0).getBytes()));
System.out.println(new String(line.getBinary("byteNone",0).getBytes()));
}
build.close();
}
配置上之前的檔案全路徑,解析結果如下:
2021-05-25 14:54:21.721 INFO 4860 --- [ main] o.a.p.h.InternalParquetRecordReader : RecordReader initialized will read a total of 1 records.
2021-05-25 14:54:21.722 INFO 4860 --- [ main] o.a.p.h.InternalParquetRecordReader : at row 0. reading next block
2021-05-25 14:54:21.739 INFO 4860 --- [ main] org.apache.hadoop.io.compress.CodecPool : Got brand-new decompressor [.snappy]
2021-05-25 14:54:21.743 INFO 4860 --- [ main] o.a.p.h.InternalParquetRecordReader : block read in memory in 21 ms. row count = 1
100
200
300.0
測試
不為空的值