Mapreduce的文件和hbase共同输入

package duogemap;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.hbase.mapreduce.TableMapper;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MixMR {

public static class Map extends Mapper<Object, Text, Text, Text> {

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

String s = value.toString();

String[] sa = s.split(",");

if (sa.length == 2) {

context.write(new Text(sa[0]), new Text(sa[1]));

}

}

}

public static class TableMap extends TableMapper<Text, Text> {

public static final byte[] CF = "cf".getBytes();

public static final byte[] ATTR1 = "c1".getBytes();

public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {

String key = Bytes.toString(row.get());

String val = new String(value.getValue(CF, ATTR1));

context.write(new Text(key), new Text(val));

}

}

public static class Reduce extends Reducer <Object, Text, Object, Text> {

public void reduce(Object key, Iterable<Text> values, Context context)

throws IOException, InterruptedException {

String ks = key.toString();

for (Text val : values){

context.write(new Text(ks), val);

}

}

}

public static void main(String[] args) throws Exception {

Path inputPath1 = new Path(args[0]);

Path inputPath2 = new Path(args[1]);

Path outputPath = new Path(args[2]);

String tableName = "test";

Configuration config = HBaseConfiguration.create();

Job job = new Job(config, "ExampleRead");

job.setJarByClass(MixMR.class); // class that contains mapper

Scan scan = new Scan();

scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs

scan.setCacheBlocks(false); // don‘t set to true for MR jobs

scan.addFamily(Bytes.toBytes("cf"));

TableMapReduceUtil.initTableMapperJob(

tableName, // input HBase table name

scan, // Scan instance to control CF and attribute selection

TableMap.class, // mapper

Text.class, // mapper output key

Text.class, // mapper output value

job);

job.setReducerClass(Reduce.class); // reducer class

job.setOutputFormatClass(TextOutputFormat.class);

// inputPath1 here has no effect for HBase table

MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, Map.class);

MultipleInputs.addInputPath(job, inputPath2, TableInputFormat.class, TableMap.class);

FileOutputFormat.setOutputPath(job, outputPath);

job.waitForCompletion(true);

}

}

时间: 2024-08-08 23:07:49

Mapreduce的文件和hbase共同输入的相关文章

HBase - MapReduce - HBase 作为输入源的示例 | 那伊抹微笑

博文作者:那伊抹微笑 csdn 博客地址:http://blog.csdn.net/u012185296 itdog8 地址链接 : http://www.itdog8.com/thread-203-1-1.html 博文标题:HBase - MapReduce - HBase 作为输入源的示例 | 那伊抹微笑 个性签名:世界上最遥远的距离不是天涯,也不是海角,而是我站在妳的面前,妳却感觉不到我的存在 技术方向:Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+

MapReduce 读取和操作HBase中的数据

MapReduce 中如何处理HBase中的数据?如何读取HBase数据给Map?如何将结果存储到HBase中? 2012-07-05 13:40 89人阅读 评论(0) 收藏 举报 MapReduce 中如何处理HBase中的数据?如何读取HBase数据给Map?如何将结果存储到HBase中? Mapper类:包括一个内部类(Context)和四个方法(setup,map,cleanup,run):          setup,cleanup用于管理Mapper生命周期中的资源.setup

Linux文件编辑器vim输出输入重定向、管道以及进程

合抱之木,生于毫末:九层之台,起于累土:千里之行,始于足下.正因基础的重要性,才又撸起了linux. linux编辑工具: VI VIM EMACS vim 是vi的升级版本,它不仅兼容vi的所有指令,而且还有一些新的特性在里面.vim的这些优势主要体现在以下几个方面: 易用性 vi只能运行于unix中,而vim不仅可以运行于unix,windows ,mac等多操作平台. 语法加亮 vim可以用不同的颜色来加亮你的代码. 可视化操作(ESC+V) 就是说vim不仅可以在终端运行,也可以运行于x

编程题:为了展示文件包含功能,输入一个字符串,输出其长度。

1.c源代码如下: #include<stdio.h> #include"2.c" void main() { char string[20]; scanf("%s",string);         /*整个程序的功能:输入一个字符串,输出其长度*/ printf("There are %d characters.\n",string_len(string)); } 2.c源代码如下: int string_len(char str

mapreduce 多文件输出新API续

对于上一篇hadoop mapreduce 多文件输出,有一些地方介绍的不准确,这里做个续简单更正一下,同时正好解决了上一篇的不能多文件夹输出的问题 1.针对于上一篇代码中的 MultipleOutputs.addNamedOutput(job, "errorlog",     TextOutputFormat.class, Text.class, NullWritable.class);  方法,其实第二个参数并非是这么用的,下面看代码: private MultipleOutput

浅谈hadoop中mapreduce的文件分发

最近在做数据分析的时候,需要在mapreduce中调用c语言写的接口,此时就需要把动态链接库so文件分发到hadoop的各个节点上,原来想自己来做这个分发,大概过程就是把so文件放在hdfs上面,然后做mapreduce的时候把so文件从hdfs下载到本地,但查询资料后发现hadoop有相应的组件来帮助我们完成这个操作,这个组件就是DistributedCache,分布式缓存,运用这个东西可以做到第三方文件的分发和缓存功能,下面详解: 如果我们需要在map之间共享一些数据,如果信息量不大,我们可

BulkLoad加载本地文件到HBase表

BulkLoad加载文件到HBase表 1.功能 将本地数据导入到HBase中 2.原理 BulkLoad会将tsv/csv格式的文件编程hfile文件,然后再进行数据的导入,这样可以避免大量数据导入时造成的集群写入压力过大. 1.tsv格式的文件:字段之间以制表符\t分割 2.csv格式的文件:字段之间以逗号,分割 3.作用 减小HBase集群插入数据的压力 提高了Job运行的速度,降低了Job执行时间 4.案例 Step1.配置临时环境变量 $ export HBASE_HOME=/opt/

hadoop执行hdfs文件到hbase表插入操作(xjl456852原创)

本例中需要将hdfs上的文本文件,解析后插入到hbase的表中. 本例用到的hadoop版本2.7.2 hbase版本1.2.2 hbase的表如下: create 'ns2:user', 'info' hdfs上的文本文件如下[data/hbase_input/hbase.txt] 1,xiejl,20 2,haha,30 3,liudehua,40 4,daoming,41 可以通过命令查看hadoop的classpath现在包含哪些jar包: [[email protected] ~]$

MapReduce TopK 文件

问题描述:对于每日访问google 的ip做个记录 对应计算出当天前K个访问次数最多的ip地址. 对应此问题 先自定制一个ip格式的数据类型 继承WritableComparable接口. package reverseIndex; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.