Hadoop之——HBASE结合MapReduce批量导入数据

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46463889

废话不多说。直接上代码,你懂得

package hbase;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
/**
 * HBASE结合MapReduce批量导入
 * @author liuyazhuang
 */
public class BatchImport {
	static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
		SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
		Text v2 = new Text();

		protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
			final String[] splited = value.toString().split("\t");
			try {
				final Date date = new Date(Long.parseLong(splited[0].trim()));
				final String dateFormat = dateformat1.format(date);
				String rowKey = splited[1]+":"+dateFormat;
				v2.set(rowKey+"\t"+value.toString());
				context.write(key, v2);
			} catch (NumberFormatException e) {
				final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
				counter.increment(1L);
				System.out.println("出错了"+splited[0]+" "+e.getMessage());
			}
		};
	}

	static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
		protected void reduce(LongWritable key, java.lang.Iterable<Text> values, 	Context context) throws java.io.IOException ,InterruptedException {
			for (Text text : values) {
				final String[] splited = text.toString().split("\t");

				final Put put = new Put(Bytes.toBytes(splited[0]));
				put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));
				put.add(Bytes.toBytes("cf"), Bytes.toBytes("msisdn"), Bytes.toBytes(splited[2]));
				//省略其它字段,调用put.add(....)就可以
				context.write(NullWritable.get(), put);
			}
		};
	}

	public static void main(String[] args) throws Exception {
		final Configuration configuration = new Configuration();
		//设置zookeeper
		configuration.set("hbase.zookeeper.quorum", "hadoop0");
		//设置hbase表名称
		configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
		//将该值改大,防止hbase超时退出
		configuration.set("dfs.socket.timeout", "180000");

		final Job job = new Job(configuration, "HBaseBatchImport");

		job.setMapperClass(BatchImportMapper.class);
		job.setReducerClass(BatchImportReducer.class);
		//设置map的输出,不设置reduce的输出类型
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);

		job.setInputFormatClass(TextInputFormat.class);
		//不再设置输出路径,而是设置输出格式类型
		job.setOutputFormatClass(TableOutputFormat.class);

		FileInputFormat.setInputPaths(job, "hdfs://hadoop0:9000/input");

		job.waitForCompletion(true);
	}
}
时间: 2024-08-04 02:32:28

Hadoop之——HBASE结合MapReduce批量导入数据的相关文章

HBase结合MapReduce批量导入

2016年5月14日13:17:05 作者:数据分析玩家 Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapReduce运算框架向Hbase数据库中导入数据. 开篇先介绍业务场景:将电信手机上网日志中的数据导入到Hbase数据库中,将部分数据以及相应字段描述列出: 图片格式描述: 先介绍一个日期格式的转换: public class TestDate

HBase结合MapReduce批量导入(HDFS中的数据导入到HBase)

HBase结合MapReduce批量导入 1 package hbase; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.hbase.client.Put; 8 import org.apache.hadoop.hbase.mapreduce.TableOutput

hbase 结合MapReduce 批量导入

hbase结合Mapreduce的批量导入: 直接给出代码讲述:(具体操作结合代码中的注释) package hbase; import java.io.IOException; import java.net.URISyntaxException; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.had

使用BulkLoad从HDFS批量导入数据到HBase

在向Hbase中写入数据时,常见的写入方法有使用HBase API,Mapreduce批量导入数据,使用这些方式带入数据时,一条数据写入到HBase数据库中的大致流程如图. 数据发出后首先写入到雨鞋日志WAl中,写入到预写日志中之后,随后写入到内存MemStore中,最后在Flush到Hfile中.这样写数据的方式不会导致数据的丢失,并且道正数据的有序性,但是当遇到大量的数据写入时,写入的速度就难以保证.所以,介绍一种性能更高的写入方式BulkLoad. 使用BulkLoad批量写入数据主要分为

Dynamics 2015 数据管理 之 如何批量导入数据到 正式区(一)

单一个项目的数据导入可以在具体功能 中导入,大体位置如下. 如果项目新上线的话,要批量导入数据的话,就要到如下的功能中实现了. 进入方式: 在 设置 ---- 数据管理 下载 下载后,用EXCEL打开, 给业务人员批量输入数据. 编辑好以后,上传吧,点 导入. 下一步即可以,导入以后,可以返回看看数据导入的情况,格式有没有错误等.

django 批量导入数据

一.需求 我在数据库中建了一张表,用来保存ucloud云上的project id 和project name models.py代码如下 #coding:utf-8 from django.db import models class Project(models.Model):     name = models.CharField(u'项目名称',max_length=32,blank=True)     id = models.CharField(u'项目ID',max_length=32

利用OLEDB+SqlClient实现EXCEL批量导入数据

以下是几个自己写的类 /// <summary> /// 取得Excel对象 /// </summary> /// <param name="strConn">OLEDB连接字符串</param> /// <param name="sql">SQL语句</param> /// <returns></returns> public static DataTable GetE

asp.net线程批量导入数据时通过ajax获取执行状态

最近因为工作中遇到一个需求,需要做了一个批量导入功能,但长时间运行没个反馈状态,很容易让人看了心急,产生各种臆想!为了解决心里障碍,写了这么个功能. 通过线程执行导入,并把正在执行的状态存入session,既共享执行状态,通过ajax调用session里的执行状态,从而实现反馈导入状态的功能! 上代码: 前端页面 <!DOCTYPE html> <html lang="en"> <head>  <meta charset="UTF-8

redis pipe 批量导入数据

redis pipe 批量导入数据 速度非常快, 文本需要支持redis的协议, 使用Python生成文件 代码如下 delimiter = "\r\n" data = "*3" + delimiter + "$3" + delimiter + "set" + delimiter + "$" + str(len(row[0])) + delimiter + row[0] + delimiter + &quo