這段時間因為項目,對parquet做了一系列研究,從寫入跟蹤到合并及spark使用等等場景。
選擇parquet來對流資料進行序列化,用于後續離線分析的理由有以下幾點:
1、流資料一般格式比較雜亂,可以跳過不符合條件的資料,隻讀取需要的資料,降低IO資料量。
2、網絡流量資料量非常的龐大,即使過濾部分,還是非常吓人,parquet壓縮編碼可以降低磁盤存儲空間。由于同一列的資料類型是一樣的,可以使用更高效的壓縮編碼(例如RunLength Encoding和Delta Encoding)進一步節約存儲空間。
3、後續分析,隻讀取需要的列,支援向量運算,能夠擷取更好的掃描性能
寫入parquet的過程,代碼部分還是比較簡單的
1、建立schema
public void builderSchema(List> list) throws IllegalArgumentException, IOException {
MessageTypeBuilder builder = Types.buildMessage();
list.get(0).forEach((key, value) -> {
builder.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(key);
});
File file = new File("./conf/parquet.schema");
if (file.exists()) {
FileUtils.readLines(file).forEach(str -> {
builder.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(str);
});
}
MessageType schema = builder.named("aus");
hdfsConfig.set("parquet.example.schema", schema.toString());
factory = new SimpleGroupFactory(schema);
}
我這個是動态根據傳入的資料,自動建立schema的,同時也支援手動配置
2、建立writer
public void refreshWriter(String path) throws IllegalArgumentException, IOException {
closeParquetWriter();
if (factory != null) {
writer = new ParquetWriter<>(new Path(path + "/" + new Date().getTime() + ".parquet"), hdfsConfig,
new GroupWriteSupport());
}
}
ps:聲明ParquetWriter有非常多的構造,我這個選擇的參數最少的。原因後面分析會講。
3、寫入資料
public void writeParquet(List> list) throws IOException {
for (Map map : list) {
Group group = factory.newGroup();
map.forEach((key, value) -> {
group.append(key, value);
});
writer.write(group);
}
}
以上的代碼便可以實作把資料序列化成parquet格式檔案。
在實際開發過程中,遇到過好幾個坑,首先開始我選擇的是另外一個構造器,詳情如下,此構造器可以手動指定block,pagesize等大小。
public ParquetWriter(
Path file,
ParquetFileWriter.Mode mode,
WriteSupport writeSupport,
CompressionCodecName compressionCodecName,
int blockSize,
int pageSize,
int dictionaryPageSize,
boolean enableDictionary,
boolean validating,
WriterVersion writerVersion,
Configuration conf) throws IOException {
this(file, mode, writeSupport, compressionCodecName, blockSize,
validating, conf, MAX_PADDING_SIZE_DEFAULT,
ParquetProperties.builder()
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.withDictionaryEncoding(enableDictionary)
.withWriterVersion(writerVersion)
.build());
}
在調試中,發現這幾個參數死活都不生效,後來經過源碼跟蹤調試,發現,在parquet的底層他實作這樣的功能,因為我HDFS的block大小為128M,而我代碼中設定的為4K,是以死活都不生效,看着資料不停地寫入,但是hdfs上檔案大小沒有絲毫變化,原來都是在記憶體中。
// use the default block size, unless row group size is larger
2 long dfsBlockSize = Math.max(fs.getDefaultBlockSize(file), rowGroupSize);
即擷取了DFS的block塊大小,然後跟設定的值比較取最大值。
有一種辦法可以強制性送出到伺服器,那就是調用writer的close方法。但問題是,close目前這個檔案之後,下次就不能再打開續寫的,parquet隻有兩種模式,要麼建立,要麼覆寫。是以對于流資料場景,想要比較好的實時性,那就會建立非常多的小檔案,這對hdfs的壓力是非常大的。是以,在項目中,我選擇了定時重新整理writer,意思就是每隔一個小時,或者每隔一天來建立一個writer,這樣可以保證一個檔案不至于太小,且可以及時關閉掉,好讓spark讀取。(ps:未close掉的parquet檔案,spark是沒法加載的,會提示不是parquet格式的檔案)
還有一種辦法可以合并parquet小檔案,在spark研究中發現,有這樣一個特性,coalesce函數可以指定block個數來輸出,并且可以加載父目錄下全部的parquet檔案,是以可實作将多個parquet檔案合并成一個檔案。
image.png
可以看到之前目錄下有4001個檔案:
image.png
處理之後,可以發現成功合并:
image.png
至于spark讀取parquet檔案,進行分析,就非常簡單了,用spark-shell做一個簡單的示範:
image.png