This post first covers installing HBase in a pseudo-distributed environment, and then combines MapReduce programming with HBase to implement the WordCount example.
- Installing HBase in a pseudo-distributed environment
1. Prerequisites
jdk1.6 and hadoop1.2.1 have already been installed successfully.
For detailed instructions on installing Jdk1.6 + Hadoop1.2.1 in a pseudo-distributed environment, see: Hadoop 1.2.1 Installation: Single-Node and Single-Machine Pseudo-Distributed Modes.
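Before continuing, it is worth a quick sanity check that both are on the PATH and at the expected versions:
java -version        # should report something like 1.6.0_xx
hadoop version       # should report Hadoop 1.2.1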
2. Environment
- VMware® Workstation 10.04
- Ubuntu 14.04, 32-bit
- Java JDK 1.6.0
- hadoop1.2.1
- hbase0.94.26
3. Installation steps for HBase 0.94 in pseudo-distributed mode
(1) Download and extract the hbase-0.94.26 tarball
tar -zxvf hbase-0.94.26.tar.gz
(2) Edit hbase-site.xml in the {hbase}/conf directory
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://localhost:9000/hbase</value>
    <!-- The host and port must match the hadoop configuration parameter fs.default.name -->
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <!-- Set to 1 for pseudo-distributed mode -->
  </property>
</configuration>
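As the comment notes, the host and port in hbase.rootdir must match hadoop's fs.default.name setting. For reference, with the hbase.rootdir value above, the corresponding entry in {hadoop}/conf/core-site.xml would look like the following (adjust it to your own hadoop configuration):
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9000</value>
</property>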
(3) Edit hbase-env.sh in the {hbase}/conf directory
export JAVA_HOME=/usr/lib/jvm/{jdk}   # jdk installation path
export HBASE_CLASSPATH=/etc/hadoop    # directory containing the hadoop configuration files
export HBASE_MANAGES_ZK=true          # let HBase manage its own built-in ZooKeeper
(4) Make hbase 0.94.26 work with hadoop 1.2.1
hbase 0.94.26 is built against hadoop 1.0.4 by default; by swapping out the hadoop-core jar we can make it work with hadoop 1.2.1.
a. Copy hadoop-core-1.2.1.jar from the hadoop home directory into hbase/lib, and delete the bundled hadoop-core-1.0.4.jar from hbase/lib.
b. Then copy commons-collections-3.2.1.jar and commons-configuration-1.6.jar from hadoop/lib into hbase/lib.
rm /home/u14/hbase-0.94.26/lib/hadoop-core-1.0.4.jar
cp /home/u14/hadoop/hadoop-core-1.2.1.jar /home/u14/hbase-0.94.26/lib
cp /home/u14/hadoop/lib/commons-collections-3.2.1.jar /home/u14/hbase-0.94.26/lib
cp /home/u14/hadoop/lib/commons-configuration-1.6.jar /home/u14/hbase-0.94.26/lib
(5) Start HBase
a. Start hadoop first
b. Start HBase
Go to the bin folder in the extracted hbase directory and run the start-hbase.sh script:
bin/start-hbase.sh
Check the running processes with the jps command:
SecondaryNameNode
DataNode
HQuorumPeer
TaskTracker
JobTracker
Jps
HRegionServer
HMaster
NameNode
c. Enter the shell to work with HBase
bin/hbase shell
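To confirm that the installation works, you can run a few basic commands in the shell; a minimal sketch (the table name 'test' and column family 'cf' below are just examples):
create 'test', 'cf'                 # create a table 'test' with one column family 'cf'
put 'test', 'row1', 'cf:a', '1'     # insert a single cell
scan 'test'                         # should print the row that was just inserted
disable 'test'
drop 'test'                         # clean up the test table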
d. Shutting down: stop HBase first, then stop hadoop
stop-hbase.sh
stop-all.sh
- Developing HBase applications with Eclipse
a. In eclipse, create a new Java project named HBase, open the project properties, and under Libraries -> Add External JARs... select the relevant JAR files under {hbase}/lib. If this is just for testing, keep it simple and select all of the JARs.
b. Add a folder named conf to the HBase project and copy the HBase cluster configuration file hbase-site.xml into it; then open the project properties again and under Libraries -> Add Class Folder select the conf folder you just created.
- Combining MapReduce with HBase: the WordCount example
In this example, the input files are:
user/u14/hbasetest/file01: hello world bye world
user/u14/hbasetest/file02: hello hadoop bye hadoop
Program outline: the mappers first collect the words from the input files; after the shuffle, the reducer sums the count for each word; finally the results are written into hbase.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class WordCountHBase {

    // Mapper: emit (word, 1) for every word in the input
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable i = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String s[] = value.toString().trim().split(" ");
            for (String m : s) {
                context.write(new Text(m), i);
            }
        }
    }

    // Reducer: sum the counts and write one Put per word into HBase
    public static class Reduce extends TableReducer<Text, IntWritable, NullWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));   // one row per word, row key = the word
            put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
                    Bytes.toBytes(String.valueOf(sum)));         // column family "content", qualifier "count", value = the count
            context.write(NullWritable.get(), put);
        }
    }

    // (Re)create the output table with a single column family "content"
    public static void createHBaseTable(String tableName) throws IOException {
        HTableDescriptor htd = new HTableDescriptor(tableName);
        HColumnDescriptor col = new HColumnDescriptor("content");
        htd.addFamily(col);
        Configuration config = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(config);
        if (admin.tableExists(tableName)) {
            System.out.println("table exists, trying to recreate table!");
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
        System.out.println("create new table: " + tableName);
        admin.createTable(htd);
    }

    public static void main(String args[]) throws Exception {
        String tableName = "wordcountH";
        Configuration conf = new Configuration();
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);   // tell TableOutputFormat which table to write to
        createHBaseTable(tableName);
        Job job = new Job(conf, "WordCountHbase");
        job.setJarByClass(WordCountHBase.class);
        job.setNumReduceTasks(3);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
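To run the job, upload the input files to HDFS and submit the class with hadoop, passing the input directory as the only argument. A rough sketch, assuming the class has been packaged into a jar named wordcount-hbase.jar (the jar name here is arbitrary) and the input files go under /user/u14/hbasetest:
hadoop fs -mkdir /user/u14/hbasetest
hadoop fs -put file01 file02 /user/u14/hbasetest
hadoop jar wordcount-hbase.jar WordCountHBase /user/u14/hbasetest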
After the program has run successfully, check the result in the HBase shell:
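Given the two input files above, each of the four distinct words appears twice, so a scan of the output table should look roughly like this (timestamps omitted):
hbase(main):001:0> scan 'wordcountH'
ROW       COLUMN+CELL
 bye      column=content:count, timestamp=..., value=2
 hadoop   column=content:count, timestamp=..., value=2
 hello    column=content:count, timestamp=..., value=2
 world    column=content:count, timestamp=..., value=2
4 row(s)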