
Several Ways to Read and Write Parquet Files

Reposted from: http://blog.csdn.net/woloqun/article/details/76068147

Abstract

This article introduces several common ways to read and write Parquet files:

1. Use Spark's hadoopFile API to read Parquet files stored by Hive

2. Use Spark SQL to read and write Parquet data in Hive

3. Use the old and new MapReduce APIs to read and write Parquet files

Reading Parquet files

First, create a Hive table; the data is tab-delimited.


create table test(name string,age int)
row format delimited
fields terminated by '\t';

Load the data:


load data local inpath '/home/work/test/ddd.txt' into table test;

Sample data:


hive> select * from test limit 5;
OK
leo 27
jim 38
leo 15
jack    22
jay 7
Time taken: 0.101 seconds, Fetched: 5 row(s)

Create a Parquet table:

create table test_parquet(name string,age int) stored as parquet

View the table definition:

hive> show create table test_parquet;
OK
CREATE TABLE `test_parquet`(
  `name` string,
  `age` int)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://localhost:9000/user/hive/warehouse/test_parquet'
TBLPROPERTIES (
  'transient_lastDdlTime'='1495038003')

As you can see, the table's input format is MapredParquetInputFormat; we will use this class later to parse the data files.

Insert data into the Parquet table:


insert into table test_parquet select * from test;

a. Parse Hive Parquet files with Spark's hadoopFile API

If you read the file from spark-shell, you must add hive-exec-0.14.0.jar to the launch command (MapredParquetInputFormat lives in this jar) and specify the serializer class. The launch command is as follows:


spark-shell --master spark://xiaobin:7077 --jars /home/xiaobin/soft/apache-hive-0.14.0-bin/lib/hive-exec-0.14.0.jar \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

The read code is as follows:


scala> import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
scala> import org.apache.hadoop.io.{ArrayWritable, NullWritable, Text}
import org.apache.hadoop.io.{ArrayWritable, NullWritable, Text}
scala> val file = sc.hadoopFile("hdfs://localhost:9000/user/hive/warehouse/test_parquet/000000_0",
     |       classOf[MapredParquetInputFormat],classOf[Void],classOf[ArrayWritable])
file: org.apache.spark.rdd.RDD[(Void, org.apache.hadoop.io.ArrayWritable)] =
      hdfs://localhost:9000/user/hive/warehouse/test_parquet/000000_0 HadoopRDD[0] at hadoopFile at <console>:29
scala> file.take(10).foreach{case(k,v)=>
     |       val writables = v.get()
     |       val name = writables(0)
     |       val age = writables(1)
     |       println(writables.length+"    "+name+"   "+age)
     |     }

When MapredParquetInputFormat parses a Hive Parquet file, each row is turned into a key/value pair. The key is null, and the value is an ArrayWritable whose length equals the number of columns in the table; each element of the value corresponds to one field of the row in the Hive table.
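As a small sketch of how those pairs might be turned into something more convenient (this is an addition to the original example, assuming the same two-column layout as test_parquet and the `file` RDD defined above), the Writable array can be mapped into plain Scala values:

// Hedged sketch: convert each (Void, ArrayWritable) pair into a (name, age) tuple.
// The concrete Writable subtypes depend on the Hive/Parquet versions in use,
// so we only rely on toString here, just like the println in the example above.
val rows = file.map { case (_, v) =>
  val fields = v.get()                      // one Writable per table column
  (fields(0).toString, fields(1).toString)  // column 0 = name, column 1 = age
}
rows.take(5).foreach(println)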

b. Parse Parquet files with a Spark DataFrame


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.DataFrame

val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val parquet: DataFrame =
     sqlContext.read.parquet("hdfs://192.168.1.115:9000/user/hive/warehouse/test_parquet")
parquet.printSchema()
parquet.select(parquet("name"), parquet("age") + 1).show()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
+----+---------+
|name|(age + 1)|
+----+---------+
| leo|       28|
| jim|       39|
| leo|       16|
|jack|       23|
| jay|        8|
| jim|       38|
|jack|       37|
| jay|       12|
c. Read the Hive table directly with HiveQL

This did not work for me in local mode, so I packaged the job and tested it with spark-submit. The code is as follows:


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf().setAppName("test")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
val sql: DataFrame = hiveContext.sql("select * from test_parquet limit 10")
sql.take(10).foreach(println)

[leo,27]
[jim,38]
[leo,15]
[jack,22]
[jay,7]
[jim,37]
[jack,36]
[jay,11]
[leo,35]
[leo,33]

Command line to submit the job:


spark-submit --class quickspark.QuickSpark02 --master spark://192.168.1.115:7077 sparkcore-1.0-SNAPSHOT.jar

Writing Parquet files

a. Write Parquet files with Spark


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Read the text file into an RDD
val file = sc.textFile("hdfs://192.168.1.115:9000/test/user.txt")
// Define the Parquet schema; field names and types must match the Hive table's,
// otherwise Hive cannot parse the data
val schema = (new StructType)
      .add("name", StringType, true)
      .add("age", IntegerType, false)
val rowRDD = file.map(_.split("\t")).map(p => Row(p(0), Integer.valueOf(p(1).trim)))
// Convert the RDD into a DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.registerTempTable("people")
peopleDataFrame.write.parquet("hdfs://192.168.1.115:9000/user/hive/warehouse/test_parquet/")
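One caveat worth noting (a hedged addition, assuming Spark 1.4+): by default the DataFrame writer fails if the target path already exists, which can happen here since we write into an existing Hive table directory. A save mode can be set to append instead:

// Append to the existing table directory instead of failing on an existing path.
import org.apache.spark.sql.SaveMode
peopleDataFrame.write.mode(SaveMode.Append)
  .parquet("hdfs://192.168.1.115:9000/user/hive/warehouse/test_parquet/")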

Read the Parquet files generated by the Spark DataFrame with HiveQL:


hive> select * from test_parquet limit 10;
OK
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
leo 27
jim 38
leo 15
jack    22
jay 7
jim 37
jack    36
jay 11
leo 35
leo 33

b. Write Parquet files with MapReduce

When I first tried to read and write Parquet files with MR, I planned to use the org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat class that Hive specifies, but its getRecordWriter method is not implemented and simply throws an exception:


@Override
public RecordWriter<Void, ArrayWritable> getRecordWriter(
    final FileSystem ignored,
    final JobConf job,
    final String name,
    final Progressable progress
    ) throws IOException {
  throw new RuntimeException("Should never be used");
}

So we use the official Parquet libraries instead (GitHub: https://github.com/apache/parquet-mr/). Import the dependencies:


<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-common</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-encoding</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-column</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.8.1</version>
</dependency>

Parquet ships read/write support for both the old and the new MapReduce API. Below we implement Parquet file writing with both the old and the new MR API.

The old-API version:


package com.fan.hadoop.parquet;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetMR {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Void, Group> {
        private SimpleGroupFactory factory;

        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Void, Group> output,
                           Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            Group group = factory.newGroup()
                    .append("name", key.toString())
                    .append("age", sum);
            output.collect(null, group);
        }

        @Override
        public void configure(JobConf job) {
            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(job));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(ParquetMR.class);
        conf.setJobName("wordcount");

        String in = "hdfs://localhost:9000/test/wordcount.txt";
        String out = "hdfs://localhost:9000/test/wd";

        String writeSchema = "message example {\n" +
                "required binary name;\n" +
                "required int32 age;\n" +
                "}";

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);
        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(Group.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(DeprecatedParquetOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(in));
        DeprecatedParquetOutputFormat.setWriteSupportClass(conf, GroupWriteSupport.class);
        GroupWriteSupport.setSchema(MessageTypeParser.parseMessageType(writeSchema), conf);
        DeprecatedParquetOutputFormat.setOutputPath(conf, new Path(out));

        JobClient.runJob(conf);
    }
}

The generated files:


hadoop dfs -ls /test/wd
Found 2 items
-rw-r--r--   3 work supergroup          0 2017-07-18 17:41 /test/wd/_SUCCESS
-rw-r--r--   3 work supergroup        392 2017-07-18 17:41 /test/wd/part-00000-r-00000.parquet

Copy the generated file into the Hive table test_parquet's directory:


hadoop dfs -cp /test/wd/part-00000-r-00000.parquet /user/work/warehouse/test_parquet/

Test reading the Parquet file from the Hive table:


hive> select * from test_parquet limit 10;
OK
action  2
hadoop  2
hello   3
in  2
presto  1
spark   1
world   1
Time taken: 0.056 seconds, Fetched: 7 row(s)

The new-API version:

Reading and writing Parquet with the new MR API differs slightly from the old one: the schema must be set in the configuration; the rest is largely the same.


conf.set("parquet.example.schema",writeSchema);

Here is the complete code anyway:


package com.fan.hadoop.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.example.GroupWriteSupport;

import java.io.IOException;
import java.util.StringTokenizer;

public class ParquetNewMR {

    public static class WordCountMap extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class WordCountReduce extends
            Reducer<Text, IntWritable, Void, Group> {
        private SimpleGroupFactory factory;

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            Group group = factory.newGroup()
                    .append("name", key.toString())
                    .append("age", sum);
            context.write(null, group);
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String writeSchema = "message example {\n" +
                "required binary name;\n" +
                "required int32 age;\n" +
                "}";
        conf.set("parquet.example.schema", writeSchema);

        Job job = new Job(conf);
        job.setJarByClass(ParquetNewMR.class);
        job.setJobName("parquet");

        String in = "hdfs://localhost:9000/test/wordcount.txt";
        String out = "hdfs://localhost:9000/test/wd1";

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputValueClass(Group.class);
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(ParquetOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(in));
        ParquetOutputFormat.setOutputPath(job, new Path(out));
        ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);

        job.waitForCompletion(true);
    }
}

View the generated files:


hadoop dfs -ls /user/work/warehouse/test_parquet
Found 4 items
-rw-r--r--   1 work work          0 2017-07-18 18:27 /user/work/warehouse/test_parquet/_SUCCESS
-rw-r--r--   1 work work        129 2017-07-18 18:27 /user/work/warehouse/test_parquet/_common_metadata
-rw-r--r--   1 work work        275 2017-07-18 18:27 /user/work/warehouse/test_parquet/_metadata
-rw-r--r--   1 work work        392 2017-07-18 18:27 /user/work/warehouse/test_parquet/part-r-00000.parquet

Copy the generated file into the Hive table test_parquet's directory:


hadoop dfs -cp /test/wd1/part-r-00000.parquet /user/work/warehouse/test_parquet/

Test in Hive:


hive> select name,age from test_parquet limit 10;
OK
action  2
hadoop  2
hello   3
in  2
presto  1
spark   1
world   1
Time taken: 0.036 seconds, Fetched: 7 row(s)

Reading Parquet files with MapReduce


package com.fan.hadoop.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.api.DelegatingReadSupport;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.example.GroupReadSupport;

import java.io.IOException;
import java.util.*;

public class ParquetNewMRReader {

    public static class WordCountMap1 extends
            Mapper<Void, Group, LongWritable, Text> {

        protected void map(Void key, Group value,
                           Mapper<Void, Group, LongWritable, Text>.Context context)
                throws IOException, InterruptedException {
            String name = value.getString("name", 0);
            int age = value.getInteger("age", 0);
            context.write(new LongWritable(age),
                    new Text(name));
        }
    }

    public static class WordCountReduce1 extends
            Reducer<LongWritable, Text, LongWritable, Text> {

        public void reduce(LongWritable key, Iterable<Text> values,
                           Context context) throws IOException, InterruptedException {
            Iterator<Text> iterator = values.iterator();
            while (iterator.hasNext()) {
                context.write(key, iterator.next());
            }
        }
    }

    public static final class MyReadSupport extends DelegatingReadSupport<Group> {
        public MyReadSupport() {
            super(new GroupReadSupport());
        }

        @Override
        public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) {
            return super.init(context);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String readSchema = "message example {\n" +
                "required binary name;\n" +
                "required int32 age;\n" +
                "}";
        conf.set(ReadSupport.PARQUET_READ_SCHEMA, readSchema);

        Job job = new Job(conf);
        job.setJarByClass(ParquetNewMRReader.class);
        job.setJobName("parquet");

        String in = "hdfs://localhost:9000/test/wd1";
        String out = "hdfs://localhost:9000/test/wd2";

        job.setMapperClass(WordCountMap1.class);
        job.setReducerClass(WordCountReduce1.class);
        job.setInputFormatClass(ParquetInputFormat.class);
        ParquetInputFormat.setReadSupportClass(job, MyReadSupport.class);
        ParquetInputFormat.addInputPath(job, new Path(in));

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(out));

        job.waitForCompletion(true);
    }
}

View the generated output:


hadoop dfs -cat /test/wd2/part-r-00000
1       world
1       spark
1       presto
2       in
2       hadoop
2       action
3       hello
