一般来说,计算机处理的数据都存在一些冗余度,同时数据中间,尤其是相邻数据间存在着相关性,所以可以通过一些有别于原始编码的特殊编码方式来保存数据,使数据占用的存储空间比较小,这个过程一般叫压缩。和压缩对应的概念是解压缩,就是将被压缩的数据从特殊编码方式还原为原始数据的过程。
压缩广泛应用于海量数据处理中,对数据文件进行压缩,可以有效减少存储文件所需的空间,并加快数据在网络上或者到磁盘上的传输速度。在hadoop中,压缩应用于文件存储、map阶段到reduce阶段的数据交换(需要打开相关的选项)等情景。
数据压缩的方式非常多,不同特点的数据有不同的数据压缩方式:如对声音和图像等特殊数据的压缩,就可以采用有损的压缩方法,允许压缩过程中损失一定
的信息,换取比较大的压缩比;而对音乐数据的压缩,由于数据有自己比较特殊的编码方式,因此也可以采用一些针对这些特殊编码的专用数据压缩算法。
hadoop作为一个较通用的海量数据处理平台,在使用压缩方式方面,主要考虑压缩速度和压缩文件的可分割性。
所有的压缩算法都会考虑时间和空间的权衡,更快的压缩和解压缩速度通常会耗费更多的空间(压缩比较低)。例如,通过gzip命令压缩数据时,用户可以设置不同的选项来选择速度优先或空间优先,选项–1表示优先考虑速度,选项–9表示空间最优,可以获得最大的压缩比。
需要注意的是,有些压缩算法的压缩和解压缩速度会有比较大的差别:gzip和zip是通用的压缩工具,在时间/空间处理上相对平衡,gzip2压缩比gzip和zip更有效,但速度较慢,而且bzip2的解压缩速度快于它的压缩速度。
当使用mapreduce处理压缩文件时,需要考虑压缩文件的可分割性。考
虑我们需要对保持在hdfs上的一个大小为1gb的文本文件进行处理,当前hdfs的数据块大小为64mb的情况下,该文件被存储为16块,对应的
mapreduce作业将会将该文件分为16个输入分片,提供给16个独立的map任务进行处理。但如果该文件是一个gzip格式的压缩文件(大小不
变),这时,mapreduce作业不能够将该文件分为16个分片,因为不可能从gzip数据流中的某个点开始,进行数据解压。但是,如果该文件是一个
bzip2格式的压缩文件,那么,mapreduce作业可以通过bzip2格式压缩文件中的块,将输入划分为若干输入分片,并从块开始处开始解压缩数
据。bzip2格式压缩文件中,块与块间提供了一个48位的同步标记,因此,bzip2支持数据分割。
hadoop支持的压缩格式:
压缩格式
unix工具
算法
文件扩展名
支持多文件
可分割
deflate
无
.deflate
否
gzip
.gz
zip
.zip
是
bzip
bzip2
.bz2
lzo
lzop
.lzo
为了支持多种压缩解压缩算法,hadoop引入了编码/解码器。与hadoop序列化框架类似,编码/解码器也是使用抽象工厂的设计模式。目前,hadoop支持的编码/解码器如下
压缩算法及其编码/解码器:
对应的编码/解码器
org.apache.hadoop.io.compress.defaultcodec
org.apache.hadoop.io.compress.gzipcodec
org.apache.hadoop.io.compress.bzip2codec
snappy
org.apache.hadoop.io.compress.snappycodec
同一个压缩方法对应的压缩、解压缩相关工具,都可以通过相应的编码/解码器获得。
下面将介绍使用编码/解码器的典型实例(代码在org.hadoopinternal.compress包中)。其中,compress()方法接
受一个字符串参数,用于指定编码/解码器,并用对应的压缩算法对文本文件readme.txt进行压缩。字符串参数使用java的反射机制创建对应的编码
/解码器对象,通过compressioncodec对象,进一步使用它的createoutputstream()方法构造一个
compressionoutputstream流,未压缩的数据通过ioutils.copybytes()方法,从输入文件流中复制写入
compressionoutputstream流,最终以压缩格式写入底层的输出流中。
在本实例中,底层使用的是文件输出流fileoutputstream,它关联文件的文件名,是在原有文件名的基础上添加压缩算法相应的扩展名生
成。该扩展名可以通过compressioncodec对象的getdefaultextension()方法获得。相关代码如下:
需要解压缩文件时,通常通过其扩展名来推断它对应的编码/解码器,进而用相应的解码流对数据进行解码,如扩展名为gz的文件可以使用gzipcodec阅读。
compressioncodecfactory提供了getcodec()方法,用于将文件扩展名映射到对应的编码/解码器,如下面的例子。有了
compressioncodec对象,就可以使用和压缩类似的过程,通过对象的createinputstream()方法获得
compressioninputstream对象,解码数据。相关代码如下:
hadoop通过以编码/解码器为基础的抽象工厂方法,提供了一个可扩展的框架,支持多种压缩方法。下面就来研究hadoop压缩框架的实现。
1. 编码/解码器
前面已经提过,compressioncodec接口实现了编码/解码器,使用的是抽象工厂的设计模式。compressioncodec提供了一系列方法,用于创建特定压缩算法的相关设施,其类图如图所示:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiInBnauEjN4ITOzEzMygzM3EDMz8CXzATNxAjMvwVM4AzNxYzLcVTMwIzZvxmYvwVbvNmLn9GbiRXauNmLzV2Zh1Wavw1LcpDc0RHaiojIsJye.jpg)
compressioncodec中的方法很对称,一个压缩功能总对应着一个解压缩功能。其中,与压缩有关的方法包括:
createoutputstream()用于通过底层输出流创建对应压缩算法的压缩流,重载的createoutputstream()方法可使用压缩器创建压缩流;
createcompressor()方法用于创建压缩算法对应的压缩器。后续会继续介绍压缩流compressionoutputstream和压缩器compressor。解压缩也有对应的方法和类。
compressioncodec中还提供了获取对应文件扩展名的方法getdefaultextension(),如对于
org.apache.hadoop.io.compress.bzip2codec,该方法返回字符串“.bz2”,注意字符串的第一个字符。相关代码
如下:
compressioncodecfactory是hadoop压缩框架中的另一个类,它应用了工厂方法,使用者可以通过它提供的方法获得compressioncodec。
注意:抽象工厂方法和工厂方法这两个设计模式有很大的区别,抽象工厂方法用于创建一
系列相关或互相依赖的对象,如compressioncodec可以获得和某一个压缩算法相关的对象,包括压缩流和解压缩流等。而工厂方法(严格来
说,compressioncodecfactory是参数化工厂方法),用于创建多种产品,如通过compressioncodecfactory的
getcodec()方法,可以创建gzipcodec对象或bzip2codec对象。
在前面的实例中已经使用过getcodec()方法,为某一个压缩文件寻找对应的compressioncodec。为了分析该方法,需要了解compressioncodec类中保存文件扩展名和compressioncodec映射关系的成员变量codecs。
codecs是一个有序映射表,即它本身是一个map,同时它对map的键排序,下面是codecs中保存的一个可能的映射关系:
可以看到,map中的键是排序的。
getcodec()方法的输入是path对象,保存着文件路径,如实例中的“readme.txt.bz2”。
首先通过获取path对象对应的文件名并逆转该字符串得到“2zb.txt.emdaer”,然后通过有序映射sortedmap的
headmap()方法,查找最接近上述逆转字符串的有序映射的部分视图,如输入“2zb.txt.emdaer”的查找结果submap,只包含
“2zb.”对应的那个键–值对,如果输入是“zg.txt.emdaer”,则submap会包含成员变量codecs中保存的所有键–值对。然后,简
单地获取submap最后一个元素的键,如果该键是逆转文件名的前缀,那么就找到了文件对应的编码/解码器,否则返回空。实现代码如下:
compressioncodecfactory.getcodec()方法的代码看似复杂,但通过灵活使用有序映射sortedmap,实现其实还是非常简单的。
2. 压缩器和解压器
压缩器(compressor)和解压器(decompressor)是hadoop压缩框架中的一对重要概念。
compressor可以插入压缩输出流的实现中,提供具体的压缩功能;相反,decompressor提供具体的解压功能并插入
compressioninputstream中。compressor和decompressor的这种设计,最初是在java的zlib压缩程序库中
引入的,对应的实现分别是java.util.zip.deflater和java.util.zip.inflater。下面以compressor为
例介绍这对组件。
compressor的用法相对复杂,compressor通过setinput()方法接收数据到内部缓冲区,自然可以多次调用
setinput()方法,但内部缓冲区总是会被写满。如何判断压缩器内部缓冲区是否已满呢?可以通过needsinput()的返回值,如果是
false,表明缓冲区已经满,这时必须通过compress()方法获取压缩后的数据,释放缓冲区空间。
为了提高压缩效率,并不是每次用户调用setinput()方法,压缩器就会立即工作,所以,为了通知压缩器所有数据已经写入,必须使用
finish()方法。finish()调用结束后,压缩器缓冲区中保持的已经压缩的数据,可以继续通过compress()方法获得。至于要判断压缩器
中是否还有未读取的压缩数据,则需要利用finished()方法来判断。
注意:finished()和finish()的作用不同,finish()结束数据输入的过程,而finished()返回false,表明压缩器中还有未读取的压缩数据,可以继续通过compress()方法读取。
compressor接口源代码如下:
使用compressor的一个典型实例如下:
以上代码实现了setinput()、needsinput()、finish()、compress()和finished()的配合过程。将输
入inbuf分成几个部分,通过setinput()方法送入压缩器,而在finish()调用结束后,通过finished()循序判断压缩器是否还有
未读取的数据,并使用compress()方法获取数据。
在压缩的过程中,compressor可以通过getbytesread()和getbyteswritten()方法获得compressor输
入未压缩字节的总数和输出压缩字节的总数,如实例中最后一行的输出语句。compressor和decompressor的类图如图所示。
compressor.end()方法用于关闭解压缩器并放弃所有未处理的输入;reset()方法用于重置压缩器,以处理新的输入数据集合;reinit()方法更进一步允许使用hadoop的配置系统,重置并重新配置压缩器。
3. 压缩流和解压缩流
java最初版本的输入/输出系统是基于流的,流抽象了任何有能力产出数据的数据源,或者是有能力接收数据的接收端。一般来说,通过设计模式装饰,
可以为流添加一些额外的功能,如前面提及的序列化流objectinputstream和objectoutputstream。
压缩流(compressionoutputstream)和解压缩流(compressioninputstream)是hadoop压缩框架中
的另一对重要概念,它提供了基于流的压缩解压缩能力。如图3-7所示是从java.io.inputstream和
java.io.outputstream开始的类图。
这里只分析和压缩相关的代码,即compressionoutputstream及其子类。
outputstream是一个抽象类,提供了进行流输出的基本方法,它包含三个write成员函数,分别用于往流中写入一个字节、一个字节数组或一个字节数组的一部分(需要提供起始偏移量和长度)。
注意:流实现中一般需要支持的close()和flush()方法,是java.io包中的相应接口的成员函数,不是outputstream的成员函数。
compressionoutputstream继承自outputstream,也是个抽象类。如前面提到的
objectoutputstream、compressionoutputstream为其他流添加了附加额外的压缩功能,其他流保存在类的成员变量
out中,并在构造的时候被赋值。
compressionoutputstream实现了outputstream的close()方法和flush()方法,但用于输出数据的
write()方法、用于结束压缩过程并将输入写到底层流的finish()方法和重置压缩状态的resetstate()方法还是抽象方法,需要
compressionoutputstream的子类实现。相关代码如下:
compressionoutputstream规定了压缩流的对外接口,如果已经有了一个压缩器的实现,能否提供一个通用的、使用压缩器的压缩流实现呢?答案是肯定的,compressorstream使用压缩器实现了一个通用的压缩流,其主要代码如下:
compressorstream提供了几个不同的构造函数,用于初始化相关的成员变量。上述代码片段中保留了参数最多的构造函数,其
中,compressorstream需要的底层输出流out和压缩时使用的压缩器,都作为参数传入构造函数。另一个参数是
compressorstream工作时使用的缓冲区buffer的大小,构造时会利用这个参数分配该缓冲区。
compressorstream.write()方法用于将待压缩的数据写入流中。待压缩的数据在进行一番检查后,最终调用压缩器的
setinput()方法进入压缩器。setinput()方法调用结束后,通过compressor.needsinput()判断是否需要调用
compress()方法,获取压缩后的输出数据。上一节已经讨论了这个问题,如果内部缓冲区已满,则需要通过compress()方法提取数据,提取后
的数据直接通过底层流的write()方法输出。
当finish()被调用(往往是compressorstream被关闭),这时compressorstream流调用压缩器的finish()方法通知输入已经结束,然后进入另一个循环,该循环不断读取压缩器中未读取的数据,然后输出到底层流out中。
compressorstream中的其他方法,如resetstate()和close()都比较简单,不再一一介绍了。
compressorstream利用压缩器compressor实现了一个通用的压缩流,在hadoop中引入一个新的压缩算法,如果没有特殊的
考虑,一般只需要实现相关的压缩器和解压器,然后通过compressorstream和decompressorstream,就实现相关压缩算法的输
入/输出流了。
compressorstream的实现并不复杂,只需要注意压缩器几个方法间的配合,下图给出了这些方法的一个典型调用顺序:
4. java本地方法
数据压缩往往是计算密集型的操作,考虑到性能,建议使用本地库(native library)来压缩和解压。在某个测试中,与java实现的内置gzip压缩相比,使用本地gzip压缩库可以将解压时间减少50%,而压缩时间大概减少10%。
hadoop的deflate、gzip和snappy都支持算法的本地实现,其中apache发行版中还包含了deflate和gzip的32位
和64位linux本地压缩库(cloudera发行版还包括snappy压缩方法)。默认情况下,hadoop会在它运行的平台上查找本地库。
假设有一个c
函数,它实现了某些功能,同时因为某种原因(如效率),使得用户不希望用java语言重新实现该功能,那么java本地方法(native
method)就是一个不错的选择。java提供了一些钩子函数,使得调用本地方法成为可能,同时,jdk也提供了一些工具,协助用户减轻编程负担。
java语言中的关键字native用于表示某个方法为本地方法,显然,本地方法是类的成员方法。下
面是一个本地方法的例子,代码片段来自cloudera的snappy压缩实现,在
org.apache.hadoop.io.compress.snappy包中。其中,静态方法initids()和方法
compressbytesdirect()用关键字native修饰,表明这是一个java本地方法。相关代码如下: