HBase结合MapReduce批量导入(HDFS中的数据导入到HBase)

HBase结合MapReduce批量导入

 1 package hbase;
 2
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5
 6 import org.apache.hadoop.conf.Configuration;
 7 import org.apache.hadoop.hbase.client.Put;
 8 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 9 import org.apache.hadoop.hbase.mapreduce.TableReducer;
10 import org.apache.hadoop.hbase.util.Bytes;
11 import org.apache.hadoop.io.LongWritable;
12 import org.apache.hadoop.io.NullWritable;
13 import org.apache.hadoop.io.Text;
14 import org.apache.hadoop.mapreduce.Counter;
15 import org.apache.hadoop.mapreduce.Job;
16 import org.apache.hadoop.mapreduce.Mapper;
17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
18 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
19
20 public class BatchImport {
21     static class BatchImportMapper extends
22             Mapper<LongWritable, Text, LongWritable, Text> {
23         SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");
24         Text v2 = new Text();
25
26         protected void map(LongWritable key, Text value, Context context)
27                 throws java.io.IOException, InterruptedException {
28             final String[] splited = value.toString().split("\t");
29             try {
30                 final Date date = new Date(Long.parseLong(splited[0].trim()));
31                 final String dateFormat = dateformat1.format(date);
32                 String rowKey = splited[1] + ":" + dateFormat;//设置行键:手机号码+日期时间
33                 v2.set(rowKey + "\t" + value.toString());
34                 context.write(key, v2);
35             } catch (NumberFormatException e) {
36                 final Counter counter = context.getCounter("BatchImport",
37                         "ErrorFormat");
38                 counter.increment(1L);
39                 System.out.println("出错了" + splited[0] + " " + e.getMessage());
40             }
41         };
42     }
43
44     static class BatchImportReducer extends
45             TableReducer<LongWritable, Text, NullWritable> {
46         protected void reduce(LongWritable key,
47                 java.lang.Iterable<Text> values, Context context)
48                 throws java.io.IOException, InterruptedException {
49             for (Text text : values) {
50                 final String[] splited = text.toString().split("\t");
51
52                 final Put put = new Put(Bytes.toBytes(splited[0]));//第一列行键
53                 put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"),
54                         Bytes.toBytes(splited[1]));//第二列日期
55                 put.add(Bytes.toBytes("cf"), Bytes.toBytes("msisdn"),
56                         Bytes.toBytes(splited[2]));//第三列手机号码
57                 // 省略其他字段,调用put.add(....)即可
58                 context.write(NullWritable.get(), put);
59             }
60         };
61     }
62
63     public static void main(String[] args) throws Exception {
64         final Configuration configuration = new Configuration();
65         // 设置zookeeper
66         configuration.set("hbase.zookeeper.quorum", "hadoop0");
67         // 设置hbase表名称
68         configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");//先在shell下创建一个表:create ‘wlan_log‘,‘cf‘
69         // 将该值改大,防止hbase超时退出
70         configuration.set("dfs.socket.timeout", "180000");
71
72         final Job job = new Job(configuration, "HBaseBatchImport");
73
74         job.setMapperClass(BatchImportMapper.class);
75         job.setReducerClass(BatchImportReducer.class);
76         // 设置map的输出,不设置reduce的输出类型
77         job.setMapOutputKeyClass(LongWritable.class);
78         job.setMapOutputValueClass(Text.class);
79
80         job.setInputFormatClass(TextInputFormat.class);
81         // 不再设置输出路径,而是设置输出格式类型
82         job.setOutputFormatClass(TableOutputFormat.class);
83
84         FileInputFormat.setInputPaths(job, "hdfs://hadoop0:9000/input");//将手机上网日志文件上传到HDFS中的input文件中
85
86         job.waitForCompletion(true);
87     }
88 }

在eclipse中将上面代码运行成功后,就可以去HBase shell中查看结果:

时间: 2024-12-21 12:18:35

HBase结合MapReduce批量导入(HDFS中的数据导入到HBase)的相关文章

hbase使用MapReduce操作4(实现将 HDFS 中的数据写入到 HBase 表中)

实现将 HDFS 中的数据写入到 HBase 表中 Runner类 1 package com.yjsj.hbase_mr2; 2 3 import com.yjsj.hbase_mr2.ReadFruitFromHDFSMapper; 4 import com.yjsj.hbase_mr2.WriteFruitMRFromTxtReducer; 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.c

HBase结合MapReduce批量导入

2016年5月14日13:17:05 作者:数据分析玩家 Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapReduce运算框架向Hbase数据库中导入数据. 开篇先介绍业务场景:将电信手机上网日志中的数据导入到Hbase数据库中,将部分数据以及相应字段描述列出: 图片格式描述: 先介绍一个日期格式的转换: public class TestDate

Hadoop之——HBASE结合MapReduce批量导入数据

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46463889 废话不多说.直接上代码,你懂得 package hbase; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; import

使用sqoop将MySQL数据库中的数据导入Hbase

使用sqoop将MySQL数据库中的数据导入Hbase 前提:安装好 sqoop.hbase. 下载jbdc驱动:mysql-connector-java-5.1.10.jar 将 mysql-connector-java-5.1.10.jar 拷贝到 /usr/lib/sqoop/lib/ 下 MySQL导入HBase命令: sqoop import --connect jdbc:mysql://10.10.97.116:3306/rsearch --table researchers --h

【甘道夫】通过bulk load将HDFS上的数据导入HBase

引言 通过bulkload将HDFS上的数据装载进HBase是常用的入门级HBase技能,下面简单记录下关键步骤. bulkload的详细情况请参见官网文档. 过程 第一步:每台机器执行 ln -s $HBASE_HOME/conf/hbase-site.xml $HADOOP_HOME/etc/hadoop/hbase-site.xml 第二步:编辑$HADOOP_HOME/etc/hadoop/hadoop-env.sh,拷贝到所有节点 末尾添加: export HADOOP_CLASSPA

Java实现Excel导入数据库,数据库中的数据导入到Excel

实现的功能: Java实现Excel导入数据库,如果存在就更新 数据库中的数据导入到Excel 1.添加jxl.jar mysql-connector-java.1.7-bin.jar包到项目的lib目录下­ 2.Excel文件目录:D://book.xls 3.数据库名:javenforexcel 4.表名:stu 5.编写类:连接mysql的字符串方法.插入的方法.实体类­­ 表结构如下 : 连接数据库的工具类 package com.javen.db; import java.sql.Co

把一个表中的数据导入到另一个表中

最近,需要对表中的数据进行操作.或者将表中的数据导入到另一张表中,或者将表中的数据生成insert脚本. 假如目标表存在 INSERT INTO 目标表名称 SELECT * FROM 来源表 假如目标表不存在 select * into 目标表名称 from 来源表 直接生成insert语句 select 'insert into 目标表 (字段1,字段2) values ('''+字段1+''','''+字段2+''');' sql_str from 目标表;

Linux启动kettle及linux和windows中kettle往hdfs中写数据(3)

在xmanager中的xshell运行进入图形化界面 1 sh spoon.sh 新建一个job 1.往hdfs中写数据 1)linux中kettle往hdfs中写数据 双击hadoop copy files 运行此job 查看数据: 1)windows中kettle往hdfs中写数据 Windows中往power服务器中hdfs写数据 日志: 2016/07/28 16:21:14 - Version checker - OK 2016/07/28 16:21:57 - 数据整合工具-作业设计

如何把Excel中的数据导入到数据库

NPOI: using NPOI.HSSF.UserModel; using NPOI.SS.Formula.Eval; using NPOI.SS.UserModel; using NPOI.XSSF.UserModel; using System; using System.Data; using System.IO; namespace ZZAS.HNYZ.GPSInstallManage.Common { public class ExcelHelper : IDisposable {