http://marsorp.iteye.com/blog/1559613
文件压缩带来了两大益处1)减少存贮空间2)加速网络(磁盘)传输。基于大数据的传输,都需要经过压缩处理。
压缩格式
压缩格式 工具
算法 文件扩展名 可分块
DEFLATE N/A DEFLATE .deflate No
gzip gzip DEFLATE .gz No
bzip2 bzip2 bzip2 .bz2 Yes
LZO lzop LZO .lzo No
Snappy N/A Snappy .snappy No
压缩及
解压缩
文件解压实例
Java代码 复制代码 收藏代码
1.package com.bigdata.io;
2.
3.import java.io.IOException;
4.import java.io.InputStream;
5.import java.io.OutputStream;
6.import java.net.URI;
7.
8.import org.apache.
hadoop.conf.Configuration;
9.import org.apache.hadoop.fs.FileSystem;
10.import org.apache.hadoop.fs.Path;
11.import org.apache.hadoop.io.IOUtils;
12.import org.apache.hadoop.io.compress.CompressionCodec;
13.import org.apache.hadoop.io.compress.CompressionCodecFactory;
14.
15.public
class FileDecompressor {
16.
17. public static void main(String[] args) throws IOException {
18. String uri = args[0];
19. Configuration conf = new Configuration();
20. FileSystem fs = FileSystem.get(URI.create(uri),conf);
21. Path inputPath = new Path(uri);
22.
23. CompressionCodecFactory factory = new CompressionCodecFactory(conf);
24. // io.compression.codecs 定义列表中的一个
25. CompressionCodec codec = factory.getCodec(inputPath);
26. if(null == codec){
27. System.err.println("No codec for " + uri);
28. System.exit(-1);
29. }
30.
31. String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.get
DefaultExtension());
32. InputStream in = null;
33. OutputStream out = null;
34. try{
35. in = codec.createInputStream(fs.open(inputPath));
36. out = fs.create(new Path(outputUri));
37. IOUtils.copyBytes(in, out, conf);
38. }finally{
39. IOUtils.closeStream(in);
40. IOUtils.closeStream(out);
41. }
42. }
43.}
$hadoop jar stream.jar com.bigdata.io.FileDecompressor test.txt.gz
12/06/14 09:48:28 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/06/14 09:48:28 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
Native Libraries
Native gzip 库减少解压缩时间在50%,压缩时间在10%(同java实现的压缩算法)
禁用Native Library , 需要设置属性hadoop.native.lib 为 false
Java代码 复制代码 收藏代码
1.package com.bigdata.io;
2.
3.import java.io.IOException;
4.
5.import org.apache.hadoop.conf.Configuration;
6.import org.apache.hadoop.io.IOUtils;
7.import org.apache.hadoop.io.compress.CodecPool;
8.import org.apache.hadoop.io.compress.CompressionCodec;
9.import org.apache.hadoop.io.compress.CompressionOutputStream;
10.import org.apache.hadoop.io.compress.Compressor;
11.import org.apache.hadoop.util.ReflectionUtils;
12.
13.public class PooledStreamCompressor {
14.
15. public static void main(String[] args) throws ClassNotFoundException {
16. String codecClassName = args[0];
17. Class<?> codecClass = Class.forName(codecClassName);
18.
19. Configuration conf = new Configuration();
20. CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
21. Compressor compressor = null;
22.
23. try {
24. compressor = CodecPool.getCompressor(codec);
25. CompressionOutputStream out = codec.createOutputStream(System.out, compressor);
26. IOUtils.copyBytes(System.in, out,4096,false);
27. out.finish();
28. } catch (IOException e) {
29. e.printStackTrace();
30. }finally{
31. CodecPool.returnCompressor(compressor);
32. }
33. }
34.}
echo "Text" | hadoop jar stream.jar com.bigdata.io.PooledStreamCompressor org.apache.hadoop.io.compress.GzipCodec | gunzip -
12/06/14 10:57:49 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/06/14 10:57:49 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
12/06/14 10:57:49 INFO compress.CodecPool: Got brand-new compressor
Text
使用压缩格式规则
压缩格式的使用取决于你的应用程序。你想最大化您的应用程序的速度,或者是你更关注关于保持存储成本下降?在一般情况下,你应该尝试不同的策略,根据数据集和
基准测试,找到最好的办法。
对于大文件,如日志文件,选项有:
1.存储成非压缩格式
2.使用一个支持块分割的压缩格式,比如bzip2(当然bzip2比较慢)或者可以使用LZO(被索引后支持块分割)
3.把应用分割成大快,每个大快可以支持任意的压缩格式(不用关心在HDFS中是否可以分割)。在这种情况,
你需要计算一下你的压缩后的尺寸,要同HDFS Block的大小相匹配
4.使用Sequence 文件,支持压缩及分块
5.使用Avro数据文件,支持压缩及分块,就像Sequence文件,但比Sequence要更高级,可以被多种语言支持
对于大文件,你不应该使用不支持分块的压缩。因为,你失去了地方,使MapReduce应用非常没有效率
为了归档目的,考虑Hadoop 归档格式,虽然他不支持压缩。
MapReduce中使用压缩
步骤:
1.设置mapred.output.compress=true
2.设置mapred.output.compression.codec=the classname of the above list
或者
FileOutputFormat.setCompressOutput(job,true)
FileOutputFormat.setOutputCompressorClass(job,GzipCodec.class)
Java代码 复制代码 收藏代码
1.public class MaxTemperatureWithCompression {
2. public static void main(String[] args) throws Exception {
3. if (args.length != 2) {
4. System.err.println("
Usage: MaxTemperatureWithCompression <input path> " +
5. "<output path>");
6. System.exit(-1);
7. }
8. Job job = new Job();
9. job.setJarByClass(MaxTemperature.class);
10. FileInputFormat.addInputPath(job, new Path(args[0]));
11. FileOutputFormat.setOutputPath(job, new Path(args[1]));
12. job.setOutputKeyClass(Text.class);
13. job.setOutputValueClass(IntWritable.class);
14.<span style="color: rgb(255, 0, 0);"> FileOutputFormat.setCompressOutput(job, true);
15. FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);</span>
16. job.setMapperClass(MaxTemperatureMapper.class);
17. job.setCombinerClass(MaxTemperatureReducer.class);
18. job.setReducerClass(MaxTemperatureReducer.class);
Java代码 复制代码 收藏代码
1.<span style="white-space: normal;"><pre class="java" name="code">System.exit(job.waitForCompletion(true) ? 0 : 1);</pre>
2.</span>
3. }
4.}
复制代码收藏代码
对于SequenceFile格式
默认压缩类型是RECORD,建议更改为BLOCk,压缩一组record
SqeuenceFileOutputFormat.setOutputCompressionType()设置这个压缩类型
MapReduce压缩属性汇总
Property name Type Default value Description
mapred.output.compress boolean false Compress outputs
mapred.output.compression.codec Class name org.apache.hadoop.io.compress.DefaultCodec The compression codec to use for outputs
mapred.output.compression.type String RECORD Type type of compression to use for SequenceFile outputs:NONE,RECORD,or BLOCK.
Map阶段压缩输出(即中间结果是否压缩)
建议使用性能
比较好的LZO、Snappy
Map 输出压缩属性汇总
Property name Type Default value Description
mapred.compress.map.output boolean false Compress map outputs
mapred.map.output.compression.codec Class org.apache.hadoop.io.compress.DefaultCodec The compression codec to use for map outputs.
Map阶段使用压缩样例
Java代码 复制代码 收藏代码
1.Configuration conf = new Configuration();
2.conf.setBoolean("mapred.compress.map.output", true);
3.conf.setClass("mapred.map.output.compression.codec", GzipCodec.class,
4.CompressionCodec.class);
5.Job job = new Job(conf);