/** public abstract classTableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable,Result, KEYOUT, VALUEOUT> { } *@author [email protected] * */ public class HbaseReader { publicstatic String flow_fields_import = "flow_fields_import"; staticclass HdfsSinkMapper extends TableMapper<Text, NullWritable>{ @Override protectedvoid map(ImmutableBytesWritable key, Result value, Context context) throwsIOException, InterruptedException { byte[]bytes = key.copyBytes(); Stringphone = new String(bytes); byte[]urlbytes = value.getValue("f1".getBytes(),"url".getBytes()); Stringurl = new String(urlbytes); context.write(newText(phone + "\t" + url), NullWritable.get()); } } staticclass HdfsSinkReducer extends Reducer<Text, NullWritable, Text,NullWritable>{ @Override protectedvoid reduce(Text key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException { context.write(key,NullWritable.get()); } } publicstatic void main(String[] args) throws Exception { Configurationconf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum","spark01"); Jobjob = Job.getInstance(conf); job.setJarByClass(HbaseReader.class); // job.setMapperClass(HdfsSinkMapper.class); Scanscan = new Scan(); TableMapReduceUtil.initTableMapperJob(flow_fields_import,scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job); job.setReducerClass(HdfsSinkReducer.class); FileOutputFormat.setOutputPath(job,new Path("c:/hbasetest/output")); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.waitForCompletion(true); } }
/** public abstract classTableReducer<KEYIN, VALUEIN, KEYOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT,Writable> { } *@author [email protected] * */ public class HbaseSinker { publicstatic String flow_fields_import = "flow_fields_import"; staticclass HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean,NullWritable>{ @Override protectedvoid map(LongWritable key, Text value, Context context) throws IOException,InterruptedException { Stringline = value.toString(); String[] fields =line.split("\t"); Stringphone = fields[0]; Stringurl = fields[1]; FlowBeanbean = new FlowBean(phone,url); context.write(bean,NullWritable.get()); } } staticclass HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable,ImmutableBytesWritable>{ @Override protectedvoid reduce(FlowBean key, Iterable<NullWritable> values, Context context)throws IOException, InterruptedException { Putput = new Put(key.getPhone().getBytes()); put.add("f1".getBytes(),"url".getBytes(), key.getUrl().getBytes()); context.write(newImmutableBytesWritable(key.getPhone().getBytes()), put); } } publicstatic void main(String[] args) throws Exception { Configurationconf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum","spark01"); HBaseAdminhBaseAdmin = new HBaseAdmin(conf); booleantableExists = hBaseAdmin.tableExists(flow_fields_import); if(tableExists){ hBaseAdmin.disableTable(flow_fields_import); hBaseAdmin.deleteTable(flow_fields_import); } HTableDescriptordesc = new HTableDescriptor(TableName.valueOf(flow_fields_import)); HColumnDescriptorhColumnDescriptor = new HColumnDescriptor ("f1".getBytes()); desc.addFamily(hColumnDescriptor); hBaseAdmin.createTable(desc); Jobjob = Job.getInstance(conf); job.setJarByClass(HbaseSinker.class); job.setMapperClass(HbaseSinkMrMapper.class); TableMapReduceUtil.initTableReducerJob(flow_fields_import,HbaseSinkMrReducer.class, job); FileInputFormat.setInputPaths(job,new Path("c:/hbasetest/data")); job.setMapOutputKeyClass(FlowBean.class); 
job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Mutation.class); job.waitForCompletion(true); } }
// Time: 2024-11-29 03:09:10