基于HBase的MapReduce实现大量邮件信息统计分析

一:概述

在大多数情况下,如果使用MapReduce进行batch处理,文件一般是存储在HDFS上的,但这里有个很重要的场景不能忽视,那就是对于大量的小文件的处理(此处小文件没有确切的定义,一般指文件大小比较小,比如5M以内的文件),而HDFS的文件块一般是64M,这将会影响到HDFS的性能,因为小文件过多,那么NameNode需要保存的文件元信息将占用更多的空间,加大NameNode的负载进而影响性能,假如对于每个文件,在NameNode中保存的元数据大小是100字节,那么1千万这样的小文件,将占用10亿byte,约1G的内存空间,目前有以下几种对于众多小文件的处理方法:

HAR File方式,将小文件合并成大文件

SequenceFile方式,以文件名为key,文件内容为value,生成一个序列文件

以HBase作为小文件的存储,rowkey使用文件名,列族单元保存文件内容,文件后缀名等信息

在本文中的案例,就是采用第三种方法

二:实现

1:邮件格式如下,为了简单起见及安全性,这里作了简化,每一封这样的邮件,大小将近15k左右

2:HBase表

需要创建2张HBase表格,一张保存邮件文件,另外一张保存MapReduce的输出结果,在hbase shell中分别创建:

create ‘email‘, {NAME=>‘f1‘, VERSIONS=>2}
create ‘summary‘, {NAME=>‘f1‘, VERSIONS=>2}

3:邮件文件导入到HBase中

请参考上篇文章 将文件以API方式导入到HBase(小文件处理),此处,假设是把所有的每天已经生产的邮件文件,从本地导入到HBase中,另外一种方案是创建一个独立的RESTful API,供第三方程序调用,将邮件信息写入到HBase中

4:MapReduce

  • 在IDEA中创建Maven工程

  • 修改pom.xml文件,添加依赖hbase-client及hbase-server
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>mronhbase</groupId>
    <artifactId>mronhbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>apache</id>
            <url>http://maven.apache.org</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>

                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>jiecxy.App</mainClass>
                                </transformer>
                            </transformers>

                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>
  • 创建java类,先引入必要的包:
package examples;

import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.mapreduce.Job;
  • 类文件hbasemr,其中包含主程序入口
public class hbasemr {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
    {
        String hbaseTableName1 = "email";
        String hbaseTableName2 = "summary";

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(hbasemr.class);
        job.setJobName("mronhbase");

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, MyMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(hbaseTableName2, MyReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 1 : 0);
    }

    public static String getSubString(String value,String rgex){
        Pattern pattern = Pattern.compile(rgex);
        Matcher m = pattern.matcher(value);
        while(m.find()){
            return m.group(1);
        }
        return "";
    }
}
  • 添加Mapper类及Reducer类:

Hbases实现了TableMapper类及TableReducer类,

创建MyMapper

   public static class MyMapper extends TableMapper<Text, IntWritable>{
        private final static IntWritable one = new IntWritable(1);

        public void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException,InterruptedException
        {
            String rowValue = Bytes.toString(value.getValue("f1".getBytes(),"message".getBytes()));
            if(rowValue !=null) {

                String rgex = "Project Name:(.*?)\\r\\n";
                String temp = hbasemr.getSubString(rowValue,rgex);
                String username = temp.substring(0, temp.indexOf(‘_‘));

                rgex = "Accounts:(.*?)\\r\\n";
                String count = hbasemr.getSubString(rowValue,rgex).trim();

                IntWritable intCount = new IntWritable(Integer.parseInt(count));
                context.write(new Text(username), intCount);
            }
        }
    }

创建MyReducer:

    public static class MyReducer extends TableReducer<Text,IntWritable, NullWritable>
    {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            Iterator<IntWritable> item = values.iterator();
            while (item.hasNext()) {
                sum += item.next().get();
            }
            this.result.set(sum);

            Put put = new Put(key.toString().getBytes());
            put.addColumn("f1".getBytes(), "count".getBytes(), String.valueOf(sum).getBytes());
            context.write(NullWritable.get(), put);
        }
    }

5:运行与调试

启动Hadoop及HBase,在IDEA 中对MyMapper类及MyReducer类设置好断点,以调试方式运行程序,运行完后,进入到hbase shell查看运行结果

原文地址:https://www.cnblogs.com/benfly/p/8417541.html

时间: 2024-11-07 13:32:44

基于HBase的MapReduce实现大量邮件信息统计分析的相关文章

基于HBase Hadoop 分布式集群环境下的MapReduce程序开发

HBase分布式集群环境搭建成功后,连续4.5天实验客户端Map/Reduce程序开发,这方面的代码网上多得是,写个测试代码非常容易,可是真正运行起来可说是历经挫折.下面就是我最终调通并让程序在集群上运行起来的一些经验教训. 一.首先说一下我的环境: 1,集群的环境配置请见这篇博文. 2,开发客户机环境:操作系统是CentOS6.5,JDK版本是1.7.0-60,开发工具是Eclipse(原始安装是从google的ADT网站下载的ADT专用开发环境,后来加装了Java企业开发的工具,启动Flas

Hbase框架原理及相关的知识点理解、Hbase访问MapReduce、Hbase访问Java API、Hbase shell及Hbase性能优化总结

转自:http://blog.csdn.net/zhongwen7710/article/details/39577431 本blog的内容包含: 第一部分:Hbase框架原理理解 第二部分:Hbase调用MapReduce函数使用理解 第三部分:Hbase调用Java API使用理解 第四部分:Hbase Shell操作 第五部分:Hbase建表.读写操作方式性能优化总结 第一部分:Hbase框架原理理解 概述 HBase是一个构建在HDFS上的分布式列存储系统:HBase是基于Google

HQueue:基于HBase的消息队列

HQueue:基于HBase的消息队列 凌柏 ?1. HQueue简介 HQueue是一淘搜索网页抓取离线系统团队基于HBase开发的一套分布式.持久化消息队列.它利用HTable存储消息数据,借助HBase Coprocessor将原始的KeyValue数据封装成消息数据格式进行存储,并基于HBase Client API封装了HQueue Client API用于消息存取. HQueue可以有效使用在需要存储时间序列数据.作为MapReduce Job和iStream等输入.输出供上下游共享

基于HBase的冠字号查询系统1--理论部分

1. 软件版本和部署 maven:3.3.9,jdk:1.7 ,Struts2:2.3.24.1,hibernate:4.3.6,spring:4.2.5,MySQL:5.1.34,Junit:4,Myeclipse:2014: Hadoop2.6.4,HBase1.1.2 源码下载:https://github.com/fansy1990/ssh_v3/releases 部署参考:http://blog.csdn.net/fansy1990/article/details/51356583 数

基于HBASE的并行计算架构之rowkey设计篇

1.大数据在HBASE存储.计算以及查询的应用场景 海量数据都是事务数据,事务数据都是在时间的基础上产生的.数据的业务时间可能会顺序产生,也可能不会顺序产生,比如某些事务发生在早上10点,但是在下午5点才结束闭并生成出来,这样的数据就会造成存储加载时的时间连续性.另外海量数据的挖掘后产生的是统计数据,统计数据也有时间属性,统计数据如果进行保存必须保证在统计计算之后数据尽量不再变化,如果统计发生后又有新的事务数据产生,那么将重新触发统计计算然后重新保存覆盖原有已经存储的数据.其它数据则主要是以配置

基于Lumisoft.NET组件的POP3邮件接收和删除操作

Lumisoft.NET组件是一个非常强大的邮件发送.邮件接收等功能的开源组件,一般用它来处理邮件的相关操作,是非常合适的.之前也写过一些该组件的随笔文章,不过主要是利用来发送邮件居多,最近由于项目需要,需要利用该组件来接收邮件,邮件通过POP3协议进行接收到本地,故对该组件进行了全面的了解和使用.本文主要是在此背景上,介绍该组件的POP3协议处理类的使用.Lumisoft.NET组件2013年作者有做了一定的更新,修复了一些问题,本文是基于该组件的最新版本进行开发使用. 1.POP3登录及头部

HBase概念学习(七)HBase与Mapreduce集成

这篇文章是看了HBase权威指南之后,根据上面的讲解搬下来的例子,但是稍微有些不一样. HBase与mapreduce的集成无非就是mapreduce作业以HBase表作为输入,或者作为输出,也或者作为mapreduce作业之间共享数据的介质. 这篇文章将讲解两个例子: 1.读取存储在hdfs上的txt文本数据,简单地以json字符串的形式存储到HBase表中. 2.将第一步存储的HBase表中的json字符串读取出来,解析存储到新的HBase表中,可以进行查询. 本文详细给出了源码以及如何运行

基于HBase的冠字号查询系统2--实现部分

1. 软件版本和部署 maven:3.3.9,jdk:1.7 ,Struts2:2.3.24.1,hibernate:4.3.6,spring:4.2.5,MySQL:5.1.34,Junit:4,Myeclipse:2014: Hadoop2.6.4,HBase1.1.2 源码下载:https://github.com/fansy1990/ssh_v3/releases 部署参考:http://blog.csdn.net/fansy1990/article/details/51356583 数

一种基于HBase韵海量图片存储技术

针对海量图片存储,已有若干个基于Hadoop的方案被设计出来.这些方案在系统层小文件合并.全局名字空间以及通用性方面存在不足.本文基于HBase提出了一种海量图片存储技术,成功解决了上述问题.本文将介绍基于HBase海量图片存储技术方案,分析其原理及优势,该方案在城市交通监控中得到应用验证. 随着互联网.云计算及大数据等信息技术的发展,越来越多的应用依赖于对海量数据的存储和处理,如智能监控.电子商务.地理信息等,这些应用都需要对海量图片的存储和检索.由于图片大多是小文件(80%大小在数MB以内)