大数据技术之压缩解压缩案例

7.10 压缩/解压缩案例

7.10.1 对数据流的压缩和解压缩

CompressionCodec有两个方法可以用于轻松地压缩或解压缩数据。要想对正在被写入一个输出流的数据进行压缩,我们可以使用createOutputStream(OutputStreamout)方法创建一个CompressionOutputStream,将其以压缩格式写入底层的流。相反,要想对从输入流读取而来的数据进行解压缩,则调用createInputStream(InputStreamin)函数,从而获得一个CompressionInputStream,从而从底层的流读取未压缩的数据。

测试一下如下压缩方式:


DEFLATE


org.apache.hadoop.io.compress.DefaultCodec


gzip


org.apache.hadoop.io.compress.GzipCodec


bzip2


org.apache.hadoop.io.compress.BZip2Codec

package com.xyg.mapreduce.compress;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class TestCompress {

    public static void main(String[] args) throws Exception, IOException {
//        compress("e:/test.txt","org.apache.hadoop.io.compress.BZip2Codec");
        decompres("e:/test.txt.bz2");
    }

    /*
     * 压缩
     * filername:要压缩文件的路径
     * method:欲使用的压缩的方法(org.apache.hadoop.io.compress.BZip2Codec)
     */
    public static void compress(String filername, String method) throws ClassNotFoundException, IOException {

        // 1 创建压缩文件路径的输入流
        File fileIn = new File(filername);
        InputStream in = new FileInputStream(fileIn);

        // 2 获取压缩的方式的类
        Class codecClass = Class.forName(method);

        Configuration conf = new Configuration();
        // 3 通过名称找到对应的编码/解码器
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);

        // 4 该压缩方法对应的文件扩展名
        File fileOut = new File(filername + codec.getDefaultExtension());

        OutputStream out = new FileOutputStream(fileOut);
        CompressionOutputStream cout = codec.createOutputStream(out);

        // 5 流对接
        IOUtils.copyBytes(in, cout, 1024 * 1024 * 5, false); // 缓冲区设为5MB

        // 6 关闭资源
        in.close();
        cout.close();
        out.close();
    }

    /*
     * 解压缩
     * filename:希望解压的文件路径
     */
    public static void decompres(String filename) throws FileNotFoundException, IOException {

        Configuration conf = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);

        // 1 获取文件的压缩方法
        CompressionCodec codec = factory.getCodec(new Path(filename));

        // 2 判断该压缩方法是否存在
        if (null == codec) {
            System.out.println("Cannot find codec for file " + filename);
            return;
        }

        // 3 创建压缩文件的输入流
        InputStream cin = codec.createInputStream(new FileInputStream(filename));

        // 4 创建解压缩文件的输出流
        File fout = new File(filename + ".decoded");
        OutputStream out = new FileOutputStream(fout);

        // 5 流对接
        IOUtils.copyBytes(cin, out, 1024 * 1024 * 5, false);

        // 6 关闭资源
        cin.close();
        out.close();
    }
}

7.10.2 在Map输出端采用压缩

即使你的MapReduce的输入输出文件都是未压缩的文件,你仍然可以对map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可,我们来看下代码怎么设置:

给大家提供的hadoop源码支持的压缩格式有:BZip2Codec 、DefaultCodec

package com.xyg.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration configuration = new Configuration();

        // 开启map端输出压缩
        configuration.setBoolean("mapreduce.map.output.compress", true);
        // 设置map端输出压缩方式
        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 1 : 0);
    }
}

2)Mapper保持不变

package com.xyg.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();

        String[] words = line.split(" ");

        for(String word:words){
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

3)Reducer保持不变

package com.xyg.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {

        int count = 0;

        for(IntWritable value:values){
            count += value.get();
        }

        context.write(key, new IntWritable(count));
    }
}

7.10.3 Reduce输出端采用压缩

基于workcount案例处理

1)修改驱动

package com.xyg.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 设置reduce端输出压缩开启
        FileOutputFormat.setCompressOutput(job, true);

        // 设置压缩的方式
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
//        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
//        FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); 

        boolean result = job.waitForCompletion(true);

        System.exit(result?1:0);
    }
}

2)Mapper和Reducer保持不变(详见7.10.2)

原文地址:https://www.cnblogs.com/frankdeng/p/9255935.html

时间: 2024-08-05 13:31:09

大数据技术之压缩解压缩案例的相关文章

大数据技术之日志清洗案例

7.7 日志清洗案例 7.7.1 简单解析版 1)需求: 去除日志中字段长度小于等于11的日志. 2)输入数据 194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)" 183.49.46.228 - - [18/S

基于大数据技术推荐系统算法案例实战视频教程(项目实战)

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

大数据技术之MapReduce中多表合并案例

大数据技术之MapReduce中多表合并案例 1)需求: 订单数据表t_order: id pid amount 1001 01 1 1002 02 2 1003 03 3 订单数据order.txt 1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6 商品信息表t_product pid pname 01 小米 02 华为 03 格力 商品数据pd.txt 01 小米 02 华为 03 格力 将商品信息表中数据根据商品pid合

大数据技术之Hadoop入门

? 第1章 大数据概论 1.1 大数据概念 大数据概念如图2-1 所示. 图2-1 大数据概念 1.2 大数据特点(4V) 大数据特点如图2-2,2-3,2-4,2-5所示 图2-2 大数据特点之大量 图2-3 大数据特点之高速 图2-4 大数据特点之多样 图2-5 大数据特点之低价值密度 1.3 大数据应用场景 大数据应用场景如图2-6,2-7,2-8,2-9,2-10,2-11所示 图2-6 大数据应用场景之物流仓储 图2-7 大数据应用场景之零售 图2-8 大数据应用场景之旅游 图2-9

【教程分享】基于Greenplum Hadoop分布式平台的大数据解决方案及商业应用案例剖析

基于Greenplum Hadoop分布式平台的大数据解决方案及商业应用案例剖析 课程讲师:迪伦 课程分类:Java 适合人群:高级 课时数量:96课时 用到技术:MapReduce.HDFS.Map-Reduce.Hive.Sqoop 涉及项目:Greenplum Hadoop大数据分析平台 更新程度:完毕 对这个课程有兴趣的朋友可以加我的QQ2059055336和我联系 下载地址:链接:   pan.baidu.com/s/1nthYpKH 密码: niyi 随着云计算.大数据迅速发展,亟需

大数据学习路线图 让你精准掌握大数据技术学习?

大数据指不用随机分析法这样捷径,而采用所有数据进行分析处理的方法.互联网时代每个企业每天都要产生庞大的数据,对数据进行储存,对有效的数据进行挖掘分析并应用需要依赖于大数据开发,大数据开发课程采用真实商业数据源并融合云计算+机器学习,让学员有实力入职一线互联网企业. 今天小编的技术分享详细学习大数据的精准路线图,学好大数据就还得靠专业的工具. 大数据学习QQ群:119599574 阶段一. Java语言基础 Java开发介绍.熟悉Eclipse开发工具.Java语言基础.Java流程控制.Java

大数据学习路线 让你精准掌握大数据技术学习

大数据指不用随机分析法这样捷径,而采用所有数据进行分析处理的方法.互联网时代每个企业每天都要产生庞大的数据,对数据进行储存,对有效的数据进行挖掘分析并应用需要依赖于大数据开发,大数据开发课程采用真实商业数据源并融合云计算+机器学习,让学员有实力入职一线互联网企业. 今天小编的技术分享详细学习大数据的精准路线图,学好大数据就还得靠专业的工具. 阶段一. Java语言基础 Java开发介绍.熟悉Eclipse开发工具.Java语言基础.Java流程控制.Java字符串.Java数组与类和对象.数字处

《Spark大数据分析:核心概念、技术及实践》大数据技术一览

本节书摘来自华章出版社<Spark大数据分析:核心概念.技术及实践>一书中的第1章,第1节,作者穆罕默德·古勒(Mohammed Guller)更多章节内容可以访问云栖社区"华章计算机"公众号查看. 大数据技术一览 我们正处在大数据时代.数据不仅是任何组织的命脉,而且在指数级增长.今天所产生的数据比过去几年所产生的数据大好几个数量级.挑战在于如何从数据中获取商业价值.这就是大数据相关技术想要解决的问题.因此,大数据已成为过去几年最热门的技术趋势之一.一些非常活跃的开源项目都

周鸿祎:以大数据技术对抗大数据平台安全威胁

1月,中国大陆境内所有通用顶级域(.com/.net/.org等)解析出现问题,所有相关域名均被指向一个位于美国的IP地址(65.49.2.178),导致数千万网民在数小时内无法访问网站. 4月,OpenSSL"心脏出血(Heartbleed)"重大安全漏洞被曝光,这一漏洞让黑客能够读取服务器系统的运行内存.有业内人士利用该漏洞在某知名电商网站上测试时,成功获得多位用户的账号及密码,并成功登陆网站. 9月,"iCloud艳照门"事件爆发,数百张好莱坞女演员不雅照在网