天天看點

parquet java_Java操作parquet

這段時間因為項目,對parquet做了一系列研究,從寫入跟蹤到合并及spark使用等等場景。

選擇parquet來對流資料進行序列化,用于後續離線分析的理由有以下幾點:

1、流資料一般格式比較雜亂,可以跳過不符合條件的資料,隻讀取需要的資料,降低IO資料量。

2、網絡流量資料量非常的龐大,即使過濾部分,還是非常吓人,parquet壓縮編碼可以降低磁盤存儲空間。由于同一列的資料類型是一樣的,可以使用更高效的壓縮編碼(例如RunLength Encoding和Delta Encoding)進一步節約存儲空間。

3、後續分析,隻讀取需要的列,支援向量運算,能夠擷取更好的掃描性能

寫入parquet的過程,代碼部分還是比較簡單的

1、建立schema

public void builderSchema(List> list) throws IllegalArgumentException, IOException {

MessageTypeBuilder builder = Types.buildMessage();

list.get(0).forEach((key, value) -> {

builder.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(key);

});

File file = new File("./conf/parquet.schema");

if (file.exists()) {

FileUtils.readLines(file).forEach(str -> {

builder.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(str);

});

}

MessageType schema = builder.named("aus");

hdfsConfig.set("parquet.example.schema", schema.toString());

factory = new SimpleGroupFactory(schema);

}

我這個是動态根據傳入的資料,自動建立schema的,同時也支援手動配置

2、建立writer

public void refreshWriter(String path) throws IllegalArgumentException, IOException {

closeParquetWriter();

if (factory != null) {

writer = new ParquetWriter<>(new Path(path + "/" + new Date().getTime() + ".parquet"), hdfsConfig,

new GroupWriteSupport());

}

}

ps:聲明ParquetWriter有非常多的構造,我這個選擇的參數最少的。原因後面分析會講。

3、寫入資料

public void writeParquet(List> list) throws IOException {

for (Map map : list) {

Group group = factory.newGroup();

map.forEach((key, value) -> {

group.append(key, value);

});

writer.write(group);

}

}

以上的代碼便可以實作把資料序列化成parquet格式檔案。

在實際開發過程中,遇到過好幾個坑,首先開始我選擇的是另外一個構造器,詳情如下,此構造器可以手動指定block,pagesize等大小。

public ParquetWriter(

Path file,

ParquetFileWriter.Mode mode,

WriteSupport writeSupport,

CompressionCodecName compressionCodecName,

int blockSize,

int pageSize,

int dictionaryPageSize,

boolean enableDictionary,

boolean validating,

WriterVersion writerVersion,

Configuration conf) throws IOException {

this(file, mode, writeSupport, compressionCodecName, blockSize,

validating, conf, MAX_PADDING_SIZE_DEFAULT,

ParquetProperties.builder()

.withPageSize(pageSize)

.withDictionaryPageSize(dictionaryPageSize)

.withDictionaryEncoding(enableDictionary)

.withWriterVersion(writerVersion)

.build());

}

在調試中,發現這幾個參數死活都不生效,後來經過源碼跟蹤調試,發現,在parquet的底層他實作這樣的功能,因為我HDFS的block大小為128M,而我代碼中設定的為4K,是以死活都不生效,看着資料不停地寫入,但是hdfs上檔案大小沒有絲毫變化,原來都是在記憶體中。

// use the default block size, unless row group size is larger

2 long dfsBlockSize = Math.max(fs.getDefaultBlockSize(file), rowGroupSize);

即擷取了DFS的block塊大小,然後跟設定的值比較取最大值。

有一種辦法可以強制性送出到伺服器,那就是調用writer的close方法。但問題是,close目前這個檔案之後,下次就不能再打開續寫的,parquet隻有兩種模式,要麼建立,要麼覆寫。是以對于流資料場景,想要比較好的實時性,那就會建立非常多的小檔案,這對hdfs的壓力是非常大的。是以,在項目中,我選擇了定時重新整理writer,意思就是每隔一個小時,或者每隔一天來建立一個writer,這樣可以保證一個檔案不至于太小,且可以及時關閉掉,好讓spark讀取。(ps:未close掉的parquet檔案,spark是沒法加載的,會提示不是parquet格式的檔案)

還有一種辦法可以合并parquet小檔案,在spark研究中發現,有這樣一個特性,coalesce函數可以指定block個數來輸出,并且可以加載父目錄下全部的parquet檔案,是以可實作将多個parquet檔案合并成一個檔案。

parquet java_Java操作parquet

image.png

可以看到之前目錄下有4001個檔案:

parquet java_Java操作parquet

image.png

處理之後,可以發現成功合并:

parquet java_Java操作parquet

image.png

至于spark讀取parquet檔案,進行分析,就非常簡單了,用spark-shell做一個簡單的示範:

parquet java_Java操作parquet

image.png