7.从Hbase中读取数据写入hdfs

/**
public abstract classTableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable,Result, KEYOUT, VALUEOUT> {
}
 *@author [email protected]
 *
 */
public class HbaseReader {
 
       publicstatic String flow_fields_import = "flow_fields_import";
       staticclass HdfsSinkMapper extends TableMapper<Text, NullWritable>{
 
              @Override
              protectedvoid map(ImmutableBytesWritable key, Result value, Context context) throwsIOException, InterruptedException {
 
                     byte[]bytes = key.copyBytes();
                     Stringphone = new String(bytes);
                     byte[]urlbytes = value.getValue("f1".getBytes(),"url".getBytes());
                     Stringurl = new String(urlbytes);
                     context.write(newText(phone + "\t" + url), NullWritable.get());
                    
              }
             
       }
      
       staticclass HdfsSinkReducer extends Reducer<Text, NullWritable, Text,NullWritable>{
             
              @Override
              protectedvoid reduce(Text key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {
                    
                     context.write(key,NullWritable.get());
              }
       }
      
       publicstatic void main(String[] args) throws Exception {
              Configurationconf = HBaseConfiguration.create();
              conf.set("hbase.zookeeper.quorum","spark01");
             
              Jobjob = Job.getInstance(conf);
             
              job.setJarByClass(HbaseReader.class);
             
//            job.setMapperClass(HdfsSinkMapper.class);
              Scanscan = new Scan();
              TableMapReduceUtil.initTableMapperJob(flow_fields_import,scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);
              job.setReducerClass(HdfsSinkReducer.class);
             
              FileOutputFormat.setOutputPath(job,new Path("c:/hbasetest/output"));
             
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(NullWritable.class);
             
              job.waitForCompletion(true);
             
             
       }
      
}
/**
public abstract classTableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT,Writable> {
}
 *@author [email protected]
 *
 */
public class HbaseSinker {
 
       publicstatic String flow_fields_import = "flow_fields_import";
       staticclass HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean,NullWritable>{
              @Override
              protectedvoid map(LongWritable key, Text value, Context context) throws IOException,InterruptedException {
 
                     Stringline = value.toString();
                     String[] fields =line.split("\t");
                     Stringphone = fields[0];
                     Stringurl = fields[1];
                    
                     FlowBeanbean = new FlowBean(phone,url);
                    
                     context.write(bean,NullWritable.get());
              }
       }
      
       staticclass HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable,ImmutableBytesWritable>{
             
              @Override
              protectedvoid reduce(FlowBean key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException {
                    
                     Putput = new Put(key.getPhone().getBytes());
                     put.add("f1".getBytes(),"url".getBytes(), key.getUrl().getBytes());
                    
                     context.write(newImmutableBytesWritable(key.getPhone().getBytes()), put);
                    
              }
             
       }
      
       publicstatic void main(String[] args) throws Exception {
              Configurationconf = HBaseConfiguration.create();
              conf.set("hbase.zookeeper.quorum","spark01");
             
              HBaseAdminhBaseAdmin = new HBaseAdmin(conf);
             
              booleantableExists = hBaseAdmin.tableExists(flow_fields_import);
              if(tableExists){
                     hBaseAdmin.disableTable(flow_fields_import);
                     hBaseAdmin.deleteTable(flow_fields_import);
              }
              HTableDescriptordesc = new HTableDescriptor(TableName.valueOf(flow_fields_import));
              HColumnDescriptorhColumnDescriptor = new HColumnDescriptor ("f1".getBytes());
              desc.addFamily(hColumnDescriptor);
             
              hBaseAdmin.createTable(desc);
             
             
              Jobjob = Job.getInstance(conf);
             
              job.setJarByClass(HbaseSinker.class);
             
              job.setMapperClass(HbaseSinkMrMapper.class);
              TableMapReduceUtil.initTableReducerJob(flow_fields_import,HbaseSinkMrReducer.class, job);
             
              FileInputFormat.setInputPaths(job,new Path("c:/hbasetest/data"));
             
              job.setMapOutputKeyClass(FlowBean.class);
              job.setMapOutputValueClass(NullWritable.class);
             
              job.setOutputKeyClass(ImmutableBytesWritable.class);
              job.setOutputValueClass(Mutation.class);
             
              job.waitForCompletion(true);
             
             
       }
      
}
时间: 2024-11-29 03:09:10

7.从Hbase中读取数据写入hdfs的相关文章

hbase使用MapReduce操作4(实现将 HDFS 中的数据写入到 HBase 表中)

实现将 HDFS 中的数据写入到 HBase 表中 Runner类 1 package com.yjsj.hbase_mr2; 2 3 import com.yjsj.hbase_mr2.ReadFruitFromHDFSMapper; 4 import com.yjsj.hbase_mr2.WriteFruitMRFromTxtReducer; 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.c

Spark读取Hbase中的数据_云帆大数据分享

Spark读取Hbase中的数据 大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1).调用parallelize函数直接从集合中获取数据,并存入RDD中:Java版本如下: 1 JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3)); Scala版本如下: 1 val myRDD= sc.parallelize(List(1,2,3)) 这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初

MapReduce 读取和操作HBase中的数据

MapReduce 中如何处理HBase中的数据?如何读取HBase数据给Map?如何将结果存储到HBase中? 2012-07-05 13:40 89人阅读 评论(0) 收藏 举报 MapReduce 中如何处理HBase中的数据?如何读取HBase数据给Map?如何将结果存储到HBase中? Mapper类:包括一个内部类(Context)和四个方法(setup,map,cleanup,run):          setup,cleanup用于管理Mapper生命周期中的资源.setup

程序一 用记事本建立文件src.dat,其中存放若干字符。编写程序,从文件src.dat中读取数据,统计其中的大写字母、小写字母、数字、其它字符的个数,并将这些数据写入到文件test.dat中。

用记事本建立文件src.dat,其中存放若干字符.编写程序,从文件src.dat中读取数据,统计其中的大写字母.小写字母.数字.其它字符的个数,并将这些数据写入到文件test.dat中. #include<stdio.h>#include<stdlib.h>#include<string.h>int main(){ FILE*fp1,*fp2; char ch; int da=0,xiao=0,shuzi=0,qita=0; if((fp1=fopen("sr

使用Hive或Impala执行SQL语句,对存储在HBase中的数据操作

使用Hive或Impala执行SQL语句,对存储在HBase中的数据操作 Hive Impala HBase HiveQL 大数据 使用Hive或Impala执行SQL语句,对存储在HBase中的数据操作 〇.摘要 一.基础环境 二.数据存储在HBase中,使用Hive执行SQL语句 Ⅰ.创建Hive外部表 Ⅱ.从HBase读 Ⅲ.向HBase写 三.数据存储在HBase中,使用Impala执行SQL语句 Ⅰ.从HBase读 Ⅱ.向HBase写 四.综上所述 〇.摘要 Hive是基于Hadoop

c/c++中读取和写入mat文件 --- MAT File I/O Library(二)

本篇记述c/c++中读取mat文件示例 测试数据matioTest.zip下载地址: http://pan.baidu.com/s/1sjPkMsd 1. 读取matioTest.zip中s.mat文件,数据为普通matrix 1 #include <iostream> 2 #include <cassert> 3 #include <matio.h> 4 5 6 int main() 7 { 8 mat_t *matfp; 9 const char* matFileP

非常标准的将数据保存到file并从file中读取数据。

字符流:Reader(读) Writer(写) 字节流:InputStream(读数据)  OutputStream(写数据) 1,字节流 InputStream(读),OutputStream(写) 2,字符流 Reader(读),Writer(写) 结论:只要是处理纯文本数据,就要优先考虑使用字符流,除此之外都用字节流. 向文件中写入内容 try { FileOutputStream fos = openFileOutput(INTERNAL_FILENAME , MODE_APPEND);

Redis 中文入库成功,读取数据写入文件乱码问题

最近需要用到redis ,但是在编码这个问题上,纠结了很久. 需求 : 每天一个进程将中文文件入库到redis中(不定时更新) ,另外几个进程读取redis中的信息 ,并处理数据结果. 使用的redis模块 : redis-py 问题 : 入库正常,读取数据成功,以GBK编码写入文件出现异常. 通过以下参数连接 redis : client  = redis.StrictRedis(host='localhost', port=6379, db=0, password="***") 从

从Hadoop URL中读取数据

要从Hadoop文件系统中读取文件,一个最简单的方法是使用java.net.URL对象来打开一个数据流,从而从中读取数据.一般的格式如下: 1.      InputStream in = null; 2.  try { 3.       in = new URL("hdfs://host/path").openStream(); 4.       // process in 5.  } finally { 6.       IOUtils.closeStream(in); 7.  }