版權聲明:本文由董可倫首發于https://dongkelun.com,非商業轉載請注明作者及原創出處。商業轉載請聯系作者本人。 https://blog.csdn.net/dkl12/article/details/80588445
我的原創位址:
https://dongkelun.com/2018/05/30/sparkGZ/前言
本文講如何用spark讀取gz類型的壓縮檔案,以及如何解決我遇到的各種問題。
1、檔案壓縮
下面這一部分摘自Spark快速大資料分析:
在大資料工作中,我們經常需要對資料進行壓縮以節省存儲空間和網絡傳輸開銷。對于大多數Hadoop輸出格式來說,我們可以指定一種壓縮編解碼器來壓縮資料。
選擇一個輸出壓縮編解碼器可能會對這些資料以後的使用者産生巨大影響。對于像Spark 這樣的分布式系統,我們通常會嘗試從多個不同機器上一起讀入資料。要實作這種情況,每個工作節點都必須能夠找到一條新記錄的開端。有些壓縮格式會使這變得不可能,而必須要單個節點來讀入所有資料,這就很容易産生性能瓶頸。可以很容易地從多個節點上并行讀取的格式被稱為“可分割”的格式。下表列出了可用的壓縮選項。
格式 | 可分割 | 平均壓縮速度 | 文本檔案壓縮效率 | Hadoop壓縮編解碼器 | 純Java實作 | 原生 | 備注 |
---|---|---|---|---|---|---|---|
gzip | 否 | 快 | 高 | org.apache.hadoop.io.compress.GzipCodec | 是 | ||
lzo | 是(取決于所使用的庫) | 非常快 | 中等 | com.hadoop.compression.lzo.LzoCodec | 需要在每個節點上安裝LZO | ||
bzip2 | 慢 | 非常高 | org.apache.hadoop.io.compress.Bzip2Codec | 為可分割版本使用純Java | |||
zlib | org.apache.hadoop.io.compress.DefaultCodec | Hadoop 的預設壓縮編解碼器 | |||||
Snappy | 低 | org.apache.hadoop.io.compress.SnappyCodec | Snappy 有純Java的移植版,但是在Spark/Hadoop中不能用 |
盡管Spark 的textFile() 方法可以處理壓縮過的輸入,但即使輸入資料被以可分割讀取的方式壓縮,Spark 也不會打開splittable。是以,如果你要讀取單個壓縮過的輸入,最好不要考慮使用Spark 的封裝,而是使用newAPIHadoopFile 或者hadoopFile,并指定正确的壓縮編解碼器。
關于上面一段話的個人測試:選取一個大檔案txt,大小為1.5G,寫spark程式讀取hdfs上的該檔案然後寫入hive,經測試在多個分區的情況下,txt執行時間最短,因為在多個機器并行執行,而gz檔案是不可分割的,即使指定分區數目,但依然是一個分區,一個task,即在一個機器上執行,bzip2格式的檔案雖然是可分割的,即可以按照指定的分區分為不同的task在多個機器上執行,但是執行時間長,比gz時間還長,經過四次改變bzip2的分區,發現最快的時間和gz時間是一樣的,如果指定一個分區的話,比gz要慢很多,我想這樣就可以更好的了解:”盡管Spark 的textFile() 方法可以處理壓縮過的輸入,但即使輸入資料被以可分割讀取的方式壓縮,Spark 也不會打開splittable”這句話了。
後續測試:根據叢集的cpu合理配置設定executor的個數的情況下,txt的時間縮短到1分鐘,bzip2縮短到1.3分鐘,而對gz重新分區(reparation)縮短到2分鐘,可以看到在合理配置設定資源的情況下,bzip2比gz快不少,但依然趕不上txt,當然這也的結果可能受檔案大小和叢集資源的限制,是以根據自己的實際需求測試再決定用哪個即可。
2、代碼
代碼很簡單,用textFile()即可,假設,我的資料名為data.txt.gz,我把它放在hdfs上的/tmp/dkl路徑下那麼代碼為:
val path = "hdfs://ambari.master.com:8020/tmp/dkl/data.txt.gz"
val data = sc.textFile(path)
注:把資料放在hdfs的指令為
hadoop fs -put data.tar.gz /tml/dkl
3、一些小問題
3.1 資料
首先造幾個資料吧,先建立一個txt,名字為data.txt,内容如下
1 張三 上海 2018-05-25
2 張三 上海 2018-05-25
3 張三 上海 2018-05-25
4 張三 上海 2018-05-25
5 張三 上海 2018-05-25
3.2 如何壓縮
那麼如如何打包為gz格式的壓縮檔案呢,分兩種
一、 在windows上打包,如果不想在Linux伺服器上用指令打包,那麼可以直接用windows上的軟體打包(win上常見的zip,rar格式,spark是不支援的),我用7-zip軟體壓縮,大家可百度7-zip或直接在
https://www.7-zip.org/下載下傳安裝,壓縮格式選gzip即可。
二、 在Linux上壓縮,可通過下面的指令
1、保留原檔案
gzip –c data.txt > data.txt.gz
2、不保留原檔案,預設生成的檔案名為原檔案名.gz,即data.txt.gz
gzip data.txt
壓縮完了之後,跑一下程式測試一下
data.take(3).foreach(println)
1 張三 上海 2018-05-25
2 張三 上海 2018-05-25
3 張三 上海 2018-05-25
根據結果看沒問題。
三、 說明
在Linux上用tar指令壓縮,spark雖然可以讀,但是第一行會有檔案資訊
tar -zcvf data.tar.gz data.txt
3.3 檔案編碼問題
别人給我的原檔案是.rar,那我需要将其解壓之後得到txt,然後按照上述方式壓縮為.gz,然後上傳到hdfs,進行代碼測試,列印前幾條發現亂碼,查了一下發現原檔案是gbk編碼的,且sc.textFile()不能指定編碼,隻能讀取utf8格式,其他格式就會亂碼。
注意:因為實際情況下解壓後的txt檔案很大,windows是直接打不開的,是以不能通過打開檔案修改編碼的方法去解決。
3.3.1 建構測試gbk格式的檔案
1、windows上可以用記事本打開,另存為,編碼選擇ANSI即可
2、Linux可以通過下面的指令修改
iconv -f utf8 -t gbk data.txt > data_gbk.txt
測試一下輸出,發現确實亂碼了(直接測試txt即可)
1 ���� �Ϻ� 2018-05-25
2 ���� �Ϻ� 2018-05-25
3 ���� �Ϻ� 2018-05-25
3.3.2 代碼解決
通過如下代碼測試即可
定義方法
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.Text
def transfer(sc: SparkContext, path: String): RDD[String] = {
sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)
.map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK"))
}
測試方法
transfer(sc, path3).take(3).foreach(println)
參考:
Spark Scala 讀取GBK檔案的方法3.3.3 Linux指令
可直接通過Linux指令轉換txt的編碼格式,再壓縮,這樣代碼就不用修改
其實在3.2.1中已經涉及到了
1、通過Linux自帶的指令iconv
iconv不能覆寫原來的檔案,隻能生成新的檔案之後,再通過mv指令去覆寫
iconv -f gbk -t utf8 data_gbk.txt > data_new.txt
2、通過enca
enca可以直接覆寫原來的檔案,這樣如果不想改變來的檔案名,就少一步mv操作了,enca不是子系統自帶的,需要自己下載下傳安裝,可在
http://dl.cihar.com/enca/下載下傳最新版本。
#下載下傳&解壓
wget http://dl.cihar.com/enca/enca-1.19.tar.gz
tar -zxvf enca-1.19.tar.gz
cd enca-1.19
#編譯安裝
./configure
make
make install
安裝好了之後通過下面的指令轉換即可
enca -L zh_CN -x UTF-8 data_gbk.txt
轉換編碼格式之後,在通過程式測試即可。
參考:
linux 下的檔案編碼格式轉換3.4 rdd換df
由于檔案過大,不能直接打開看也沒用垃圾資料,造成格式問題,如果有垃圾資料,在rdd轉df的過程中會産生異常,這裡記錄一下我碰見的問題。
1、首先可以先列印出前幾行資料檢視一下該檔案的大體格式
2、碰到的一個一個異常
代碼用的
舊版spark(1.6版本) 将rdd動态轉為dataframe裡面的方法。
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true])....
原因是因為檔案裡有一行資料為垃圾資料,這行資料的列數和列名的個數不一樣導緻的,可以在代碼中過濾掉這樣資料即可。
.filter(_.length == colName.length)