天天看点

parquet java_Java操作parquet

这段时间因为项目,对parquet做了一系列研究,从写入跟踪到合并及spark使用等等场景。

选择parquet来对流数据进行序列化,用于后续离线分析的理由有以下几点:

1、流数据一般格式比较杂乱,可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。

2、网络流量数据量非常的庞大,即使过滤部分,还是非常吓人,parquet压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如RunLength Encoding和Delta Encoding)进一步节约存储空间。

3、后续分析,只读取需要的列,支持向量运算,能够获取更好的扫描性能

写入parquet的过程,代码部分还是比较简单的

1、创建schema

public void builderSchema(List> list) throws IllegalArgumentException, IOException {

MessageTypeBuilder builder = Types.buildMessage();

list.get(0).forEach((key, value) -> {

builder.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(key);

});

File file = new File("./conf/parquet.schema");

if (file.exists()) {

FileUtils.readLines(file).forEach(str -> {

builder.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(str);

});

}

MessageType schema = builder.named("aus");

hdfsConfig.set("parquet.example.schema", schema.toString());

factory = new SimpleGroupFactory(schema);

}

我这个是动态根据传入的数据,自动创建schema的,同时也支持手动配置

2、创建writer

public void refreshWriter(String path) throws IllegalArgumentException, IOException {

closeParquetWriter();

if (factory != null) {

writer = new ParquetWriter<>(new Path(path + "/" + new Date().getTime() + ".parquet"), hdfsConfig,

new GroupWriteSupport());

}

}

ps:声明ParquetWriter有非常多的构造,我这个选择的参数最少的。原因后面分析会讲。

3、写入数据

public void writeParquet(List> list) throws IOException {

for (Map map : list) {

Group group = factory.newGroup();

map.forEach((key, value) -> {

group.append(key, value);

});

writer.write(group);

}

}

以上的代码便可以实现把数据序列化成parquet格式文件。

在实际开发过程中,遇到过好几个坑,首先开始我选择的是另外一个构造器,详情如下,此构造器可以手动指定block,pagesize等大小。

public ParquetWriter(

Path file,

ParquetFileWriter.Mode mode,

WriteSupport writeSupport,

CompressionCodecName compressionCodecName,

int blockSize,

int pageSize,

int dictionaryPageSize,

boolean enableDictionary,

boolean validating,

WriterVersion writerVersion,

Configuration conf) throws IOException {

this(file, mode, writeSupport, compressionCodecName, blockSize,

validating, conf, MAX_PADDING_SIZE_DEFAULT,

ParquetProperties.builder()

.withPageSize(pageSize)

.withDictionaryPageSize(dictionaryPageSize)

.withDictionaryEncoding(enableDictionary)

.withWriterVersion(writerVersion)

.build());

}

在调试中,发现这几个参数死活都不生效,后来经过源码跟踪调试,发现,在parquet的底层他实现这样的功能,因为我HDFS的block大小为128M,而我代码中设置的为4K,所以死活都不生效,看着数据不停地写入,但是hdfs上文件大小没有丝毫变化,原来都是在内存中。

// use the default block size, unless row group size is larger

2 long dfsBlockSize = Math.max(fs.getDefaultBlockSize(file), rowGroupSize);

即获取了DFS的block块大小,然后跟设置的值比较取最大值。

有一种办法可以强制性提交到服务器,那就是调用writer的close方法。但问题是,close当前这个文件之后,下次就不能再打开续写的,parquet只有两种模式,要么创建,要么覆盖。所以对于流数据场景,想要比较好的实时性,那就会创建非常多的小文件,这对hdfs的压力是非常大的。所以,在项目中,我选择了定时刷新writer,意思就是每隔一个小时,或者每隔一天来创建一个writer,这样可以保证一个文件不至于太小,且可以及时关闭掉,好让spark读取。(ps:未close掉的parquet文件,spark是没法加载的,会提示不是parquet格式的文件)

还有一种办法可以合并parquet小文件,在spark研究中发现,有这样一个特性,coalesce函数可以指定block个数来输出,并且可以加载父目录下全部的parquet文件,所以可实现将多个parquet文件合并成一个文件。

parquet java_Java操作parquet

image.png

可以看到之前目录下有4001个文件:

parquet java_Java操作parquet

image.png

处理之后,可以发现成功合并:

parquet java_Java操作parquet

image.png

至于spark读取parquet文件,进行分析,就非常简单了,用spark-shell做一个简单的演示:

parquet java_Java操作parquet

image.png