Reposted from: http://blog.csdn.net/woloqun/article/details/76068147
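Summary
This article introduces several common ways to read and write Parquet files:
1. Reading Parquet-format Hive files with Spark's hadoopFile API
2. Reading and writing Parquet-format Hive tables with Spark SQL
3. Reading and writing Parquet files with the old and new MapReduce APIs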
Reading Parquet files
First, create a Hive table whose data is tab-delimited
- create table test(name string,age int)
- row format delimited
- fields terminated by '\t';
Load the data
- load data local inpath '/home/work/test/ddd.txt' into table test;
Sample data:
- hive> select * from test limit 5;
- OK
- leo 27
- jim 38
- leo 15
- jack 22
- jay 7
- Time taken: 0.101 seconds, Fetched: 5 row(s)
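Because the table is declared with fields terminated by '\t', each line of the input file is expected to hold a name and an age separated by a tab. The following is a minimal plain-Java sketch of that split, not Hive's own parser. The class name TabRowParser and the choice to return null for an unparsable age are assumptions made for illustration.

```java
class TabRowParser {
    /** One parsed row: a name and an age (null when the age is missing or not a number). */
    static final class Row {
        final String name;
        final Integer age;

        Row(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

    /** Splits a line on tab characters and converts the second field to an Integer. */
    static Row parse(String line) {
        String[] fields = line.split("\t", -1);
        String name = fields[0];
        Integer age = null;
        if (fields.length > 1) {
            try {
                age = Integer.valueOf(fields[1].trim());
            } catch (NumberFormatException e) {
                age = null; // assumption of this sketch: keep the row, drop the bad value
            }
        }
        return new Row(name, age);
    }

    public static void main(String[] args) {
        for (String line : new String[] {"leo\t27", "jim\t38"}) {
            Row row = parse(line);
            System.out.println(row.name + " -> " + row.age);
        }
    }
}
```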
Create a Parquet-format table
- create table test_parquet(name string,age int) stored as parquet
Inspect the table definition
- hive> show create table test_parquet;
- OK
- CREATE TABLE `test_parquet`(
- `name` string,
- `age` int)
- ROW FORMAT SERDE
- 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
- STORED AS INPUTFORMAT
- 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
- OUTPUTFORMAT
- 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
- LOCATION
- 'hdfs://localhost:9000/user/hive/warehouse/test_parquet'
- TBLPROPERTIES (
- 'transient_lastDdlTime'='1495038003')
The output shows that the table's InputFormat is MapredParquetInputFormat; we will use this class later to parse the data files.
Insert data into the Parquet table
- insert into table test_parquet select * from test;
a. Parsing Hive Parquet files with Spark's hadoopFile API
When reading the file from spark-shell, you must add hive-exec-0.14.0.jar to the launch command (MapredParquetInputFormat lives in this jar) and also specify the serializer class. The launch command is as follows:
- spark-shell --master spark://xiaobin:7077 --jars /home/xiaobin/soft/apache-hive-0.14.0-bin/lib/hive-exec-0.14.0.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
The reading code is as follows:
- scala> import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
- import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
- scala> import org.apache.hadoop.io.{ArrayWritable, NullWritable, Text}
- import org.apache.hadoop.io.{ArrayWritable, NullWritable, Text}
- scala> val file =sc.hadoopFile("hdfs://localhost:9000/user/hive/warehouse/test_parquet/000000_0",
- | classOf[MapredParquetInputFormat],classOf[Void],classOf[ArrayWritable])
- file: org.apache.spark.rdd.RDD[(Void, org.apache.hadoop.io.ArrayWritable)] =
- hdfs://localhost:9000/user/hive/warehouse/test_parquet/000000_0 HadoopRDD[0] at hadoopFile at <console>:29
- scala> file.take(10).foreach{case(k,v)=>
- | val writables = v.get()
- | val name = writables(0)
- | val age = writables(1)
- | println(writables.length+" "+name+" "+age)
- | }
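When MapredParquetInputFormat parses a Hive Parquet file, each row becomes a key and a value. The key is null and the value is an ArrayWritable whose length equals the number of columns in the table; each element of the value holds the corresponding field of the Hive row.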
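Since the value array is positional, it can help to pair it with the table's column names before using it. The sketch below does this in plain Java, with a String[] standing in for the contents of the ArrayWritable; the class RowValueMapper and its method are hypothetical helpers, not part of Hive or Spark.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RowValueMapper {
    /** Pairs each column name with the value at the same position in the row array. */
    static Map<String, String> toRecord(String[] columns, String[] values) {
        if (columns.length != values.length) {
            throw new IllegalArgumentException(
                "expected " + columns.length + " values but got " + values.length);
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < columns.length; i++) {
            record.put(columns[i], values[i]);
        }
        return record;
    }

    public static void main(String[] args) {
        String[] columns = {"name", "age"}; // column order of test_parquet
        String[] values = {"leo", "27"};    // stand-in for the elements of one row's value
        System.out.println(toRecord(columns, values));
    }
}
```

The length check reflects the statement above that the value has exactly one element per table column.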
b. Parsing Parquet files with a Spark DataFrame
- val conf = new SparkConf().setAppName("test").setMaster("local")
- val sc = new SparkContext(conf)
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- val parquet: DataFrame =
- sqlContext.read.parquet("hdfs://192.168.1.115:9000/user/hive/warehouse/test_parquet")
- parquet.printSchema()
- parquet.select(parquet("name"), parquet("age") + 1).show()
- root
- |-- name: string (nullable = true)
- |-- age: integer (nullable = true)
- +----+---------+
- |name|(age + 1)|
- +----+---------+
- | leo| 28|
- | jim| 39|
- | leo| 16|
- |jack| 23|
- | jay| 8|
- | jim| 38|
- |jack| 37|
- | jay| 12|
c. Reading the Hive table directly with Hive SQL
I could not get this to work in local mode, so I packaged the code and tested it with spark-submit. The code is as follows:
- val conf = new SparkConf().setAppName("test")
- val sc = new SparkContext(conf)
- val hiveContext = new HiveContext(sc)
- val sql: DataFrame = hiveContext.sql("select * from test_parquet limit 10")
- sql.take(10).foreach(println)
- [leo,27]
- [jim,38]
- [leo,15]
- [jack,22]
- [jay,7]
- [jim,37]
- [jack,36]
- [jay,11]
- [leo,35]
- [leo,33]
Job submission command:
- spark-submit --class quickspark.QuickSpark02 --master spark://192.168.1.115:7077 sparkcore-1.0-SNAPSHOT.jar
Writing Parquet files
a. Writing Parquet files with Spark
- val conf = new SparkConf().setAppName("test").setMaster("local")
- val sc = new SparkContext(conf)
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- // Read the file into an RDD
- val file = sc.textFile("hdfs://192.168.1.115:9000/test/user.txt")
- // Define the Parquet schema; field names and types must match those of the Hive table, otherwise Hive cannot parse the data
- val schema = (new StructType)
- .add("name", StringType, true)
- .add("age", IntegerType, false)
- val rowRDD = file.map(_.split("\t")).map(p => Row(p(0), Integer.valueOf(p(1).trim)))
- // Convert the RDD to a DataFrame
- val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
- peopleDataFrame.registerTempTable("people")
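- // Append, because the table directory already exists and the default save mode fails on an existing path
- peopleDataFrame.write.mode("append").parquet("hdfs://192.168.1.115:9000/user/hive/warehouse/test_parquet/")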
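The schema comment in the listing above requires the written fields to match the Hive table's columns in name and type. One way to catch a mismatch before writing is a small comparison like the sketch below. It is plain Java with column types given as simple strings; SchemaCheck is a hypothetical helper, and it does not read a real Hive or Spark schema.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SchemaCheck {
    /** Returns one message per Hive column that is missing from the data or has a different type. */
    static List<String> mismatches(Map<String, String> hiveColumns,
                                   Map<String, String> dataColumns) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> col : hiveColumns.entrySet()) {
            String actual = dataColumns.get(col.getKey());
            if (actual == null) {
                problems.add("missing column: " + col.getKey());
            } else if (!actual.equals(col.getValue())) {
                problems.add("type mismatch for " + col.getKey()
                    + ": expected " + col.getValue() + ", found " + actual);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> hive = new LinkedHashMap<>();
        hive.put("name", "string");
        hive.put("age", "int");

        Map<String, String> data = new LinkedHashMap<>();
        data.put("name", "string");
        data.put("age", "string"); // deliberately wrong, to show a reported mismatch

        System.out.println(mismatches(hive, data));
    }
}
```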
Reading the Parquet files generated by the Spark DataFrame with Hive SQL
- hive> select * from test_parquet limit 10;
- OK
- SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
- SLF4J: Defaulting to no-operation (NOP) logger implementation
- SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
- leo 27
- jim 38
- leo 15
- jack 22
- jay 7
- jim 37
- jack 36
- jay 11
- leo 35
- leo 33
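b. Writing Parquet files with MapReduce
To read and write Parquet files with MR, I first planned to use the class that Hive specifies, org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, but its getRecordWriter method is not implemented and simply throws an exception: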
- @Override
- public RecordWriter<Void, ArrayWritable> getRecordWriter(
- final FileSystem ignored,
- final JobConf job,
- final String name,
- final Progressable progress
- ) throws IOException {
- throw new RuntimeException("Should never be used");
- }
So I used the official Parquet implementation instead (GitHub: https://github.com/apache/parquet-mr/). Import the dependencies:
- <dependency>
- <groupId>org.apache.parquet</groupId>
- <artifactId>parquet-common</artifactId>
- <version>1.8.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.parquet</groupId>
- <artifactId>parquet-encoding</artifactId>
- <version>1.8.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.parquet</groupId>
- <artifactId>parquet-column</artifactId>
- <version>1.8.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.parquet</groupId>
- <artifactId>parquet-hadoop</artifactId>
- <version>1.8.1</version>
- </dependency>
Parquet reading and writing comes in two versions, which correspond to the old and new MR APIs. We implement Parquet reading and writing with both.
The old-API version is as follows:
- package com.fan.hadoop.parquet;
- import java.io.IOException;
- import java.util.*;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.*;
- import org.apache.hadoop.mapred.*;
- import org.apache.parquet.hadoop.example.GroupWriteSupport;
- import org.apache.parquet.example.data.Group;
- import org.apache.parquet.example.data.simple.SimpleGroupFactory;
- import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
- import org.apache.parquet.schema.MessageTypeParser;
- public class ParquetMR {
- public static class Map extends MapReduceBase implements
- Mapper<LongWritable, Text, Text, IntWritable> {
- private final static IntWritable one = new IntWritable(1);
- private Text word = new Text();
- public void map(LongWritable key, Text value,
- OutputCollector<Text, IntWritable> output,
- Reporter reporter) throws IOException {
- String line = value.toString();
- StringTokenizer tokenizer = new StringTokenizer(line);
- while (tokenizer.hasMoreTokens()) {
- word.set(tokenizer.nextToken());
- output.collect(word, one);
- }
- }
- }
- public static class Reduce extends MapReduceBase implements
- Reducer<Text, IntWritable, Void, Group> {
- private SimpleGroupFactory factory;
- public void reduce(Text key, Iterator<IntWritable> values,
- OutputCollector<Void, Group> output,
- Reporter reporter) throws IOException {
- int sum = 0;
- while (values.hasNext()) {
- sum += values.next().get();
- }
- Group group = factory.newGroup()
- .append("name", key.toString())
- .append("age", sum);
- output.collect(null,group);
- }
- @Override
- public void configure(JobConf job) {
- factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(job));
- }
- }
- public static void main(String[] args) throws Exception {
- JobConf conf = new JobConf(ParquetMR.class);
- conf.setJobName("wordcount");
- String in = "hdfs://localhost:9000/test/wordcount.txt";
- String out = "hdfs://localhost:9000/test/wd";
- String writeSchema = "message example {\n" +
- "required binary name;\n" +
- "required int32 age;\n" +
- "}";
- conf.setMapOutputKeyClass(Text.class);
- conf.setMapOutputValueClass(IntWritable.class);
- conf.setOutputKeyClass(NullWritable.class);
- conf.setOutputValueClass(Group.class);
- conf.setMapperClass(Map.class);
- conf.setReducerClass(Reduce.class);
- conf.setInputFormat(TextInputFormat.class);
- conf.setOutputFormat(DeprecatedParquetOutputFormat.class);
- FileInputFormat.setInputPaths(conf, new Path(in));
- DeprecatedParquetOutputFormat.setWriteSupportClass(conf, GroupWriteSupport.class);
- GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema), conf);
- DeprecatedParquetOutputFormat.setOutputPath(conf, new Path(out));
- JobClient.runJob(conf);
- }
- }
Generated files:
- hadoop dfs -ls /test/wd
- Found 2 items
- -rw-r--r-- 3 work supergroup 0 2017-07-18 17:41 /test/wd/_SUCCESS
- -rw-r--r-- 3 work supergroup 392 2017-07-18 17:41 /test/wd/part-00000-r-00000.parquet
Copy the generated file to the path of the Hive table test_parquet:
- hadoop dfs -cp /test/wd/part-00000-r-00000.parquet /user/work/warehouse/test_parquet/
Test reading the Parquet file through the Hive table
- hive> select * from test_parquet limit 10;
- OK
- action 2
- hadoop 2
- hello 3
- in 2
- presto 1
- spark 1
- world 1
- Time taken: 0.056 seconds, Fetched: 7 row(s)
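The rows Hive returns here are word counts: the job stores each word in the name column and its count in the age column. A plain-Java sketch of the same tokenize-and-sum logic, without Hadoop, may make that mapping easier to see. The class WordCountRows and the two input lines are made up for illustration; they are not the contents of wordcount.txt.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

class WordCountRows {
    /** Counts whitespace-separated tokens; the TreeMap keeps words sorted, as reducer keys are. */
    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                // Same effect as emitting (word, 1) in the mapper and summing in the reducer.
                counts.merge(tokenizer.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] lines = {"hello world", "hello spark"}; // hypothetical input
        for (Map.Entry<String, Integer> row : count(lines).entrySet()) {
            // Each entry corresponds to one (name, age) record written by the reducer.
            System.out.println(row.getKey() + "\t" + row.getValue());
        }
    }
}
```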
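The new-API version is as follows
Reading and writing Parquet with the new MR API differs slightly from the old one: the schema must be set in the conf. The other differences are minor.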
- conf.set("parquet.example.schema",writeSchema);
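The schema string passed under parquet.example.schema is plain text, so it can be generated from the Hive column list instead of being typed by hand. The sketch below shows one way to do that in plain Java. ParquetSchemaText is a hypothetical helper, and its type mapping covers only the two Hive types used in this article (string and int).

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ParquetSchemaText {
    /** Maps the two Hive types used in this article to Parquet primitive type names. */
    static String parquetType(String hiveType) {
        switch (hiveType) {
            case "string":
                return "binary";
            case "int":
                return "int32";
            default:
                throw new IllegalArgumentException("type not covered by this sketch: " + hiveType);
        }
    }

    /** Renders a message definition with one required field per column, in column order. */
    static String messageType(String messageName, Map<String, String> hiveColumns) {
        StringBuilder sb = new StringBuilder("message " + messageName + " {\n");
        for (Map.Entry<String, String> col : hiveColumns.entrySet()) {
            sb.append("required ")
              .append(parquetType(col.getValue()))
              .append(' ')
              .append(col.getKey())
              .append(";\n");
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("name", "string");
        columns.put("age", "int");
        // The result is the text one would pass to conf.set("parquet.example.schema", ...).
        System.out.println(messageType("example", columns));
    }
}
```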
Here is the complete code anyway:
- package com.fan.hadoop.parquet;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.parquet.example.data.Group;
- import org.apache.parquet.example.data.simple.SimpleGroupFactory;
- import org.apache.parquet.hadoop.ParquetOutputFormat;
- import org.apache.parquet.hadoop.example.GroupWriteSupport;
- import java.io.IOException;
- import java.util.StringTokenizer;
- public class ParquetNewMR {
- public static class WordCountMap extends
- Mapper<LongWritable, Text, Text, IntWritable> {
- private final IntWritable one = new IntWritable(1);
- private Text word = new Text();
- public void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- String line = value.toString();
- StringTokenizer token = new StringTokenizer(line);
- while (token.hasMoreTokens()) {
- word.set(token.nextToken());
- context.write(word, one);
- }
- }
- }
- public static class WordCountReduce extends
- Reducer<Text, IntWritable, Void, Group> {
- private SimpleGroupFactory factory;
- public void reduce(Text key, Iterable<IntWritable> values,
- Context context) throws IOException, InterruptedException {
- int sum = 0;
- for (IntWritable val : values) {
- sum += val.get();
- }
- Group group = factory.newGroup()
- .append("name", key.toString())
- .append("age", sum);
- context.write(null,group);
- }
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- String writeSchema = "message example {\n" +
- "required binary name;\n" +
- "required int32 age;\n" +
- "}";
- conf.set("parquet.example.schema",writeSchema);
- Job job = Job.getInstance(conf);
- job.setJarByClass(ParquetNewMR.class);
- job.setJobName("parquet");
- String in = "hdfs://localhost:9000/test/wordcount.txt";
- String out = "hdfs://localhost:9000/test/wd1";
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputValueClass(Group.class);
- job.setMapperClass(WordCountMap.class);
- job.setReducerClass(WordCountReduce.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(ParquetOutputFormat.class);
- FileInputFormat.addInputPath(job, new Path(in));
- ParquetOutputFormat.setOutputPath(job, new Path(out));
- ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
- job.waitForCompletion(true);
- }
- }
Inspect the generated files
- hadoop dfs -ls /user/work/warehouse/test_parquet
- Found 4 items
- -rw-r--r-- 1 work work 0 2017-07-18 18:27 /user/work/warehouse/test_parquet/_SUCCESS
- -rw-r--r-- 1 work work 129 2017-07-18 18:27 /user/work/warehouse/test_parquet/_common_metadata
- -rw-r--r-- 1 work work 275 2017-07-18 18:27 /user/work/warehouse/test_parquet/_metadata
- -rw-r--r-- 1 work work 392 2017-07-18 18:27 /user/work/warehouse/test_parquet/part-r-00000.parquet
Copy the generated file to the path of the Hive table test_parquet:
- hadoop dfs -cp /test/wd/part-00000-r-00000.parquet /user/work/warehouse/test_parquet/
Test with Hive
- hive> select name,age from test_parquet limit 10;
- OK
- action 2
- hadoop 2
- hello 3
- in 2
- presto 1
- spark 1
- world 1
- Time taken: 0.036 seconds, Fetched: 7 row(s)
Reading Parquet files with MapReduce
- package com.fan.hadoop.parquet;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.parquet.example.data.Group;
- import org.apache.parquet.hadoop.ParquetInputFormat;
- import org.apache.parquet.hadoop.api.DelegatingReadSupport;
- import org.apache.parquet.hadoop.api.InitContext;
- import org.apache.parquet.hadoop.api.ReadSupport;
- import org.apache.parquet.hadoop.example.GroupReadSupport;
- import java.io.IOException;
- import java.util.*;
- public class ParquetNewMRReader {
- public static class WordCountMap1 extends
- Mapper<Void, Group, LongWritable, Text> {
- protected void map(Void key, Group value,
- Mapper<Void, Group, LongWritable, Text>.Context context)
- throws IOException, InterruptedException {
- String name = value.getString("name",0);
- int age = value.getInteger("age",0);
- context.write(new LongWritable(age),
- new Text(name));
- }
- }
- public static class WordCountReduce1 extends
- Reducer<LongWritable, Text, LongWritable, Text> {
- public void reduce(LongWritable key, Iterable<Text> values,
- Context context) throws IOException, InterruptedException {
- Iterator<Text> iterator = values.iterator();
- while(iterator.hasNext()){
- context.write(key,iterator.next());
- }
- }
- }
- public static final class MyReadSupport extends DelegatingReadSupport<Group> {
- public MyReadSupport() {
- super(new GroupReadSupport());
- }
- @Override
- public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) {
- return super.init(context);
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- String readSchema = "message example {\n" +
- "required binary name;\n" +
- "required int32 age;\n" +
- "}";
- conf.set(ReadSupport.PARQUET_READ_SCHEMA, readSchema);
- Job job = Job.getInstance(conf);
- job.setJarByClass(ParquetNewMRReader.class);
- job.setJobName("parquet");
- String in = "hdfs://localhost:9000/test/wd1";
- String out = "hdfs://localhost:9000/test/wd2";
- job.setMapperClass(WordCountMap1.class);
- job.setReducerClass(WordCountReduce1.class);
- job.setInputFormatClass(ParquetInputFormat.class);
- ParquetInputFormat.setReadSupportClass(job, MyReadSupport.class);
- ParquetInputFormat.addInputPath(job, new Path(in));
- job.setOutputFormatClass(TextOutputFormat.class);
- FileOutputFormat.setOutputPath(job, new Path(out));
- job.waitForCompletion(true);
- }
- }
Inspect the generated output
- hadoop dfs -cat /test/wd2/part-r-00000
- 1 world
- 1 spark
- 1 presto
- 2 in
- 2 hadoop
- 2 action
- 3 hello
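The output is ordered by age because the mapper emits the age as the key and MapReduce sorts keys before they reach the reducer; the order of names within one age is not guaranteed. A plain-Java sketch of that grouping, using a TreeMap in place of the shuffle, is shown below. AgeKeyedOutput and the sample rows are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class AgeKeyedOutput {
    /** Groups (name, age) rows by age in ascending key order and renders "age<TAB>name" lines. */
    static List<String> render(String[][] rows) {
        Map<Long, List<String>> byAge = new TreeMap<>();
        for (String[] row : rows) {
            long age = Long.parseLong(row[1]);
            byAge.computeIfAbsent(age, k -> new ArrayList<>()).add(row[0]);
        }
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Long, List<String>> entry : byAge.entrySet()) {
            for (String name : entry.getValue()) {
                lines.add(entry.getKey() + "\t" + name);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        String[][] rows = {{"hello", "3"}, {"world", "1"}, {"in", "2"}}; // hypothetical rows
        for (String line : render(rows)) {
            System.out.println(line);
        }
    }
}
```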