新浪微博数据解析与java操作Hbase实例

之前发过一篇开发新浪微博的文章,对于大家比较感兴趣的内容之一便是如何解析新浪微博的JSON。

其实一开始的时候,也遇过一些挫折,比如直接用JsonArray和JsonObject去解析JSON内容的话,是解析不了的。

因为JSON的格式比较固定,像新浪微博返回的JSON内容则是多了一个中括号及statues标签,如下:

{
    "statuses": [ {
            "created_at": "Tue May 31 17:46:55 +0800 2011",
            "id": 11488058246,
            "text": "求关注。",
            "source": "<a href="http://weibo.com" rel="nofollow">新浪微博</a>",
            "favorited": false,
            "truncated": false,
            "in_reply_to_status_id": "",
            "in_reply_to_user_id": "",
            "in_reply_to_screen_name": "",
            "geo": null,
            "mid": "5612814510546515491",
            "reposts_count": 8,
            "comments_count": 9,
            "annotations": [],
            "user": {
                "id": 1404376560,
                "screen_name": "zaku",
                "name": "zaku",
                "province": "11",
                "city": "5",
                "location": "北京 朝阳区",
                "description": "人生五十年,乃如梦如幻;有生斯有死,壮士复何憾。",
                "url": "http://blog.sina.com.cn/zaku",
                "profile_image_url": "http://tp1.sinaimg.cn/1404376560/50/0/1",
                "domain": "zaku",
                "gender": "m",
                "followers_count": 1204,
                "friends_count": 447,
                "statuses_count": 2908,
                "favourites_count": 0,
                "created_at": "Fri Aug 28 00:00:00 +0800 2009",
                "following": false,
                "allow_all_act_msg": false,
                "remark": "",
                "geo_enabled": true,
                "verified": false,
                "allow_all_comment": true,
                "avatar_large": "http://tp1.sinaimg.cn/1404376560/180/0/1",
                "verified_reason": "",
                "follow_me": false,
                "online_status": 0,
                "bi_followers_count": 215
            }
        },
        ...  ],
    "previous_cursor": 0,                    // 暂未支持
    "next_cursor": 11488013766,     // 暂未支持
    "total_number": 81655
}

我的目标就是先去掉以上红色字体的内容,这个比较容易实现。

[java]
 
view plain
copy

  1. //将获取到的json内容变成可以正常解析的文本
  2. public static String parseJsonText(String jtext){
  3. char leftSymbol=‘[‘;
  4. char rightSymbol=‘]‘;
  5. String result="";
  6. for(int i=0;i<jtext.length();i++){
  7. if(leftSymbol==jtext.charAt(i)){
  8. result=jtext.substring(i,jtext.length());
  9. break;
  10. }
  11. }
  12. for(int i=result.length();i>0;i--){
  13. if(rightSymbol==result.charAt(i-1)){
  14. result=result.substring(0, i);
  15. break;
  16. }
  17. }
  18. return result;
  19. }

处理完就可以直接用JsonArray和JsonObject来解析相关的内容了。

第二步就是实现一个List<HashMap<String,String>>,用于填充ListView。

下面的方法返回一个List<HashMap<String,String>>,并在其中解析JSON内容。

[java]
 
view plain
copy

  1. private List<HashMap<String, String>> listview(String rlt) {
  2. String result = ParseJson.parseJsonText(rlt);
  3. try {
  4. JSONArray jsonArrays = new JSONArray(result);
  5. for (int i = 0; i < jsonArrays.length(); i++) {
  6. JSONObject jsonObject = jsonArrays.getJSONObject(i);
  7. HashMap<String, String> map = new HashMap<String, String>();
  8. // 获得微博的ID
  9. String ID = jsonObject.getString("id");
  10. contentId.add(ID);
  11. // 获得微博内容
  12. String text = jsonObject.getString("text");
  13. // 获得微博来源
  14. String source = jsonObject.getString("source");
  15. // 获得微博的发表时间
  16. String created_at = jsonObject.getString("created_at");
  17. // 生成user的可用JSON对象
  18. JSONObject userJson = jsonObject.getJSONObject("user");
  19. // 获得发表微博的用户名
  20. String username = userJson.getString("name");
  21. map.put("text", text);
  22. map.put("name", username);
  23. map.put("source", "来自" + "【" + source + "】");
  24. map.put("created_at", ParseJson.getFormatTime(created_at));
  25. if (jsonObject.has("retweeted_status")) {
  26. JSONObject retweetedJson = jsonObject
  27. .getJSONObject("retweeted_status");
  28. // 获得转发微博的内容
  29. String retweetedText = retweetedJson.getString("text");
  30. map.put("retweetedText", retweetedText);
  31. JSONObject retWeetedUserNameJson = retweetedJson
  32. .getJSONObject("user");
  33. // 获得转发微博内容的用户名
  34. String retweetedContent = retWeetedUserNameJson
  35. .getString("name");
  36. map.put("retweetedContent", retweetedContent + ":"
  37. + retweetedText);
  38. }
  39. listArrays.add(map);
  40. }
  41. } catch (JSONException e) {
  42. // TODO Auto-generated catch block
  43. System.out.println(e.getMessage());
  44. }
  45. return listArrays;
  46. }

通过上面两步就可以实现基本的微博JSON解析了。

Java操作Hbase进行建表、删表以及对数据进行增删改查,条件查询

1、搭建环境

新建JAVA项目,添加的包有:

有关Hadoop的hadoop-core-0.20.204.0.jar

有关Hbase的hbase-0.90.4.jar、hbase-0.90.4-tests.jar以及Hbase资源包中lib目录下的所有jar包

2、主要程序

Java代码
 
 

  1. package com.wujintao.hbase.test;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.hbase.HBaseConfiguration;
  7. import org.apache.hadoop.hbase.HColumnDescriptor;
  8. import org.apache.hadoop.hbase.HTableDescriptor;
  9. import org.apache.hadoop.hbase.KeyValue;
  10. import org.apache.hadoop.hbase.MasterNotRunningException;
  11. import org.apache.hadoop.hbase.ZooKeeperConnectionException;
  12. import org.apache.hadoop.hbase.client.Delete;
  13. import org.apache.hadoop.hbase.client.Get;
  14. import org.apache.hadoop.hbase.client.HBaseAdmin;
  15. import org.apache.hadoop.hbase.client.HTable;
  16. import org.apache.hadoop.hbase.client.HTablePool;
  17. import org.apache.hadoop.hbase.client.Put;
  18. import org.apache.hadoop.hbase.client.Result;
  19. import org.apache.hadoop.hbase.client.ResultScanner;
  20. import org.apache.hadoop.hbase.client.Scan;
  21. import org.apache.hadoop.hbase.filter.Filter;
  22. import org.apache.hadoop.hbase.filter.FilterList;
  23. import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
  24. import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
  25. import org.apache.hadoop.hbase.util.Bytes;
  26. public class JinTaoTest {
  27. public static Configuration configuration;
  28. static {
  29. configuration = HBaseConfiguration.create();
  30. configuration.set("hbase.zookeeper.property.clientPort", "2181");
  31. configuration.set("hbase.zookeeper.quorum", "192.168.1.100");
  32. configuration.set("hbase.master", "192.168.1.100:600000");
  33. }
  34. public static void main(String[] args) {
  35. // createTable("wujintao");
  36. // insertData("wujintao");
  37. // QueryAll("wujintao");
  38. // QueryByCondition1("wujintao");
  39. // QueryByCondition2("wujintao");
  40. //QueryByCondition3("wujintao");
  41. //deleteRow("wujintao","abcdef");
  42. deleteByCondition("wujintao","abcdef");
  43. }
  44. /**
  45. * 创建表
  46. * @param tableName
  47. */
  48. public static void createTable(String tableName) {
  49. System.out.println("start create table ......");
  50. try {
  51. HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration);
  52. if (hBaseAdmin.tableExists(tableName)) {// 如果存在要创建的表,那么先删除,再创建
  53. hBaseAdmin.disableTable(tableName);
  54. hBaseAdmin.deleteTable(tableName);
  55. System.out.println(tableName + " is exist,detele....");
  56. }
  57. HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
  58. tableDescriptor.addFamily(new HColumnDescriptor("column1"));
  59. tableDescriptor.addFamily(new HColumnDescriptor("column2"));
  60. tableDescriptor.addFamily(new HColumnDescriptor("column3"));
  61. hBaseAdmin.createTable(tableDescriptor);
  62. } catch (MasterNotRunningException e) {
  63. e.printStackTrace();
  64. } catch (ZooKeeperConnectionException e) {
  65. e.printStackTrace();
  66. } catch (IOException e) {
  67. e.printStackTrace();
  68. }
  69. System.out.println("end create table ......");
  70. }
  71. /**
  72. * 插入数据
  73. * @param tableName
  74. */
  75. public static void insertData(String tableName) {
  76. System.out.println("start insert data ......");
  77. HTablePool pool = new HTablePool(configuration, 1000);
  78. HTable table = (HTable) pool.getTable(tableName);
  79. Put put = new Put("112233bbbcccc".getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值
  80. put.add("column1".getBytes(), null, "aaa".getBytes());// 本行数据的第一列
  81. put.add("column2".getBytes(), null, "bbb".getBytes());// 本行数据的第三列
  82. put.add("column3".getBytes(), null, "ccc".getBytes());// 本行数据的第三列
  83. try {
  84. table.put(put);
  85. } catch (IOException e) {
  86. e.printStackTrace();
  87. }
  88. System.out.println("end insert data ......");
  89. }
  90. /**
  91. * 删除一张表
  92. * @param tableName
  93. */
  94. public static void dropTable(String tableName) {
  95. try {
  96. HBaseAdmin admin = new HBaseAdmin(configuration);
  97. admin.disableTable(tableName);
  98. admin.deleteTable(tableName);
  99. } catch (MasterNotRunningException e) {
  100. e.printStackTrace();
  101. } catch (ZooKeeperConnectionException e) {
  102. e.printStackTrace();
  103. } catch (IOException e) {
  104. e.printStackTrace();
  105. }
  106. }
  107. /**
  108. * 根据 rowkey删除一条记录
  109. * @param tablename
  110. * @param rowkey
  111. */
  112. public static void deleteRow(String tablename, String rowkey)  {
  113. try {
  114. HTable table = new HTable(configuration, tablename);
  115. List list = new ArrayList();
  116. Delete d1 = new Delete(rowkey.getBytes());
  117. list.add(d1);
  118. table.delete(list);
  119. System.out.println("删除行成功!");
  120. } catch (IOException e) {
  121. e.printStackTrace();
  122. }
  123. }
  124. /**
  125. * 组合条件删除
  126. * @param tablename
  127. * @param rowkey
  128. */
  129. public static void deleteByCondition(String tablename, String rowkey)  {
  130. //目前还没有发现有效的API能够实现 根据非rowkey的条件删除 这个功能能,还有清空表全部数据的API操作
  131. }
  132. /**
  133. * 查询所有数据
  134. * @param tableName
  135. */
  136. public static void QueryAll(String tableName) {
  137. HTablePool pool = new HTablePool(configuration, 1000);
  138. HTable table = (HTable) pool.getTable(tableName);
  139. try {
  140. ResultScanner rs = table.getScanner(new Scan());
  141. for (Result r : rs) {
  142. System.out.println("获得到rowkey:" + new String(r.getRow()));
  143. for (KeyValue keyValue : r.raw()) {
  144. System.out.println("列:" + new String(keyValue.getFamily())
  145. + "====值:" + new String(keyValue.getValue()));
  146. }
  147. }
  148. } catch (IOException e) {
  149. e.printStackTrace();
  150. }
  151. }
  152. /**
  153. * 单条件查询,根据rowkey查询唯一一条记录
  154. * @param tableName
  155. */
  156. public static void QueryByCondition1(String tableName) {
  157. HTablePool pool = new HTablePool(configuration, 1000);
  158. HTable table = (HTable) pool.getTable(tableName);
  159. try {
  160. Get scan = new Get("abcdef".getBytes());// 根据rowkey查询
  161. Result r = table.get(scan);
  162. System.out.println("获得到rowkey:" + new String(r.getRow()));
  163. for (KeyValue keyValue : r.raw()) {
  164. System.out.println("列:" + new String(keyValue.getFamily())
  165. + "====值:" + new String(keyValue.getValue()));
  166. }
  167. } catch (IOException e) {
  168. e.printStackTrace();
  169. }
  170. }
  171. /**
  172. * 单条件按查询,查询多条记录
  173. * @param tableName
  174. */
  175. public static void QueryByCondition2(String tableName) {
  176. try {
  177. HTablePool pool = new HTablePool(configuration, 1000);
  178. HTable table = (HTable) pool.getTable(tableName);
  179. Filter filter = new SingleColumnValueFilter(Bytes
  180. .toBytes("column1"), null, CompareOp.EQUAL, Bytes
  181. .toBytes("aaa")); // 当列column1的值为aaa时进行查询
  182. Scan s = new Scan();
  183. s.setFilter(filter);
  184. ResultScanner rs = table.getScanner(s);
  185. for (Result r : rs) {
  186. System.out.println("获得到rowkey:" + new String(r.getRow()));
  187. for (KeyValue keyValue : r.raw()) {
  188. System.out.println("列:" + new String(keyValue.getFamily())
  189. + "====值:" + new String(keyValue.getValue()));
  190. }
  191. }
  192. } catch (Exception e) {
  193. e.printStackTrace();
  194. }
  195. }
  196. /**
  197. * 组合条件查询
  198. * @param tableName
  199. */
  200. public static void QueryByCondition3(String tableName) {
  201. try {
  202. HTablePool pool = new HTablePool(configuration, 1000);
  203. HTable table = (HTable) pool.getTable(tableName);
  204. List<Filter> filters = new ArrayList<Filter>();
  205. Filter filter1 = new SingleColumnValueFilter(Bytes
  206. .toBytes("column1"), null, CompareOp.EQUAL, Bytes
  207. .toBytes("aaa"));
  208. filters.add(filter1);
  209. Filter filter2 = new SingleColumnValueFilter(Bytes
  210. .toBytes("column2"), null, CompareOp.EQUAL, Bytes
  211. .toBytes("bbb"));
  212. filters.add(filter2);
  213. Filter filter3 = new SingleColumnValueFilter(Bytes
  214. .toBytes("column3"), null, CompareOp.EQUAL, Bytes
  215. .toBytes("ccc"));
  216. filters.add(filter3);
  217. FilterList filterList1 = new FilterList(filters);
  218. Scan scan = new Scan();
  219. scan.setFilter(filterList1);
  220. ResultScanner rs = table.getScanner(scan);
  221. for (Result r : rs) {
  222. System.out.println("获得到rowkey:" + new String(r.getRow()));
  223. for (KeyValue keyValue : r.raw()) {
  224. System.out.println("列:" + new String(keyValue.getFamily())
  225. + "====值:" + new String(keyValue.getValue()));
  226. }
  227. }
  228. rs.close();
  229. } catch (Exception e) {
  230. e.printStackTrace();
  231. }
  232. }
  233. }

注意:可能大家没看到更新数据的操作,其实更新的操作跟添加完全一致,只不过是添加呢rowkey不存在,更新呢rowkey已经存在,并且timstamp相同的情况下,还有就是目前好像还没办法实现hbase数据的分页查询,不知道有没有人知道怎么做

HBase性能优化建议:

 针对前面的代码,有很多不足之处,在此我就不修改上面的代码了,只是提出建议的地方,大家自己加上

1)配置

当你调用create方法时将会加载两个配置文件:hbase-default.xml and hbase-site.xml,利用的是当前的java类路径, 代码中configuration设置的这些配置将会覆盖hbase-default.xml和hbase-site.xml中相同的配置,如果两个配置文件都存在并且都设置好了相应参上面的属性下面的属性即可

2)关于建表

public void createTable(HTableDescriptor desc)

HTableDescriptor 代表的是表的schema, 提供的方法中比较有用的有

setMaxFileSize,指定最大的region size

setMemStoreFlushSize 指定memstore flush到HDFS上的文件大小

增加family通过 addFamily方法

public void addFamily(final HColumnDescriptor family)

HColumnDescriptor代表的是column的schema,提供的方法比较常用的有

setTimeToLive:指定最大的TTL,单位是ms,过期数据会被自动删除。

setInMemory:指定是否放在内存中,对小表有用,可用于提高效率。默认关闭

setBloomFilter:指定是否使用BloomFilter,可提高随机查询效率。默认关闭

setCompressionType:设定数据压缩类型。默认无压缩。

setMaxVersions:指定数据最大保存的版本个数。默认为3。

注意的是,一般我们不去setInMemory为true,默认是关闭的

3)关于入库

官方建议

table.setAutoFlush(false); //数据入库之前先设置此项为false

table.setflushCommits();//入库完成后,手动刷入数据

注意:

在入库过程中,put.setWriteToWAL(true/flase);

关于这一项如果不希望大量数据在存储过程中丢失,建议设置为true,如果仅是在测试演练阶段,为了节省入库时间建议设置为false

4)关于获取表实例

HTablePool pool = new HTablePool(configuration, Integer.MAX_VALUE);

HTable table = (HTable) pool.getTable(tableName);

建议用表连接池的方式获取表,具体池有什么作用,我想用过数据库连接池的同学都知道,我就不再重复

不建议使用new HTable(configuration,tableName);的方式获取表

5)关于查询

建议每个查询语句都放入try catch语句块,并且finally中要进行关闭ResultScanner实例以及将不使用的表重新放入到HTablePool中的操作,具体做法如下

Java代码
 
 

  1. public static void QueryAll(String tableName) {
  2. HTablePool pool = new HTablePool(configuration, Integer.MAX_VALUE);
  3. HTable table = null;
  4. ResultScanner rs = null;
  5. try {
  6. Scan scan = new Scan();
  7. table = (HTable) pool.getTable(tableName);
  8. rs = table.getScanner(scan);
  9. for (Result r : rs) {
  10. System.out.println("获得到rowkey:" + new String(r.getRow()));
  11. for (KeyValue keyValue : r.raw()) {
  12. System.out.println("列:" + new String(keyValue.getFamily())
  13. + "====值:" + new String(keyValue.getValue()));
  14. }
  15. }
  16. } catch (IOException e) {
  17. e.printStackTrace();
  18. }finally{
  19. rs.close();// 最后还得关闭
  20. pool.putTable(table); //实际应用过程中,pool获取实例的方式应该抽取为单例模式的,不应在每个方法都重新获取一次(单例明白?就是抽取到专门获取pool的逻辑类中,具体逻辑为如果pool存在着直接使用,如果不存在则new)
  21. }
  22. }

所以,以上代码有缺陷的地方,感兴趣的同学可以针对优化建议作出相应修改

http://blog.csdn.net/ieicihc/article/details/10604129

http://javacrazyer.iteye.com/blog/1186881

时间: 2025-01-07 12:54:10

新浪微博数据解析与java操作Hbase实例的相关文章

java操作Hbase实例

所用HBase版本为1.1.2,hadoop版本为2.4 /* * 创建一个students表,并进行相关操作 */ import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apach

java操作hbase例子

hbase安装方法请参考:hbase-0.94安装方法详解 hbase常用的shell命令请参考:hbase常用的shell命令例子 java操作hbase,在eclipse中创建一个java项目,将hbase安装文件根目录的jar包和lib目录下jar包导入项目,然后就可以编写java代码操作hbase了.下面代码给出来一个简单的示例 /** * @date 2015-07-23 21:28:10 * @author sgl */ package com.songguoliang.hbase;

memcached—Java操作Memcached实例

前面博客介绍了如何在Windows操作系统中安装Memcached,总结一下如何使用Java操作Memcached实例: 代码一: package com.ghj.packageoftool; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import jav

Java操作hbase总结

用过以后,总得写个总结,不然,就忘喽. 一.寻找操作的jar包. java操作hbase,首先要考虑到使用hbase的jar包. 因为咱装的是CDH5,比较方便,使用SecureCRT工具,远程连接到你安装的那台服务器上. jar包的存放位置在/opt/cloudera/parcels/CDH/lib/hbase,找到,下载下来. 在当前路径下,有一个lib包,里面是支持hbase的hadoop的jar包,根据需求,可以下载下来. 二.找一个API文档当成手册,哪里不会查哪里 百度分享,http

java操作hbase样例

hbase安装方法请參考:hbase-0.94安装方法具体解释 hbase经常使用的shell命令请參考:hbase经常使用的shell命令样例 java操作hbase,在eclipse中创建一个java项目.将hbase安装文件根文件夹的jar包和lib文件夹下jar包导入项目,然后就能够编写java代码操作hbase了. 以下代码给出来一个简单的演示样例 /** * @date 2015-07-23 21:28:10 * @author sgl */ package com.songguol

hbase0.96数据导入以及Kettle操作hbase问题

版本: cdh5.0.0+hadoop2.3.0+hbase0.96.1.1+Spoon5.0.1 一.HBase数据导入 HBase数据导入使用org.apache.hadoop.hbase.mapreduce.ImportTsv 的两种方式,一种是直接导入,一种是转换为HFile,然后再次导入. 1. HDFS数据为(部分): [[email protected] data]# hadoop fs -ls /input Found 1 items -rwxrwxrwx 1 hdfs supe

Hadoop之——Java操作HBase

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46463617 不多说,直接上代码,大家都懂得 package hbase; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbas

Java 操作Hbase 完整例子

开发工具:Eclipse,三步1.新建一个项目2.把hbase安装下的lib的文件都拷贝进来3.把lib目录下jar文件都引入4.lib下的client-facing-thirdparty 目录下的jar也都引入看图 package com.yue; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.;import org.apache.hadoop.hbase.client.; import j

Java操作Hbase进行建表、删表以及对数据进行增删改查,条件查询

1.搭建环境 新建JAVA项目,添加的包有: 有关Hadoop的hadoop-core-0.20.204.0.jar 有关Hbase的hbase-0.90.4.jar.hbase-0.90.4-tests.jar以及Hbase资源包中lib目录下的所有jar包 2.主要程序 Java代码 package com.wujintao.hbase.test; import java.io.IOException; import java.util.ArrayList; import java.util