Aggregating data from HBase with MapReduce and storing the results in MySQL

I recently started learning how to use MapReduce to aggregate data stored in HBase and write the result set into MySQL, where the front end can query it.

The versions used are Hadoop 2.5.1 and HBase 0.98.6.1.

The MapReduce program consists of three parts: the job, the map function, and the reduce function.

The job class:

public class DayFaultStatisticsJob {
    private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsJob.class);

    public void runJob(String start){
        try {
            logger.info("Starting MapReduce job to aggregate fault statistics");
            Configuration config = HBaseConfiguration.create();
            // pass the aggregation condition on to the mapper
            config.set("search.time.start", start);

            DBConnector dbConnector = DBConnectorUtil.getDBConnector();
            DBConfiguration.configureDB(config, "com.mysql.jdbc.Driver", "jdbc:mysql://" + dbConnector.getHost() + ":" + dbConnector.getPort() + "/xcloud", dbConnector.getUser(), dbConnector.getPwd());

            Job job = Job.getInstance(config);
            job.setJarByClass(DayFaultStatisticsJob.class); // class that contains mapper and reducer
            Scan scan = new Scan();
            scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
            scan.setCacheBlocks(false);  // don't set to true for MR jobs
            scan.addFamily(GlobalConstants.DEVICE_FAULT_FAMILY_NAME);

            // set other scan attrs
            // TODO add scan range / filter conditions
            /*scan.setStartRow();
              scan.setStopRow();*/
            //scan.setFilter();

            TableMapReduceUtil.initTableMapperJob(
                    GlobalConstants.TABLE_NAME_DEVICE_FAULT,   // input table
                    scan,                // Scan instance to control CF and attribute selection
                    DayFaultStatisticsMapper.class,      // mapper class
                    Text.class,          // mapper output key
                    IntWritable.class,   // mapper output value
                    job);
            job.setReducerClass(DayFaultStatisticsReducer.class);    // reducer class
            job.setNumReduceTasks(2);    // at least one, adjust as required
            //FileOutputFormat.setOutputPath(job, new Path("/usr/local/mapreduce"));  // adjust directories as required
            DBOutputFormat.setOutput(job, GlobalConstants.MYSQL_DEVICE_FAULT_DAY, GlobalConstants.MYSQL_DEVICE_FAULT_DAY_FIELDS);
            boolean b = job.waitForCompletion(true);
            if (b) {
                logger.info("MapReduce job finished successfully");
                System.exit(0);
            } else {
                logger.info("MapReduce job ended abnormally");
                System.exit(1);
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        } catch (ClassNotFoundException e) {
            logger.error(e.getMessage(), e);
        }
    }

}
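
The job (and the FaultDay class further down) references a GlobalConstants helper that isn't shown here. Roughly, it would look like the sketch below; the table and column names are placeholders, the only real constraints being that the column family is the "info" family the mapper reads and that the MySQL field list matches the column order bound in FaultDay.write:

import org.apache.hadoop.hbase.util.Bytes;

public final class GlobalConstants {
    // HBase source table and the column family the mapper reads
    public static final String TABLE_NAME_DEVICE_FAULT = "device_fault";      // assumed name
    public static final byte[] DEVICE_FAULT_FAMILY_NAME = Bytes.toBytes("info");

    // target MySQL table and its columns, in the order FaultDay.write() binds them
    public static final String MYSQL_DEVICE_FAULT_DAY = "device_fault_day";   // assumed name
    public static final String[] MYSQL_DEVICE_FAULT_DAY_FIELDS = {
            "id", "day", "company_id", "product_id", "model_id",
            "fault_id", "province_id", "city_id", "district_id", "num"
    };
}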

The mapper class:

public class DayFaultStatisticsMapper extends TableMapper<Text, IntWritable> {
    private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsMapper.class);

    private Text text = new Text();
    private IntWritable ONE = new IntWritable(1);
    HashMap<String,String> conditionMap = new HashMap<String,String>();

    // receive the filter condition passed in through the job configuration
    protected void setup(Context context) throws IOException,
            InterruptedException {
        conditionMap.put("start", context.getConfiguration().get("search.time.start").trim());
    }

    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        String timestamp = null;
        String company_id = null;
        String product_id = null;
        String model_id = null;
        String fault_id = null;
        String province_id = null;
        String city_id = null;
        String district_id = null;
        for (java.util.Map.Entry<byte[], byte[]> val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()){
            String ikey = new String(val.getKey());
            String ivalue = new String(val.getValue());
            // pick out the columns we need
            if (StringUtil.isEmpty(ikey) || StringUtil.isEmpty(ivalue)){
                 return;
            } else if (ikey.equals("company_id")){
                company_id = ivalue;
                continue;
            } else if (ikey.equals("product_id")){
                product_id = ivalue;
                continue;
            } else if (ikey.equals("model_id")){
                model_id = ivalue;
                continue;
            } else if (ikey.equals("fault_id")){
                fault_id = ivalue;
                continue;
            } else if (ikey.equals("province_id")){
                province_id = ivalue;
                continue;
            } else if (ikey.equals("city_id")){
                city_id = ivalue;
                continue;
            } else if (ikey.equals("district_id")){
                district_id = ivalue;
                continue;
            } else if (ikey.equals("timestamp")){
                String time = ivalue.substring(0, 8); // check whether the record falls on the target day
                if (time.equals(conditionMap.get("start"))){
                    timestamp = time; // count fault occurrences for that day
                }
                continue;
            }
        }
        // skip incomplete records and records whose fault code is -1
        if (company_id == null || product_id == null || model_id == null || fault_id == null || timestamp == null || fault_id.equals("-1")) {
            return;
        }
        String val = company_id + "-" + product_id + "-" + model_id + "-" + fault_id + "-" + timestamp + "-" + province_id + "-" + city_id + "-" + district_id; // grouping key
        text.set(val);     // we can only emit Writables...
        context.write(text, ONE);
    }
}
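
For example, a row from 2015-02-01 would be emitted as a key like 1001-2002-3003-E17-20150201-110000-110100-110101 (hypothetical IDs) with the value 1, so each distinct combination of company, product, model, fault code, day and region becomes one reduce group. Note that this relies on none of the IDs themselves containing a "-", since the reducer splits the key on that character.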

The reducer class:

public class DayFaultStatisticsReducer extends Reducer<Text, IntWritable, FaultDay, Text> {
    private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsReducer.class);

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int value = 0;
        for (IntWritable val : values) {
            value += val.get();
        }
        // key layout: company_id - product_id - model_id - fault_id - timestamp - province_id - city_id - district_id
        String[] fields = key.toString().split("-");
        if (fields.length != 8){
            return;
        }
        // format the day as yyyy-MM-dd, e.g. 2015-02-01
        String day = fields[4].substring(0, 4) + "-" + fields[4].substring(4, 6) + "-" + fields[4].substring(6, 8);
        // DateUtil.format(new Date(), new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
        FaultDay faultDay = new FaultDay();
        faultDay.setId(UUIDGen.generate());
        faultDay.setDay(day);
        faultDay.setCompany_id(fields[0]);
        faultDay.setProduct_id(fields[1]);
        faultDay.setModel_id(fields[2]);
        faultDay.setFault_id(fields[3]);
        if (!fields[5].equals("null")){
            faultDay.setProvince_id(fields[5]);
        }
        if (!fields[6].equals("null")){
            faultDay.setCity_id(fields[6]);
        }
        if (!fields[7].equals("null")){
            faultDay.setDistrict_id(fields[7]);
        }
        faultDay.setNum(value);
        context.write(faultDay, null);
    }

}
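
With DBOutputFormat it is the key written by the reducer that ends up in MySQL: the record writer calls FaultDay.write(PreparedStatement) for each key, adds it to a batch, and executes the batch INSERT when the task finishes. The value is ignored, which is why null is passed as the second argument to context.write.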

FaultDay implements the DBWritable interface:

public class FaultDay implements Writable, DBWritable {
    private String id;

    private String day; // 2015-02-01
    private String company_id;
    private String product_id;
    private String model_id;
    private String fault_id;
    private String province_id;
    private String city_id;
    private String district_id;
    private int num;

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        int index = 1;
        statement.setString(index++, this.getId());
        statement.setString(index++, this.getDay());
        statement.setString(index++, this.getCompany_id());
        statement.setString(index++, this.getProduct_id());
        statement.setString(index++, this.getModel_id());
        statement.setString(index++, this.getFault_id());
        statement.setString(index++, this.getProvince_id());
        statement.setString(index++, this.getCity_id());
        statement.setString(index++, this.getDistrict_id());
        statement.setInt(index++, this.getNum());
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.id = resultSet.getString(1);
        this.day = resultSet.getString(2);
        this.company_id = resultSet.getString(3);
        this.product_id = resultSet.getString(4);
        this.model_id = resultSet.getString(5);
        this.fault_id = resultSet.getString(6);
        this.province_id = resultSet.getString(7);
        this.city_id = resultSet.getString(8);
        this.district_id = resultSet.getString(9);
        this.num = resultSet.getInt(10);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // not needed here: FaultDay is only written to MySQL via DBOutputFormat
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // not needed here: FaultDay is only written to MySQL via DBOutputFormat
    }


    public String getCity_id() {
        return city_id;
    }

    public void setCity_id(String city_id) {
        this.city_id = city_id;
    }

    public String getCompany_id() {
        return company_id;
    }

    public void setCompany_id(String company_id) {
        this.company_id = company_id;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }

    public String getDistrict_id() {
        return district_id;
    }

    public void setDistrict_id(String district_id) {
        this.district_id = district_id;
    }

    public String getFault_id() {
        return fault_id;
    }

    public void setFault_id(String fault_id) {
        this.fault_id = fault_id;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getModel_id() {
        return model_id;
    }

    public void setModel_id(String model_id) {
        this.model_id = model_id;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    public String getProduct_id() {
        return product_id;
    }

    public void setProduct_id(String product_id) {
        this.product_id = product_id;
    }

    public String getProvince_id() {
        return province_id;
    }

    public void setProvince_id(String province_id) {
        this.province_id = province_id;
    }
}
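
The target MySQL table has to match the columns that FaultDay.write binds, in the same order. Something along these lines would work (table name, column names and lengths are placeholders consistent with the GlobalConstants sketch above; adjust them to the real schema):

CREATE TABLE device_fault_day (
    id          VARCHAR(64) NOT NULL PRIMARY KEY,
    day         VARCHAR(10) NOT NULL,  -- e.g. 2015-02-01
    company_id  VARCHAR(32),
    product_id  VARCHAR(32),
    model_id    VARCHAR(32),
    fault_id    VARCHAR(32),
    province_id VARCHAR(32),
    city_id     VARCHAR(32),
    district_id VARCHAR(32),
    num         INT NOT NULL
);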

The main class:

public class FaultStatistics {
    private static final Logger logger = LoggerFactory.getLogger(FaultStatistics.class);

    public static void main(String[] args) {
        // the day to aggregate, passed on to the mapper via the job configuration
        String start;
        if (args.length != 0 && StringUtil.notEmpty(args[0])) {
            start = args[0].substring(0, 8);
        } else {
            // default to the day before the current date
            start = DateUtil.format(DateUtil.getPreDay(new Date()), new SimpleDateFormat("yyyyMMdd"));
        }

        DayFaultStatisticsJob dayFaultStatisticsJob = new DayFaultStatisticsJob();
        dayFaultStatisticsJob.runJob(start);
    }

}

Package the program into a jar and run it on Hadoop; the results can then be queried from MySQL (the target table must already exist in MySQL).
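
A typical invocation looks like this (the jar name is a placeholder; use the fully-qualified name of FaultStatistics if the jar has no Main-Class manifest entry; the optional argument is the day to aggregate in yyyyMMdd form and defaults to yesterday when omitted):

hadoop jar fault-statistics.jar FaultStatistics 20150201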
