使用Coprocessor实现hbase+solr数据交互

HBase和Solr可以通过协处理器 Coprocessor 的方式向Solr发出请求,Solr对于接收到的数据可以做相关的同步:增、删、改索引的操作。使用solr作为hbase的二级索引,构建基于solr+hbase的快速多条件复杂查询。

查询时,先根据条件在solr中查找符合条件的rowkey,再根据rowkey从hbase中取数据,根据测试,分页查询时基本可以实现ms级的快速查询。

1. 编写SolrIndexCoprocessorObserver代码

package cn.ac.ict.solr.server;

import cn.ac.ict.solr.utils.SolrWriter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * 监听HBase,一有数据postPut就向Solr发送
 * hbase有两种coprocessor,一种是Observer(观察者),类似于关系数据库的trigger(触发器)
 * 另外一种就是EndPoint,类似于关系数据库的存储过程
 * 使用solrwrite进行写数据
 * User: zhaop
 * Date: 15-4-7
 * Time: 下午2:16
 */
public class SolrIndexCoprocessorObserver extends BaseRegionObserver{
    private static final Logger logger = LoggerFactory.getLogger(SolrIndexCoprocessorObserver.class);

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,WALEdit edit, Durability durability) throws IOException {
        logger.info("postPut 向solr中插入数据");
        inputSolr(put);
    }

    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,WALEdit edit,Durability durability) throws IOException {
        String rowKey = Bytes.toString(delete.getRow());
        try {
            logger.info("postDelete 删除solr中的数据");
            SolrWriter solrWriter = new SolrWriter();
            solrWriter.deleteDoc(rowKey);
        } catch (Exception ex){
            logger.info("postDelete delete rowKey = "+rowKey+" from solr fail:"+ex.getMessage());
            logger.error(ex.getMessage(),ex);
        }
    }

    public void inputSolr(Put put) {
        String rowKey = Bytes.toString(put.getRow());
        try {
            Cell cell_did = put.get(Bytes.toBytes("values"), Bytes.toBytes("did")).get(0);
            String did = new String(CellUtil.cloneValue(cell_did));
            Cell cell_dvid = put.get(Bytes.toBytes("values"), Bytes.toBytes("dvid")).get(0);
            String dvid = new String(CellUtil.cloneValue(cell_dvid));
            Cell cell_value= put.get(Bytes.toBytes("values"), Bytes.toBytes("value")).get(0);
            String value = new String(CellUtil.cloneValue(cell_value));
            Cell cell_timestamp = put.get(Bytes.toBytes("values"), Bytes.toBytes("timestamp")).get(0);
            String timestamp = new String(CellUtil.cloneValue(cell_timestamp));
            Cell cell_model = put.get(Bytes.toBytes("values"), Bytes.toBytes("model")).get(0);
            String model = new String(CellUtil.cloneValue(cell_model));

            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("rowkey", rowKey);
            doc.addField("did", did);
            doc.addField("dvid", dvid);
            doc.addField("value", value);
            doc.addField("timestamp", timestamp);
            doc.addField("model", model);

            SolrWriter.addDocToCache(doc);
            logger.info("postPut 向solr缓存中插入数据成功,rowKey = "+rowKey);
        } catch (Exception e) {
            logger.info("postPut write rowKey = "+rowKey+" to solr fail:"+e.getMessage());
            logger.error(e.getMessage(),e);
        }
    }

}

package cn.ac.ict.solr.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 向sola中写数据,每隔一段时间
 * User: zhaop
 * Date: 15-4-9
 * Time: 下午8:50
 */
public class SolrWriter {
    private static final Logger logger = LoggerFactory.getLogger(SolrWriter.class);

    public static String urlSolr = "";     //solr地址
    private static String defaultCollection = "";  //默认collection
    private static int zkClientTimeOut = 0;//zk客户端请求超时间
    private static int zkConnectTimeOut = 0;//zk客户端连接超时间
    private static CloudSolrClient cloudSolrClient = null;

    private static int maxCacheCount = 0;   //缓存大小,当达到该上限时提交
    private static Vector<SolrInputDocument> cache = null;   //缓存 此处缓存对象可以改为 SolrInputDocument更具通用性
    public static Lock commitLock = new ReentrantLock();  //在添加缓存或进行提交时加锁

    private static int maxCommitTime = 60; //最大提交时间,s

    static {
        Configuration conf = HBaseConfiguration.create();
        urlSolr = conf.get("hbase.solr.zklist", "192.168.0.177:2181");
        defaultCollection = conf.get("hbase.solr.collection", "dev_values");
        zkClientTimeOut = conf.getInt("hbase.solr.zkClientTimeOut", 10000);
        zkConnectTimeOut = conf.getInt("hbase.solr.zkConnectTimeOut", 10000);
        maxCacheCount = conf.getInt("hbase.solr.maxCacheCount", 10000);
        maxCommitTime = conf.getInt("hbase.solr.maxCommitTime", 1);

        logger.info("solr init param " + urlSolr + "  " + defaultCollection + "  " + zkClientTimeOut + "  " + zkConnectTimeOut + "  " + maxCacheCount + "  " + maxCommitTime);
        try {
            cache = new Vector<SolrInputDocument>(maxCacheCount);

            cloudSolrClient = new CloudSolrClient(urlSolr);
            cloudSolrClient.setDefaultCollection(defaultCollection);
            cloudSolrClient.setZkClientTimeout(zkClientTimeOut);
            cloudSolrClient.setZkConnectTimeout(zkConnectTimeOut);

            //启动定时任务,第一次延迟1s执行,之后每隔指定时间执行一次
            Timer timer = new Timer();
            timer.schedule(new CommitTimer(), 1 * 1000, maxCommitTime * 1000);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

    }

    /**
     * 批量提交
     */
    public void inputDoc(List<SolrInputDocument> docList) throws IOException, SolrServerException {
        if (docList == null || docList.size() == 0) {
            return;
        }
        /*List<SolrInputDocument> doclist = new ArrayList<SolrInputDocument>(deviceDataList.size());
        for (DeviceData dd : deviceDataList) {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("rowkey", dd.getRowkey());
            doc.addField("did", dd.getDid());
            doc.addField("dvid", dd.getDvid());
            doc.addField("value", dd.getValue());
            doc.addField("timestamp", dd.getTimestamp());
            doc.addField("model", dd.getModel());
            doclist.add(doc);
        }*/
        cloudSolrClient.add(docList);
        cloudSolrClient.commit();
    }

    /**
     * 单条提交
     */
    public void inputDoc(SolrInputDocument doc) throws IOException, SolrServerException {
        if (doc == null) {
            return;
        }
        cloudSolrClient.add(doc);
        cloudSolrClient.commit();
    }

    public void deleteDoc(List<String> rowkeys) throws IOException, SolrServerException {
        if (rowkeys == null || rowkeys.size() == 0) {
            return;
        }
        cloudSolrClient.deleteById(rowkeys);
        cloudSolrClient.commit();
    }

    public void deleteDoc(String rowkey) throws IOException, SolrServerException {
        cloudSolrClient.deleteById(rowkey);
        cloudSolrClient.commit();
    }

    /**
     * 添加记录到cache,如果cache达到maxCacheCount,则提交
     */
    public static void addDocToCache(SolrInputDocument doc) {
        commitLock.lock();
        try {
            cache.add(doc);
            logger.info("cache commit maxCacheCount:" + maxCacheCount);
            logger.info("cache size:" + cache.size());
            if (cache.size() >= maxCacheCount) { //cache满则提交
                logger.info("cache commit, count:" + cache.size());
                new SolrWriter().inputDoc(cache);
                cache.clear();
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * 提交定时器
     */
    static class CommitTimer extends TimerTask {
        @Override
        public void run() {
            commitLock.lock();
            try {
                if (cache.size() > 0) { //cache中有数据则提交
                    logger.info("timer commit, count:" + cache.size());
                    new SolrWriter().inputDoc(cache);
                    cache.clear();
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                commitLock.unlock();
            }
        }
    }

}

2. 打成jar包上传到hadoop中

目录为hdfs:///lib/coprocessor-solr-1.0-SNAPSHOT.jar

进入hadoop bin目录
创建lib目录
hadoop fs -mkdir /lib
上传文件
hadoop fs -put coprocessor-solr-1.0-SNAPSHOT.jar /lib
查看是否已存在
hadoop fs -lsr /lib

3. hbase shell中添加coprocessor

对表增加coprocessor

disable ‘dev_values‘
alter ‘dev_values‘, METHOD => ‘table_att‘, ‘coprocessor‘=>‘hdfs:///lib/coprocessor-solr-1.0-SNAPSHOT.jar|cn.ac.ict.solr.server.SolrIndexCoprocessorObserver|1001|‘
enable ‘dev_values‘

查看是否已添加成功
describe ‘dev_values‘

 ‘dev_values‘, {TABLE_ATTRIBUTES => {coprocessor$2 => ‘hdfs:///lib/coprocessor-solr-1.0-SNAPSHOT.jar|cn.ac.ict.solr.server. true
 SolrIndexCoprocessorObserver|1001|‘}, {NAME => ‘values‘, DATA_BLOCK_ENCODING => ‘NONE‘, BLOOMFILTER => ‘ROW‘, REPLICATION_
 SCOPE => ‘0‘, VERSIONS => ‘1‘, COMPRESSION => ‘NONE‘, MIN_VERSIONS => ‘0‘, TTL => ‘FOREVER‘, KEEP_DELETED_CELLS => ‘false‘
 , BLOCKSIZE => ‘65536‘, IN_MEMORY => ‘false‘, BLOCKCACHE => ‘true‘}  

4. 大功告成,进行测试

向hbase中插入一条数据,查看日志是否有记录,solr中查看数据是否已存在

时间: 2024-10-11 11:57:26

使用Coprocessor实现hbase+solr数据交互的相关文章

HBASE+Solr实现详单查询--转

原文地址:https://mp.weixin.qq.com/s?srcid=0831kfMZgtx1sQbzulgeIETs&scene=23&mid=2663994161&sn=cee222a8534cbc6e28c401706e979dc0&idx=1&__biz=MzA3ODUxMzQxMA%3D%3D&chksm=847c675cb30bee4a5c4e9a03a41662ba6f312d4ba28407311a80c4a36f3f93a4bb624

AJAX+REA实现前后台数据交互的加密解密

AJAX+REA实现前后台数据交互的加密解密 1.创建js文件Encryption.js /**  * 加密解密  */ /** RSA加密用 生成key */ function bodyRSA(){ /** 1024位的key参数写130,2014位的key参数写260 */ setMaxDigits(130); /** ajax 调用后台方法,取回公钥 */ var keyR ;     $.ajax({      url: "/GHGL/Key/pk",//请求后台的url,本例

浅谈混合开发与Android,JS数据交互

本文是作者原创,如转载请注明出处! 一.概论 现在时代已经走过了移动互联网的超级火爆阶段,市场上移动开发人员已经趋于饱和,显然,只会原生APP的开发已不能满足市场的需求,随着H5的兴起与火爆,H5在原生APP中的使用越来越广泛,也就是我们常说的混合开发(Hybrid APP).最新很火的微信小程序相信大家都是知道的,实际上微信小程序加载的界面就是一个HTML5的界面,HTML5界面在一些电商类的APP中主要承担展示数据的作用,但是他的作用并不仅限于此,最起码js调用原生方法和原生调用js的方法是

Struts2基本使用(三)--数据交互

Struts2中的数据交互 在Struts2中我们不必再使用request.getParameter()这种方式来获取前台发送到服务器的参数. 我们可以在服务器端的Java类中直接声明一个和前台发送数据的同名变量即可,然后生成它的set/get方法即可以实现前后台数据的交互. 假如我们在前台页面中的表单如下: <form method="post" action="demo!register.action"> username:<input typ

JavaScript模板引擎实现数据交互

经过1年的磨练,近期终于稍微明白到,前端是怎么做到企业要求的:数据交互. 1,ajax+json这个是必须学的,但没问题,我们可以通过这个博客来慢慢了解怎么回事? 2,可以通过JS框架和JS模板来实现,但最后还是要用到ajax+json的. 注意:个人建议 假如项目页面数量是少于50-100个的,那么推荐使用JS模板:如果大于100个的用JS框架.各有各优势嘛. 今晚的博客分几次写完,看到这句话删除就证明已经写完了. 先分享JS模板的内容:我这次推荐使用百度的模板引擎,因为他比腾讯的art运行速

ASP.NET MVC 4 中的JSON数据交互

前台Ajax请求很多时候需要从后台获取JSON格式数据,一般有以下方式: 拼接字符串 return Content("{\"id\":\"1\",\"name\":\"A\"}"); 为了严格符合Json数据格式,对双引号进行了转义. 使用JavaScriptSerialize.Serialize()方法将对象序列化为JSON格式的字符串 MSDN 例如我们有一个匿名对象: var tempObj=new

集成Nutch/Hbase/Solr构建搜索引擎之二:内容分析

请先参见"集成Nutch/Hbase/Solr构建搜索引擎之一:安装及运行",搭建测试环境 http://blog.csdn.net/jediael_lu/article/details/37329731 一.被索引的域 Schema.xml 在使用solr对Nutch抓取到的网页进行索引时,schema.xml被改变成以下内容. 文件中指定了哪些域被索引.存储等内容. <?xml version="1.0" encoding="UTF-8"

关系型数据库与HBase的数据储存方式区别

如今Bigtable型(列族)数据库应用越来越广,功能也很强大.但是很多人还是把它当做关系型数据库在使用,用原来关系型数据库的思维建表.存储.查询.本文以hbase举例讲述数据模式的变化. 传统关系型数据库(mysql,oracle)数据存储方式主要如下: 图一 上图是个很典型的数据储存方式,我把每条记录分成3部分:主键.记录属性.索引字段.我们会对索引字段建立索引,达到二级索引的效果. 但是随着业务的发展,查询条件越来越复杂,需要更多的索引字段,且很多值都不存在,如下图: 图二 上图是6个索

Spring MVC基础知识整理?View与Controller数据交互

概述 Spring MVC是由View—Controller—Model组成,其中View和Controller的数据交互,成为了关注的核心点.MVC中,我们将View中的数据传递到Controller,可以采用POST或者Get,传递相应的参数.Controller通过绑定来,匹配前台传递的参数.后台Controller也可以将值传递到前台页面. View值传递Controller 绑定传值的常用方式有如下 @RequestParam,绑定单个请求数据,可以是URL中的数据,表单提交的数据或上