hbase使用MapReduce操作4(实现将 HDFS 中的数据写入到 HBase 表中)

实现将 HDFS 中的数据写入到 HBase 表中

Runner类

 1 package com.yjsj.hbase_mr2;
 2
 3 import com.yjsj.hbase_mr2.ReadFruitFromHDFSMapper;
 4 import com.yjsj.hbase_mr2.WriteFruitMRFromTxtReducer;
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.conf.Configured;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.hbase.HBaseConfiguration;
 9 import org.apache.hadoop.hbase.client.Put;
10 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14 import org.apache.hadoop.util.Tool;
15 import org.apache.hadoop.util.ToolRunner;
16
17 import java.io.IOException;
18
19 class Txt2FruitRunner extends Configured implements Tool {
20     public int run(String[] args) throws Exception {
21 //得到 Configuration
22         Configuration conf = this.getConf();
23 //创建 Job 任务
24         Job job = Job.getInstance(conf, this.getClass().getSimpleName());
25         job.setJarByClass(Txt2FruitRunner.class);
26         Path inPath = new Path("hdfs://master:9000/input_fruit/fruit.tsv");
27
28         FileInputFormat.addInputPath(job, inPath);
29         //设置 Mapper
30         job.setMapperClass(ReadFruitFromHDFSMapper.class);
31         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
32         job.setMapOutputValueClass(Put.class);
33         //设置 Reducer
34         TableMapReduceUtil.initTableReducerJob("fruit_hdfs", WriteFruitMRFromTxtReducer.class, job);
35         //设置 Reduce 数量,最少 1 个
36         job.setNumReduceTasks(1);
37         boolean isSuccess = job.waitForCompletion(true);
38         if (!isSuccess) {
39             throw new IOException("Job running with error");
40         }
41         return isSuccess ? 0 : 1;
42     }
43
44     public static void main(String[] args) throws Exception {
45         Configuration conf = HBaseConfiguration.create();
46         conf = HBaseConfiguration.create();
47         conf.set("hbase.zookeeper.quorum", "master,node1,node2");
48         conf.set("hbase.zookeeper.property.clientPort", "2181");
49         conf.set("hbase.master", "master:60000");
50         int status = ToolRunner.run(conf, new Txt2FruitRunner(), args);
51         System.exit(status);
52     }
53 }

Mapper类

 1 package com.yjsj.hbase_mr2;
 2
 3 import java.io.IOException;
 4
 5 import org.apache.hadoop.hbase.client.Put;
 6 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 7 import org.apache.hadoop.hbase.util.Bytes;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Mapper;
11
12 public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
13     @Override
14     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
15         //从 HDFS 中读取的数据
16         String lineValue = value.toString();
17         //读取出来的每行数据使用\t 进行分割,存于 String 数组
18         String[] values = lineValue.split("\t");
19         //根据数据中值的含义取值
20         String rowKey = values[0];
21         String name = values[1];
22         String color = values[2];
23         //初始化 rowKey
24         ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
25         //初始化 put 对象
26         Put put = new Put(Bytes.toBytes(rowKey));
27         //参数分别:列族、列、值
28         put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
29         put.add(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
30         context.write(rowKeyWritable, put);
31     }
32 }

Reduce类

package com.yjsj.hbase_mr2;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        //读出来的每一行数据写入到 fruit_hdfs 表中
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}

原文地址:https://www.cnblogs.com/pursue339/p/10658127.html

时间: 2024-12-12 15:15:04

hbase使用MapReduce操作4(实现将 HDFS 中的数据写入到 HBase 表中)的相关文章

spark 数据写入到 hbase

1)spark把数据写入到hbase需要用到:PairRddFunctions的saveAsHadoopDataset方法,这里用到了 implicit conversion,需要我们引入 import org.apache.spark.SparkContext._ 2)spark写入hbase,实质是借用了org.apache.hadoop.hbase.mapreduce.TableInputFormat这个对象,用其内部的recorderWriter将数据写入hbase 同时,也借用了had

HBase结合MapReduce批量导入(HDFS中的数据导入到HBase)

HBase结合MapReduce批量导入 1 package hbase; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.hbase.client.Put; 8 import org.apache.hadoop.hbase.mapreduce.TableOutput

mapreduce实现从hbase中统计数据,结果存入mysql中

最近开始学习使用mapreduce统计hbase中的数据,并将结果集存入mysql中,供前台查询使用. 使用hadoop版本为2.5.1,hbase版本为0.98.6.1 mapreduce程序分为三个部分:job.map函数.reduce函数 job类: 1 public class DayFaultStatisticsJob { 2 private static final Logger logger = LoggerFactory.getLogger(DayFaultStatisticsJ

7.从Hbase中读取数据写入hdfs

/** public abstract classTableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable,Result, KEYOUT, VALUEOUT> { }  *@author [email protected]  *  */ public class HbaseReader {          publicstatic String flow_fields_import = "fl

MapReduce操作Hbase --table2file

官方手册:http://hbase.apache.org/book.html#mapreduce.example 简单的操作,将hbase表中的数据写入到文件中. RunJob 源码: 1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.fs.FileSystem; 3 import org.apache.hadoop.fs.Path; 4 import org.apache.hadoop.hbase

使用Hive或Impala执行SQL语句,对存储在HBase中的数据操作

使用Hive或Impala执行SQL语句,对存储在HBase中的数据操作 Hive Impala HBase HiveQL 大数据 使用Hive或Impala执行SQL语句,对存储在HBase中的数据操作 〇.摘要 一.基础环境 二.数据存储在HBase中,使用Hive执行SQL语句 Ⅰ.创建Hive外部表 Ⅱ.从HBase读 Ⅲ.向HBase写 三.数据存储在HBase中,使用Impala执行SQL语句 Ⅰ.从HBase读 Ⅱ.向HBase写 四.综上所述 〇.摘要 Hive是基于Hadoop

HBase结合MapReduce批量导入

2016年5月14日13:17:05 作者:数据分析玩家 Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapReduce运算框架向Hbase数据库中导入数据. 开篇先介绍业务场景:将电信手机上网日志中的数据导入到Hbase数据库中,将部分数据以及相应字段描述列出: 图片格式描述: 先介绍一个日期格式的转换: public class TestDate

使用BulkLoad从HDFS批量导入数据到HBase

在向Hbase中写入数据时,常见的写入方法有使用HBase API,Mapreduce批量导入数据,使用这些方式带入数据时,一条数据写入到HBase数据库中的大致流程如图. 数据发出后首先写入到雨鞋日志WAl中,写入到预写日志中之后,随后写入到内存MemStore中,最后在Flush到Hfile中.这样写数据的方式不会导致数据的丢失,并且道正数据的有序性,但是当遇到大量的数据写入时,写入的速度就难以保证.所以,介绍一种性能更高的写入方式BulkLoad. 使用BulkLoad批量写入数据主要分为

HBase的SHELL操作和API

1.表结构: 2.SHELL操作 命令:hbase shell 显示表:list 创建表:create 'tb_name','column_family_1','column_family_2',...; 或者 create 'user', {NAME => 'column_family_1', VERSIONS => '3'} 插入数据:put 'tb_name','rk_on','column_family : key','value' 获取数据: 获取所有数据:get 'tb_name'