Hbase调用JavaAPI实现批量导入操作

将手机上网日志文件批量导入到Hbase中,操作步骤:

1、将日志文件(请下载附件)上传到HDFS中,利用hadoop的操作命令上传:hadoop  fs -put input  /

2、创建Hbase表,通过Java操作

Java代码  

  1. package com.jiewen.hbase;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.hbase.HBaseConfiguration;
  5. import org.apache.hadoop.hbase.HColumnDescriptor;
  6. import org.apache.hadoop.hbase.HTableDescriptor;
  7. import org.apache.hadoop.hbase.client.Get;
  8. import org.apache.hadoop.hbase.client.HBaseAdmin;
  9. import org.apache.hadoop.hbase.client.HTable;
  10. import org.apache.hadoop.hbase.client.Put;
  11. import org.apache.hadoop.hbase.client.Result;
  12. import org.apache.hadoop.hbase.client.ResultScanner;
  13. import org.apache.hadoop.hbase.client.Scan;
  14. import org.apache.hadoop.hbase.util.Bytes;
  15. public class HbaseDemo {
  16. public static void main(String[] args) throws IOException {
  17. String tableName = "wlan_log";
  18. String columnFamily = "cf";
  19. HbaseDemo.create(tableName, columnFamily);
  20. // HbaseDemo.put(tableName, "row1", columnFamily, "cl1", "data");
  21. // HbaseDemo.get(tableName, "row1");
  22. // HbaseDemo.scan(tableName);
  23. // HbaseDemo.delete(tableName);
  24. }
  25. // hbase操作必备
  26. private static Configuration getConfiguration() {
  27. Configuration conf = HBaseConfiguration.create();
  28. conf.set("hbase.rootdir", "hdfs://hadoop1:9000/hbase");
  29. // 使用eclipse时必须添加这个,否则无法定位
  30. conf.set("hbase.zookeeper.quorum", "hadoop1");
  31. return conf;
  32. }
  33. // 创建一张表
  34. public static void create(String tableName, String columnFamily)
  35. throws IOException {
  36. HBaseAdmin admin = new HBaseAdmin(getConfiguration());
  37. if (admin.tableExists(tableName)) {
  38. System.out.println("table exists!");
  39. } else {
  40. HTableDescriptor tableDesc = new HTableDescriptor(tableName);
  41. tableDesc.addFamily(new HColumnDescriptor(columnFamily));
  42. admin.createTable(tableDesc);
  43. System.out.println("create table success!");
  44. }
  45. }
  46. // 添加一条记录
  47. public static void put(String tableName, String row, String columnFamily,
  48. String column, String data) throws IOException {
  49. HTable table = new HTable(getConfiguration(), tableName);
  50. Put p1 = new Put(Bytes.toBytes(row));
  51. p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes
  52. .toBytes(data));
  53. table.put(p1);
  54. System.out.println("put‘" + row + "‘," + columnFamily + ":" + column
  55. + "‘,‘" + data + "‘");
  56. }
  57. // 读取一条记录
  58. public static void get(String tableName, String row) throws IOException {
  59. HTable table = new HTable(getConfiguration(), tableName);
  60. Get get = new Get(Bytes.toBytes(row));
  61. Result result = table.get(get);
  62. System.out.println("Get: " + result);
  63. }
  64. // 显示所有数据
  65. public static void scan(String tableName) throws IOException {
  66. HTable table = new HTable(getConfiguration(), tableName);
  67. Scan scan = new Scan();
  68. ResultScanner scanner = table.getScanner(scan);
  69. for (Result result : scanner) {
  70. System.out.println("Scan: " + result);
  71. }
  72. }
  73. // 删除表
  74. public static void delete(String tableName) throws IOException {
  75. HBaseAdmin admin = new HBaseAdmin(getConfiguration());
  76. if (admin.tableExists(tableName)) {
  77. try {
  78. admin.disableTable(tableName);
  79. admin.deleteTable(tableName);
  80. } catch (IOException e) {
  81. e.printStackTrace();
  82. System.out.println("Delete " + tableName + " 失败");
  83. }
  84. }
  85. System.out.println("Delete " + tableName + " 成功");
  86. }
  87. }

3、将日志文件导入Hbase表wlan_log中:

Java代码  

  1. import java.text.SimpleDateFormat;
  2. import java.util.Date;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.hbase.client.Put;
  5. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
  6. import org.apache.hadoop.hbase.mapreduce.TableReducer;
  7. import org.apache.hadoop.hbase.util.Bytes;
  8. import org.apache.hadoop.io.LongWritable;
  9. import org.apache.hadoop.io.NullWritable;
  10. import org.apache.hadoop.io.Text;
  11. import org.apache.hadoop.mapreduce.Counter;
  12. import org.apache.hadoop.mapreduce.Job;
  13. import org.apache.hadoop.mapreduce.Mapper;
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  15. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  16. public class HbaseBatchImport {
  17. public static void main(String[] args) throws Exception {
  18. final Configuration configuration = new Configuration();
  19. // 设置zookeeper
  20. configuration.set("hbase.zookeeper.quorum", "hadoop1");
  21. // 设置hbase表名称
  22. configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
  23. // 将该值改大,防止hbase超时退出
  24. configuration.set("dfs.socket.timeout", "180000");
  25. final Job job = new Job(configuration, "HBaseBatchImport");
  26. job.setMapperClass(BatchImportMapper.class);
  27. job.setReducerClass(BatchImportReducer.class);
  28. // 设置map的输出,不设置reduce的输出类型
  29. job.setMapOutputKeyClass(LongWritable.class);
  30. job.setMapOutputValueClass(Text.class);
  31. job.setInputFormatClass(TextInputFormat.class);
  32. // 不再设置输出路径,而是设置输出格式类型
  33. job.setOutputFormatClass(TableOutputFormat.class);
  34. FileInputFormat.setInputPaths(job, "hdfs://hadoop1:9000/input");
  35. job.waitForCompletion(true);
  36. }
  37. static class BatchImportMapper extends
  38. Mapper<LongWritable, Text, LongWritable, Text> {
  39. SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");
  40. Text v2 = new Text();
  41. protected void map(LongWritable key, Text value, Context context)
  42. throws java.io.IOException, InterruptedException {
  43. final String[] splited = value.toString().split("\t");
  44. try {
  45. final Date date = new Date(Long.parseLong(splited[0].trim()));
  46. final String dateFormat = dateformat1.format(date);
  47. String rowKey = splited[1] + ":" + dateFormat;
  48. v2.set(rowKey + "\t" + value.toString());
  49. context.write(key, v2);
  50. } catch (NumberFormatException e) {
  51. final Counter counter = context.getCounter("BatchImport",
  52. "ErrorFormat");
  53. counter.increment(1L);
  54. System.out.println("出错了" + splited[0] + " " + e.getMessage());
  55. }
  56. };
  57. }
  58. static class BatchImportReducer extends
  59. TableReducer<LongWritable, Text, NullWritable> {
  60. protected void reduce(LongWritable key,
  61. java.lang.Iterable<Text> values, Context context)
  62. throws java.io.IOException, InterruptedException {
  63. for (Text text : values) {
  64. final String[] splited = text.toString().split("\t");
  65. final Put put = new Put(Bytes.toBytes(splited[0]));
  66. put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes
  67. .toBytes(splited[1]));
  68. // 省略其他字段,调用put.add(....)即可
  69. context.write(NullWritable.get(), put);
  70. }
  71. };
  72. }
  73. }

4、查看导入结果:

时间: 2024-08-29 03:33:21

Hbase调用JavaAPI实现批量导入操作的相关文章

hbase 结合MapReduce 批量导入

hbase结合Mapreduce的批量导入: 直接给出代码讲述:(具体操作结合代码中的注释) package hbase; import java.io.IOException; import java.net.URISyntaxException; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.had

JEPLUS平台Excel批量导入的操作配置——JEPLUS软件快速开发平台

JEPLUS平台Excel批量导入的操作配置 JEPLUS平台支持Excel数据的批量导入,但是很多客户不明白批量导入怎么配置,今天这个笔记就简单说一下JEPLUS的Excel数据批量导入怎么来配置. 一.效果展示 二.Excel数据批量导入操作的配置过程 1.添加Excel数据批量导入操作需要的按钮 找到目标功能,添加业务需求说需要的按钮,操作如图: 2.为新添加的按钮编写业务所需要的JS事件 Excel数据批量导入时应该按照平台规定的模板来添加,首先还是为"下载模板"这个按钮添加J

使用BulkLoad从HDFS批量导入数据到HBase

在向Hbase中写入数据时,常见的写入方法有使用HBase API,Mapreduce批量导入数据,使用这些方式带入数据时,一条数据写入到HBase数据库中的大致流程如图. 数据发出后首先写入到雨鞋日志WAl中,写入到预写日志中之后,随后写入到内存MemStore中,最后在Flush到Hfile中.这样写数据的方式不会导致数据的丢失,并且道正数据的有序性,但是当遇到大量的数据写入时,写入的速度就难以保证.所以,介绍一种性能更高的写入方式BulkLoad. 使用BulkLoad批量写入数据主要分为

在Exchange 2010中批量导入用户

在某一个项目中,客户需要将2000多个联系人导入到Exchange 2010中,作为外部邮件联系人对MAPI连接的用户可见,而且要划分到特定的地址簿中去.客户提供excel列表以便导入使用. 由于导入操作需要使用CSV格式文件,所以要预先将excel修改.打开excel,然后将文件另存为CSV后缀的即可.需要注意的是,另存为只对单个工作区起作用. 导入操作对CSV文件的字段要求,最基本的需要4个字段:displayName,mail,DN,objectClass. displayName:显示名

HBase结合MapReduce批量导入(HDFS中的数据导入到HBase)

HBase结合MapReduce批量导入 1 package hbase; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.hbase.client.Put; 8 import org.apache.hadoop.hbase.mapreduce.TableOutput

Hbase笔记:批量导入

工作中可能会有对HBase的复杂操作,我们现在对HBase的操作太简单了.复杂操作一般用HBaseScan操作,还有用框架对HBase进行复杂操作,iparler,sharker.我们说HBase是数据库,数据库是用来查询数据的,那么我们的数据怎么进入HBase呢,可以通过put,但是put有点儿慢,通常我们的数据都是位于hdfs中,我们期望把hdfs中的数据导入到HBase中,进行查询,下面就讲如何把HDFS中的数据导入到HBase,我们使用m/r导入,这也就是我们说的批量导入-BatchIm

Hadoop之——HBASE结合MapReduce批量导入数据

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46463889 废话不多说.直接上代码,你懂得 package hbase; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; import

数据批量导入HBase

测试数据: datas 1001 lilei 17 13800001111 1002 lily 16 13800001112 1003 lucy 16 13800001113 1004 meimei 16 13800001114 数据批量导入使用mr,先生成HFile文件然后在用completebulkload工具导入. 1.需要先在hbase 创建表名: hbase> create 'student', {NAME => 'info'} maven pom.xml配置文件如下: <de

HBase结合MapReduce批量导入

2016年5月14日13:17:05 作者:数据分析玩家 Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapReduce运算框架向Hbase数据库中导入数据. 开篇先介绍业务场景:将电信手机上网日志中的数据导入到Hbase数据库中,将部分数据以及相应字段描述列出: 图片格式描述: 先介绍一个日期格式的转换: public class TestDate