hbase 结合MapReduce 批量导入

hbase结合Mapreduce的批量导入:

直接给出代码讲述:(具体操作结合代码中的注释)

package hbase;

import java.io.IOException;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class hbaseApp {
	/**
	 * @user XD 基本思路:先创建表 --> 书写MapReduce批量导入
	 */
	static enum Num{
		exNum
	}
	//创建表
	@SuppressWarnings("deprecation")
	public static void createTable() throws MasterNotRunningException, ZooKeeperConnectionException, IOException{
		//配置 必须书写
		Configuration conf = HBaseConfiguration.create();
		String tableName = "wlan";		//表名
		String family_name = "content";			//列族
		conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
		conf.set("hbase.zookeeper.quorum","localhost");
		final HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
		if(!hbaseAdmin.tableExists(tableName)){
			HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
			HColumnDescriptor family = new HColumnDescriptor(family_name);
			tableDescriptor.addFamily(family);
			hbaseAdmin.createTable(tableDescriptor);
		}
	}
	//导入的文件
	static final String INPUT_PATH = "hdfs://localhost:9000/input1/wlan";

	public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
		hbaseApp.createTable();
		final	Configuration conf = new Configuration();
		conf.set("hbase.rootdir", "hdfs://localhost:9000/hbase");
		conf.set("hbase.zookeeper.quorum","localhost");
		//表名
		conf.set(TableOutputFormat.OUTPUT_TABLE,"wlan");
		conf.set("dfs.socket.timeout", "180000");

		Job job = new Job(conf,hbaseApp.class.getSimpleName());
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setMapperClass(Map.class);

		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setJarByClass(hbaseApp.class);
		job.setReducerClass(Reduce.class);

		//直接创建表 和 导入数据 到hbase里面 所以不需要指定 输出文件路径 输出reducer类型
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass( TableOutputFormat.class);

		job.waitForCompletion(true);
	}
	static class Map extends Mapper <LongWritable , Text , LongWritable , Text >{
		//时间格式
		SimpleDateFormat format1 = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
		private Text v2 = new Text();

		protected void map(LongWritable key , Text value , Context context) throws IOException, InterruptedException{
			final String[] splited = value.toString().split("\t");
			try{
				final Date date = new Date(Long.parseLong(splited[0].trim()));
				final String dateFormat = format1.format(date);
				String rowKey = splited[1]+":"+dateFormat; 		//行键
				v2.set(rowKey + "\t" + value.toString());
				context.write(key, v2);
			}catch(NumberFormatException e){
				final Counter counter = context.getCounter(Num.exNum);
				counter.increment(1L);
				System.out.println("出错"+splited[0]+" "+e.getMessage());
			}
		}
	}
	//注意是TableReducer
	static class Reduce extends TableReducer <LongWritable , Text , NullWritable>{
		protected void reduce(LongWritable key , Iterable<Text>values , Context context) throws IOException, InterruptedException{
			for(Text val : values){
				final String[] splited = val.toString().split("\t");
				final Put put = new Put(Bytes.toBytes(splited[0])); 		//行键
				put.add(Bytes.toBytes("content"),Bytes.toBytes("phone"),Bytes.toBytes(splited[1]));	//列族, 列, 列值
				context.write(NullWritable.get(), put);
			}
		}
	}
}

结果如下:

对应表中的行键 列族 列 列值

时间: 2024-12-30 13:26:24

hbase 结合MapReduce 批量导入的相关文章

HBase结合MapReduce批量导入(HDFS中的数据导入到HBase)

HBase结合MapReduce批量导入 1 package hbase; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.hbase.client.Put; 8 import org.apache.hadoop.hbase.mapreduce.TableOutput

Hadoop之——HBASE结合MapReduce批量导入数据

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46463889 废话不多说.直接上代码,你懂得 package hbase; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; import

HBase结合MapReduce批量导入

2016年5月14日13:17:05 作者:数据分析玩家 Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapReduce运算框架向Hbase数据库中导入数据. 开篇先介绍业务场景:将电信手机上网日志中的数据导入到Hbase数据库中,将部分数据以及相应字段描述列出: 图片格式描述: 先介绍一个日期格式的转换: public class TestDate

Hbase笔记:批量导入

工作中可能会有对HBase的复杂操作,我们现在对HBase的操作太简单了.复杂操作一般用HBaseScan操作,还有用框架对HBase进行复杂操作,iparler,sharker.我们说HBase是数据库,数据库是用来查询数据的,那么我们的数据怎么进入HBase呢,可以通过put,但是put有点儿慢,通常我们的数据都是位于hdfs中,我们期望把hdfs中的数据导入到HBase中,进行查询,下面就讲如何把HDFS中的数据导入到HBase,我们使用m/r导入,这也就是我们说的批量导入-BatchIm

使用BulkLoad从HDFS批量导入数据到HBase

在向Hbase中写入数据时,常见的写入方法有使用HBase API,Mapreduce批量导入数据,使用这些方式带入数据时,一条数据写入到HBase数据库中的大致流程如图. 数据发出后首先写入到雨鞋日志WAl中,写入到预写日志中之后,随后写入到内存MemStore中,最后在Flush到Hfile中.这样写数据的方式不会导致数据的丢失,并且道正数据的有序性,但是当遇到大量的数据写入时,写入的速度就难以保证.所以,介绍一种性能更高的写入方式BulkLoad. 使用BulkLoad批量写入数据主要分为

数据批量导入HBase

测试数据: datas 1001 lilei 17 13800001111 1002 lily 16 13800001112 1003 lucy 16 13800001113 1004 meimei 16 13800001114 数据批量导入使用mr,先生成HFile文件然后在用completebulkload工具导入. 1.需要先在hbase 创建表名: hbase> create 'student', {NAME => 'info'} maven pom.xml配置文件如下: <de

Hbase调用JavaAPI实现批量导入操作

将手机上网日志文件批量导入到Hbase中,操作步骤: 1.将日志文件(请下载附件)上传到HDFS中,利用hadoop的操作命令上传:hadoop  fs -put input  / 2.创建Hbase表,通过Java操作 Java代码   package com.jiewen.hbase; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.

Hbase框架原理及相关的知识点理解、Hbase访问MapReduce、Hbase访问Java API、Hbase shell及Hbase性能优化总结

转自:http://blog.csdn.net/zhongwen7710/article/details/39577431 本blog的内容包含: 第一部分:Hbase框架原理理解 第二部分:Hbase调用MapReduce函数使用理解 第三部分:Hbase调用Java API使用理解 第四部分:Hbase Shell操作 第五部分:Hbase建表.读写操作方式性能优化总结 第一部分:Hbase框架原理理解 概述 HBase是一个构建在HDFS上的分布式列存储系统:HBase是基于Google

HBase和Mapreduce

HBase和Mapreduce整合的代码网上有很多可以参考,在部署jar程序时遇到一个工程问题,值得注意,联系到之前在做spark时遇到过的一个类似的问题,这里详细介绍一下问题和解决方式 任务本身是读取hdfs上的数据,提取所需要的字段然后写入到hbase中,是一个常见的HBase和MapReduce结合的应用程序,在完成代码编写打包提交之后,运行代码时任务正常提交到了集群,并且Map任务顺利执行,没有出现异常,但是当任务运行到reduce阶段的时候出现了HBase的jar包中的一个类class