这段时间因为项目,对parquet做了一系列研究,从写入跟踪到合并及spark使用等等场景。
选择parquet来对流数据进行序列化,用于后续离线分析的理由有以下几点:
1、流数据一般格式比较杂乱,可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。
2、网络流量数据量非常的庞大,即使过滤部分,还是非常吓人,parquet压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如RunLength Encoding和Delta Encoding)进一步节约存储空间。
3、后续分析,只读取需要的列,支持向量运算,能够获取更好的扫描性能
写入parquet的过程,代码部分还是比较简单的
1、创建schema
public void builderSchema(List> list) throws IllegalArgumentException, IOException {
MessageTypeBuilder builder = Types.buildMessage();
list.get(0).forEach((key, value) -> {
builder.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(key);
});
File file = new File("./conf/parquet.schema");
if (file.exists()) {
FileUtils.readLines(file).forEach(str -> {
builder.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(str);
});
}
MessageType schema = builder.named("aus");
hdfsConfig.set("parquet.example.schema", schema.toString());
factory = new SimpleGroupFactory(schema);
}
我这个是动态根据传入的数据,自动创建schema的,同时也支持手动配置
2、创建writer
public void refreshWriter(String path) throws IllegalArgumentException, IOException {
closeParquetWriter();
if (factory != null) {
writer = new ParquetWriter<>(new Path(path + "/" + new Date().getTime() + ".parquet"), hdfsConfig,
new GroupWriteSupport());
}
}
ps:声明ParquetWriter有非常多的构造,我这个选择的参数最少的。原因后面分析会讲。
3、写入数据
public void writeParquet(List> list) throws IOException {
for (Map map : list) {
Group group = factory.newGroup();
map.forEach((key, value) -> {
group.append(key, value);
});
writer.write(group);
}
}
以上的代码便可以实现把数据序列化成parquet格式文件。
在实际开发过程中,遇到过好几个坑,首先开始我选择的是另外一个构造器,详情如下,此构造器可以手动指定block,pagesize等大小。
public ParquetWriter(
Path file,
ParquetFileWriter.Mode mode,
WriteSupport writeSupport,
CompressionCodecName compressionCodecName,
int blockSize,
int pageSize,
int dictionaryPageSize,
boolean enableDictionary,
boolean validating,
WriterVersion writerVersion,
Configuration conf) throws IOException {
this(file, mode, writeSupport, compressionCodecName, blockSize,
validating, conf, MAX_PADDING_SIZE_DEFAULT,
ParquetProperties.builder()
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.withDictionaryEncoding(enableDictionary)
.withWriterVersion(writerVersion)
.build());
}
在调试中,发现这几个参数死活都不生效,后来经过源码跟踪调试,发现,在parquet的底层他实现这样的功能,因为我HDFS的block大小为128M,而我代码中设置的为4K,所以死活都不生效,看着数据不停地写入,但是hdfs上文件大小没有丝毫变化,原来都是在内存中。
// use the default block size, unless row group size is larger
2 long dfsBlockSize = Math.max(fs.getDefaultBlockSize(file), rowGroupSize);
即获取了DFS的block块大小,然后跟设置的值比较取最大值。
有一种办法可以强制性提交到服务器,那就是调用writer的close方法。但问题是,close当前这个文件之后,下次就不能再打开续写的,parquet只有两种模式,要么创建,要么覆盖。所以对于流数据场景,想要比较好的实时性,那就会创建非常多的小文件,这对hdfs的压力是非常大的。所以,在项目中,我选择了定时刷新writer,意思就是每隔一个小时,或者每隔一天来创建一个writer,这样可以保证一个文件不至于太小,且可以及时关闭掉,好让spark读取。(ps:未close掉的parquet文件,spark是没法加载的,会提示不是parquet格式的文件)
还有一种办法可以合并parquet小文件,在spark研究中发现,有这样一个特性,coalesce函数可以指定block个数来输出,并且可以加载父目录下全部的parquet文件,所以可实现将多个parquet文件合并成一个文件。
image.png
可以看到之前目录下有4001个文件:
image.png
处理之后,可以发现成功合并:
image.png
至于spark读取parquet文件,进行分析,就非常简单了,用spark-shell做一个简单的演示:
image.png