天天看點

Hive總結(十)Hive 輸入輸出适配類(輸出CSV,XML)

在最初使用 hive ,應該說上手還是挺快的。 Hive 提供的類 SQL 語句與 mysql 語句極為相似,文法上有大量相同的地方,這給我們上手帶來了很大的友善,但是要得心應手地寫好這些語句,還需要對 hive 有較好的了解,才能結合 hive 特色寫出精妙的語句。

關于 hive 語言的詳細文法可參考官方 wiki 的語言手冊:http://wiki.apache.org/hadoop/Hive/LanguageManual

雖然文法風格為我們提供了便利,但初次使用遇到的問題還是不少的,下面針對業務場景談談我們遇到的問題,和對 hive 功能的定制。

1、 分隔符問題

首先遇到的是日志資料的分隔符問題,我們的日志資料的大緻格式如下:

2010-05-24 00:00:02@$_$@QQ2010@$_$@all@$_$@NOKIA_1681C@$_$@1@$_$@10@$_$@@$_$@-1@$_$@10@$_$@application@$_$@1

從格式可見其分隔符是“ @$_$@ ”,這是為了盡可能防止日志正文出現與分隔符相同的字元而導緻資料混淆。本來 hive支援在建表的時候指定自定義分隔符的,但經過多次測試發現隻支援單個字元的自定義分隔符,像“ @$_$@ ”這樣的分隔符是不能被支援的,但是我們可以通過對分隔符的定制解決這個問題, hive 的内部分隔符是“ \001 ”,隻要把分隔符替換成“\001 ”即可。

經過探索我們發現有兩條途徑解決這個問題。

a)自定義 outputformat 和 inputformat 。

Hive 的 outputformat/inputformat 與 hadoop 的 outputformat/inputformat 相當類似, inputformat 負責把輸入資料進行格式化,然後提供給 hive , outputformat 負責把 hive 輸出的資料重新格式化成目标格式再輸出到檔案,這種對格式進行定制的方式較為底層,對其進行定制也相對簡單,重寫 InputFormat 中 RecordReader 類中的 next 方法即可,示例代碼如下:

1.  public boolean next(LongWritable key, BytesWritable value)
2.          throws IOException {
3.  
4.          while ( reader .next(key, text ) ) {
5.  
6.          String strReplace = text .toString().toLowerCase().replace( "@$_$@" , "\001" );
7.  
8.          Text txtReplace = new Text();
9.  
10.          txtReplace.set(strReplace );
11.  
12.          value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
13.  
14.          return true ;
15.  
16.        }
17.  
18.           return false ;
19.  
20.  }
21.  
22.          重寫 HiveIgnoreKeyTextOutputFormat 中 RecordWriter 中的 write 方法,示例代碼如下:
23.  
24.      public void write (Writable w) throws IOException {
25.  
26.        String strReplace = ((Text)w).toString().replace( "\001" , "@$_$@" );
27.  
28.        Text txtReplace = new Text();
29.  
30.        txtReplace.set(strReplace);
31.  
32.        byte [] output = txtReplace.getBytes();
33.  
34.        bytesWritable .set(output, 0, output. length );
35.  
36.        writer .write( bytesWritable );
37.  
38.  }      

複制代碼

自定義 outputformat/inputformat 後,在建表時需要指定 outputformat/inputformat ,如下示例:

stored as INPUTFORMAT 'com.aspire.search.loganalysis.hive.SearchLogInputFormat' OUTPUTFORMAT 'com.aspire.search.loganalysis.hive.SearchLogOutputFormat'

b) 通過 SerDe(serialize/deserialize) ,在資料序列化和反序列化時格式化資料。

這種方式稍微複雜一點,對資料的控制能力也要弱一些,它使用正規表達式來比對和處理資料,性能也會有所影響。但它的優點是可以自定義表屬性資訊 SERDEPROPERTIES ,在 SerDe 中通過這些屬性資訊可以有更多的定制行為。

2、 資料導入導出

a) 多版本日志格式的相容

由于 hive 的應用場景主要是處理冷資料(隻讀不寫),是以它隻支援批量導入和導出資料,并不支援單條資料的寫入或更新,是以如果要導入的資料存在某些不太規範的行,則需要我們定制一些擴充功能對其進行處理。

我們需要處理的日志資料存在多個版本,各個版本每個字段的資料内容存在一些差異,可能版本 A 日志資料的第二個列是搜尋關鍵字,但版本 B 的第二列卻是搜尋的終端類型,如果這兩個版本的日志直接導入 hive 中,很明顯資料将會混亂,統計結果也不會正确。我們的任務是要使多個版本的日志資料能在 hive 資料倉庫中共存,且表的 input/output 操作能夠最終映射到正确的日志版本的正确字段。

這裡我們不關心這部分繁瑣的工作,隻關心技術實作的關鍵點,這個功能該在哪裡實作才能讓 hive 認得這些不同格式的資料呢?經過多方嘗試,在中間任何環節做這個版本适配都将導緻複雜化,最終這個工作還是在 inputformat/outputformat 中完成最為優雅,畢竟 inputformat 是源頭, outputformat 是最終歸宿。具體來說,是在前面提到的 inputformat 的 next 方法中和在 outputformat 的 write 方法中完成這個适配工作。

b) Hive 操作本地資料

一開始,總是把本地資料先傳到 HDFS ,再由 hive 操作 hdfs 上的資料,然後再把資料從 HDFS 上傳回本地資料。後來發現大可不必如此, hive 語句都提供了“ local ”關鍵字,支援直接從本地導入資料到 hive ,也能從 hive 直接導出資料到本地,不過其内部計算時當然是用 HDFS 上的資料,隻是自動為我們完成導入導出而已。

3、 資料處理

日志資料的統計處理在這裡反倒沒有什麼特别之處,就是一些 SQL 語句而已,也沒有什麼高深的技巧,不過還是列舉一些語句示例,以示 hive 處理資料的友善之處,并展示 hive 的一些用法。

a) 為 hive 添加使用者定制功能,自定義功能都位于 hive_contrib.jar 包中

add jar /opt/hadoop/hive-0.5.0-bin/lib/hive_contrib.jar;

b)  統計每個關鍵詞的搜尋量,并按搜尋量降序排列,然後把結果存入表 keyword_20100603 中

create table keyword_20100603 as select keyword,count(keyword) as count from searchlog_20100603 group by keyword order by count desc;

c) 統計每類使用者終端的搜尋量,并按搜尋量降序排列,然後把結果存入表 device_20100603 中

create table device_20100603 as select device,count(device) as count from searchlog_20100603 group by device order by count desc;

d) 建立表 time_20100603 ,使用自定義的 INPUTFORMAT 和 OUTPUTFORMAT ,并指定表資料的真實存放位置在 '/LogAnalysis/results/time_20100603' ( HDFS 路徑),而不是放在 hive 自己的資料目錄中

create external table if not exists time_20100603(time string, count int) stored as INPUTFORMAT 'com.aspire.search.loganalysis.hive.XmlResultInputFormat' OUTPUTFORMAT 'com.aspire.search.loganalysis.hive.XmlResultOutputFormat' LOCATION '/LogAnalysis/results/time_20100603';

e) 統計每秒通路量 TPS ,按通路量降序排列,并把結果輸出到表 time_20100603 中,這個表我們在上面剛剛定義過,其真實位置在 '/LogAnalysis/results/time_20100603' ,并且由于 XmlResultOutputFormat 的格式化,檔案内容是 XML 格式。

insert overwrite table time_20100603 select time,count(time) as count from searchlog_20100603 group by time order by count desc;

f) 計算每個搜尋請求響應時間的最大值,最小值和平均值

insert overwrite table response_20100603 select max(responsetime) as max,min(responsetime) as min,avg(responsetime) as avg from searchlog_20100603;

g)建立一個表用于存放今天與昨天的關鍵詞搜尋量和增量及其增量比率,表資料位于 '/LogAnalysis/results/keyword_20100604_20100603' ,内容将是 XML 格式。

create external table if not exists keyword_20100604_20100603(keyword string, count int, increment int, incrementrate double) stored as INPUTFORMAT 'com.aspire.search.loganalysis.hive.XmlResultInputFormat' OUTPUTFORMAT 'com.aspire.search.loganalysis.hive.XmlResultOutputFormat' LOCATION '/LogAnalysis/results/keyword_20100604_20100603';

h)設定表的屬性,以便 XmlResultInputFormat 和 XmlResultOutputFormat 能根據 output.resulttype 的不同内容輸出不同格式的 XML 檔案。

alter table keyword_20100604_20100603 set tblproperties ('output.resulttype'='keyword');

i) 關聯今天關鍵詞統計結果表( keyword_20100604 )與昨天關鍵詞統計結果表( keyword_20100603 ),統計今天與昨天同時出現的關鍵詞的搜尋次數,今天相對昨天的增量和增量比率,并按增量比率降序排列,結果輸出到剛剛定義的 keyword_20100604_20100603 表中,其資料檔案内容将為 XML 格式。

insert overwrite table keyword_20100604_20100603 select cur.keyword, cur.count, cur.count-yes.count as increment, (cur.count-yes.count)/yes.count as incrementrate from keyword_20100604 cur join keyword_20100603 yes on (cur.keyword = yes.keyword) order by incrementrate desc;

4、使用者自定義函數 UDF

部分統計結果需要以 CSV 的格式輸出,對于這類檔案體全是有效内容的檔案,不需要像 XML 一樣包含 version , encoding 等資訊的檔案頭,最适合用 UDF(user define function) 了。

UDF 函數可直接應用于 select 語句,對查詢結構做格式化處理之後,再輸出内容。自定義 UDF 需要繼承 org.apache.hadoop.hive.ql.exec.UDF ,并實作 evaluate 函數, Evaluate 函數支援重載,還支援可變參數。我們實作了一個支援可變字元串參數的 UDF ,支援把 select 得出的任意個數的不同類型資料轉換為字元串後,按 CSV 格式輸出,由于代碼較簡單,這裡給出源碼示例:

1.  public String evaluate(String... strs) {
2.         StringBuilder sb = new StringBuilder();
3.  
4.         for ( int i = 0; i < strs. length ; i++) {
5.  
6.             sb.append(ConvertCSVField(strs[i])).append( ',' );
7.  
8.         }
9.  
10.         sb.deleteCharAt(sb.length()-1);
11.  
12.         return sb.toString();
13.  
14.  }複制代碼
      

需要注意的是,要使用 UDF 功能,除了實作自定義 UDF 外,還需要加入包含 UDF 的包,示例:

add jar /opt/hadoop/hive-0.5.0-bin/lib/hive_contrib.jar;

然後建立臨時方法,示例:

CREATE TEMPORARY FUNCTION Result2CSv AS ‘com.aspire.search.loganalysis.hive. Result2CSv';

使用完畢還要 drop 方法,示例:

DROP TEMPORARY FUNCTION Result2CSv;

5、輸出 XML 格式的統計結果

前面看到部分日志統計結果輸出到一個表中,借助 XmlResultInputFormat 和 XmlResultOutputFormat 格式化成 XML 檔案,考慮到建立這個表隻是為了得到 XML 格式的輸出資料,我們隻需實作 XmlResultOutputFormat 即可,如果還要支援 select 查詢,則我們還需要實作 XmlResultInputFormat ,這裡我們隻介紹 XmlResultOutputFormat 。

前面介紹過,定制 XmlResultOutputFormat 我們隻需重寫 write 即可,這個方法将會把 hive 的以 ’\001’ 分隔的多字段資料格式化為我們需要的 XML 格式,被簡化的示例代碼如下:

1.  public void write(Writable w) throws IOException {
2.             String[] strFields = ((Text) w).toString().split( "\001" );
3.  
4.             StringBuffer sbXml = new StringBuffer();
5.  
6.             if ( strResultType .equals( "keyword" )) {
7.  
8.      sbXml.append( "<record><keyword>" ).append(strFields[0]).append(
9.  
10.      "</keyword><count>" ).append(strFields[1]).append(           "</count><increment>" ).append(strFields[2]).append(
11.  
12.      "</increment><rate>" ).append(strFields[3]).append(
13.  
14.  "</rate></result>" );
15.  
16.             }
17.  
18.             Text txtXml = new Text();
19.  
20.             byte [] strBytes = sbXml.toString().getBytes( "utf-8" );
21.  
22.             txtXml.set(strBytes, 0, strBytes. length );
23.  
24.             byte [] output = txtXml.getBytes();
25.  
26.             bytesWritable .set(output, 0, output. length );
27.  
28.             writer .write( bytesWritable );
29.  
30.      }      

其中的 strResultType .equals( "keyword" ) 指定關鍵詞統計結果,這個屬性來自以下語句對結果類型的指定,通過這個屬性我們還可以用同一個 outputformat 輸出多種類型的結果。

alter table keyword_20100604_20100603 set tblproperties ('output.resulttype'='keyword');

仔細看看 write 函數的實作便可發現,其實這裡隻輸出了 XML 檔案的正文,而 XML 的檔案頭和結束标簽在哪裡輸出呢?所幸我們采用的是基于 outputformat 的實作,我們可以在構造函數輸出 version , encoding 等檔案頭資訊,在 close() 方法中輸出結束标簽。

這也是我們為什麼不使用 UDF 來輸出結果的原因,自定義 UDF 函數不能輸出檔案頭和檔案尾,對于 XML 格式的資料無法輸出完整格式,隻能輸出 CSV 這類所有行都是有效資料的檔案。