package hbase; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class HBase2HdfsUtils { /** * args[0] 表名 * args[1] 列族、列名称列表,格式---列族:列 * args[2] 输出路径 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //获取Hbase的配置信息,从resources目录下的hbase-site.xml文件中获取配置信息 Configuration conf = HBaseConfiguration.create(); //设置列族、列名称信息列表参数,格式--列族:列 conf.set("FamilyColumnsList", args[1]); //申明一个客户端 Job job = Job.getInstance(conf, HBase2HdfsUtils.class.getSimpleName()); //打成jar包执行需要指定类名 job.setJarByClass(HBase2HdfsUtils.class); //指定HBase中需要导出表的信息,即map的输入 Scan scan = new Scan(); TableMapReduceUtil.initTableMapperJob(args[0], scan, MyMapper.class, Text.class, Text.class, job); //设置输入的配置信息:key、value的类型 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //从hbase导出数据到hdfs不需要reduce,所以设置reduce的任务数为0 job.setNumReduceTasks(0); //设置输出的配置信息:key、value的类型以及输出路径 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); //如果输出目录存在,则删除输出目录 Path path = new Path(args[2]); FileSystem fs = FileSystem.get(new URI(args[2]), new Configuration()); if(fs.exists(path)){ fs.delete(path, true); } FileOutputFormat.setOutputPath(job, new Path(args[2])); job.waitForCompletion(true); } static class MyMapper extends TableMapper<Text, Text>{ Text k2 = new Text(); Text v2 = new Text(); @Override protected void map( ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) throws IOException, InterruptedException { k2.set(""); String v2Text = ""; String familyColumnsList = context.getConfiguration().get("FamilyColumnsList"); String[] splited = familyColumnsList.split(","); String title = ""; //标题 for (String split : splited) { String[] column = split.split(":"); //根据列族、列获取值 Cell cell = value.getColumnLatestCell(column[0].getBytes(), column[1].getBytes()); //判断据列族、列获取到的cell不为空,否则会报空指针错误 if(cell!=null){ title += new String(CellUtil.cloneQualifier(cell)) + ":" + new String(CellUtil.cloneQualifier(cell)) + "\t" ; v2Text += new String(CellUtil.cloneValue(cell)) + "\t" ; } } v2.set(title + "\n" + v2Text); context.write(k2, v2); } } }
时间: 2024-12-20 10:33:11