使用solrj进行DIH操作

背景说明:在一个项目中需要将Mongodb中的数据导入到solr中完成搜索。在solr中Mysql数据库有对应的DIH包,可以通过配置sql语句完成数据的导入。Mongodb下也有开源的工具用来实现数据的导入。看了下文档,感觉这个工具对数据的定制性不高,并且是python的,不是很满足项目需求。最后决定使用solrj来完成数据的导入。

一、 遇到的问题

1. 全量数据很大,在全量或者增量时无法一次性将数据全部获取: 对数据进行分页获取。(关于两种分页获取数据的性能问题,后面会单独介绍)

2. 全量在更新数据时,需要将之前的老数据clean掉,增量则不需要: clean其实就是删除所有数据。

3. 由于使用了分页获取数据,全量的clean操作必须是在全量开始之前完成,并且为了保证在做全量过程中,之前的老数据不会丢失,删除全部数据的操作对应的commit==false,且在整个全量过程中commit==false, 在最后完成全量后,再进行commit。

4.增量操作和全量操作是通过配置不同的trigger完成的,比如增量每隔五分钟执行一次,全量则一天执行一次。如果没有对任务进行控制,可能会造成 全量和增量同时在做。刚才说了,全量操作的整个过程,commit==false, 所以对增量和全量的任务必须加互斥锁。

二、相关的实现

package com.meizu.galaxy2.solr;

import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;

/**
 * Created by ltao on 2015-7-16.
 */
public class CloudSolrDIHClient {

    private static final Logger logger = Logger.getLogger(CloudSolrDIHClient.class);
    private static final String ID_FILED_NAME = "id";
    private CloudSolrClient solrClient;

    private static final int BATCH_SIZE=500;

    private String defaultCollection;

    public CloudSolrDIHClient(String zkHost, String zkNodePath, int zkClientTimeout, int zkConnectTimeout, String defaultCollection) {

        if (!zkHost.startsWith("zookeeper")) {
            logger.error("zk host must start with zookeeper://");
            return;
        } else {

            String hosts = zkHost.substring(12);
            hosts = hosts + zkNodePath;
            solrClient = new org.apache.solr.client.solrj.impl.CloudSolrClient(hosts);
            solrClient.setZkClientTimeout(zkClientTimeout);
            solrClient.setZkConnectTimeout(zkConnectTimeout);
            this.defaultCollection = defaultCollection;
        }
    }

    public void connect() throws Exception {
        solrClient.connect();
    }

    public void addDoc(SolrInputDocument doc) {
        if (this.defaultCollection != null) {
            this.addDoc(defaultCollection, doc);
        } else {
            logger.error("default collection should not be null");
        }

    }

    public void addDoc(String collection, SolrInputDocument doc) {
        try {
            solrClient.add(collection, doc);
        } catch (Exception e) {
            logger.error("add Doc occurs an error,collection:" + collection + ",doc_id:" + doc.getFieldValue(ID_FILED_NAME), e);
        }
    }

    public void addDocs(List<SolrInputDocument> docs) {
        if (this.defaultCollection != null) {
            this.addDocs(defaultCollection, docs);
        } else {
            logger.error("default collection should not be null");
        }
    }

    public void addDocs(String collection, List<SolrInputDocument> docs) {
        if(docs!=null && docs.size()>0) {
            int size=docs.size();
            if(size<=BATCH_SIZE) {
                try {
                    solrClient.add(collection, docs);
                } catch (Exception e) {
                    logger.error("add Docs occurs an error,collection:" + collection, e);
                }
            }
            else
            {
                int end=size>BATCH_SIZE? BATCH_SIZE:size;
                int start=0;
                while(true)
                {
                    List<SolrInputDocument> subList=docs.subList(start,end);
                    try {
                        solrClient.add(collection, subList);
                    } catch (Exception e) {
                        logger.error("add Docs occurs an error,collection:" + collection, e);
                    }
                    if(end==size)
                    {
                        break;
                    }
                    start=start+BATCH_SIZE;
                    end=(end+BATCH_SIZE);
                   if(end>size)
                   {
                       end=size;
                   }
                }

            }
        }

    }

    public void deleteDocByIds(List<String> ids) {
        if (this.defaultCollection != null) {
            this.deleteDocByIds(defaultCollection, ids);
        }
    }

    public void deleteDocByIds(String collection, List<String> ids) {

        try {
            solrClient.deleteById(collection,ids);
        } catch (Exception e) {
            logger.error("delete Docs occurs an error,collection:" + collection ,e);
        }

    }

    public void deleteDocById(String collection, String id) {
        try {
            solrClient.deleteById(collection, id);
        } catch (Exception e) {
            logger.error("delete Doc occurs an error,collection:" + collection + ",doc_id:" + id, e);
        }
    }

    public void deleteDocById(String id) {
        if (this.defaultCollection != null) {
            this.deleteDocById(defaultCollection, id);

        } else {
            logger.error("default collection should not be null");
        }
    }

    public void addBean(String collection, Object obj) {
        try {
            solrClient.addBean(collection, obj);
        } catch (Exception e) {
            String id = null;
            try {
                Field idFiled = obj.getClass().getDeclaredField(ID_FILED_NAME);
                if (idFiled != null) {
                    idFiled.setAccessible(true);
                    Object idFiledValue = idFiled.get(obj);
                    id = idFiledValue != null ? idFiledValue.toString() : "";
                }
            } catch (Exception e1) {
                logger.error("get id field occurs an error", e1);
            }
            logger.error("add bean occurs an error,collection:" + collection + ",bean_id:" + id, e);
        }
    }

    public void addBean(Object obj) throws SolrServerException, IOException {
        if (this.defaultCollection != null) {
            this.addBean(defaultCollection, obj);
        } else {
            logger.error("default collection should not be null");
        }

    }

    public void addBeans(List<Object> objs) throws SolrServerException, IOException {
        if (this.defaultCollection != null) {
            this.addBean(defaultCollection, objs);
        } else {
            logger.error("default collection should not be null");
        }
    }

    public void addBeans(String collection, List<Object> objs) {
        if(objs!=null && objs.size()>0) {
            int size=objs.size();
            if(size<=BATCH_SIZE) {
                try {
                    solrClient.addBeans(collection, objs);
                } catch (Exception e) {
                    logger.error("addBeans occurs an error,collection:" + collection, e);
                }
            }
            else
            {
                int end=size>BATCH_SIZE? BATCH_SIZE:size;
                int start=0;
                while(true)
                {
                    List<Object> subList=objs.subList(start,end);
                    try {
                        solrClient.addBeans(collection, subList);
                    } catch (Exception e) {
                        logger.error("addBeans occurs an error,collection:" + collection, e);
                    }
                    if(end==size)
                    {
                        break;
                    }
                    start=start+BATCH_SIZE;
                    end=(end+BATCH_SIZE);
                    if(end>size)
                    {
                        end=size;
                    }
                }

            }
        }
    }

    public void commit() throws SolrServerException, IOException {
        this.commit(defaultCollection);
    }

    public void commit(String collection) throws SolrServerException, IOException {
        solrClient.commit(collection);
    }

    public void clean(Boolean clean) throws SolrServerException, IOException {
        this.clean(defaultCollection, clean);
    }

    public void clean(String collection, Boolean clean) throws SolrServerException, IOException {
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.setParam("stream.body", "<delete><query>*:*</query></delete>");
        updateRequest.setParam("commit", Boolean.toString(clean));
        solrClient.request(updateRequest, collection);
    }

    public void close() throws IOException {
        if (solrClient != null) {
            solrClient.close();
        }
    }

}

上面是对solrclient的简单封装:增加、删除、clean

全量和增量进行加锁互斥

  private static Lock lock = new ReentrantLock();

    public void importDelta() {
        boolean hasGetLock = false;
        try {
            hasGetLock = lock.tryLock();
            if (hasGetLock) {
                logger.info("start import delta hotel data ");
                long start = System.currentTimeMillis();
                hotelService.importDelta();
                long end = System.currentTimeMillis();
                logger.info("finish import delta hotel data ,spend " + (end - start) + " ms");
            }
        } finally {
            if (hasGetLock) {
                lock.unlock();
            }
        }

    }

    public void importAll() {

        try {
            lock.lock();
            logger.info("start import all hotel data ");
            long start = System.currentTimeMillis();
            hotelService.importAll();
            long end = System.currentTimeMillis();
            logger.info("finish import all hotel data ,spend " + (end - start) + " ms");
        } finally {
            lock.unlock();
        }
    }

    public DataImportService getHotelService() {
        return hotelService;
    }

这里用了Lock的tryLock,tryLock()会尝试获取锁,如果当前锁已被使用,则放弃该次获取锁操作。lock()则会阻塞,直到获取到锁。这样可以较大概率的保证全量一定能够执行。(如果增量一直都在运行,可能会造成全量一直阻塞,在实际运行中不会遇到这种情况;或者在某种机缘巧合下,增量一个接一个的获取到了锁,全量则一直阻塞,个人觉得应该可以使用公平锁解决刚才的这个问题,不过其实没必要)。

时间: 2024-11-05 19:25:24

使用solrj进行DIH操作的相关文章

Solr Centos6.5下搭建solr-7.7.2集群solrcloud+DIH操作

上一篇介绍了单机版的搭建,现在来介绍集群版的搭建 什么是SolrCloud SolrCloud(solr 云)是Solr提供的分布式搜索方案,当你需要大规模,容错,分布式索引和检索能力时使用 SolrCloud.当一个系统的索引数据量少的时候是不需要使用SolrCloud的,当索引量很大,搜索请求并发很高,这时需要使用SolrCloud满足这些需求. SolrCloud不同于redis集群自带集群,SolrCloud是基于Solr和Zookeeper的分布式搜索方案,它的主要思想是使用Zooke

solr的DIH操作同步mysql数据

1.创建MySQL数据 CREATE TABLE `city` ( `id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '城市编号', `province_id` INT(10) UNSIGNED NOT NULL COMMENT '省份编号', `city_name` VARCHAR(25) NULL DEFAULT NULL COMMENT '城市名称', `description` VARCHAR(25) NULL DEFAULT N

HBase视频教程

基于微博数据应用的HBase实战开发 课程观看地址:http://www.xuetuwuyou.com/course/150 课程出自学途无忧网:http://www.xuetuwuyou.com 一.课程用到的软件 1.centos6.7 2.apache-tomcat-7.0.47 3.solr-5.5 4.zookeeper 3.4.6 5.eclipse-jee-neon-R-win32-x86_64  6.jdk1.7_49 7.HBase1.2.2 8.Ganglia3.7.2 9.

基于微博数据应用的HBase实战开发_HBase视频教程

基于微博数据应用的HBase实战开发 课程观看地址:http://www.xuetuwuyou.com/course/150 课程出自学途无忧网:http://www.xuetuwuyou.com 一.课程用到的软件 1.centos6.7 2.apache-tomcat-7.0.47 3.solr-5.5 4.zookeeper 3.4.6 5.eclipse-jee-neon-R-win32-x86_64  6.jdk1.7_49 7.HBase1.2.2 8.Ganglia3.7.2 9.

Solr DataImportHandler 配置

DIH主要用于从数据库抓取数据并创建索引.另外还能够从HTTP(RSS.ATOM)拉数据. 相关概念: Datasource:数据源,包含获取数据必需的信息:数据位置(url).数据库driver.登录账号和password Entity:相当于数据库的一个视图,能够从一个表或联表查询获得 Processor:数据处理器,负责从数据源中获取数据.处理.然后增加到索引中 Transformer:数据转换器,可选,负责改动数据.创建新的field.或依据须要把一条记录变成多条记录 首先.链接数据库须

Apache Solr 之 使用SolrJ操作索引库

Solrj是Solr搜索服务器的一个比较基础的客户端工具,可以非常方便地与Solr搜索服务器进行交互.最基本的功能就是管理Solr索引,包括添加.更新.删除和查询等.对于一些比较基础的应用,用Solj基本够用,而且你可以非常容易地通过使用Solrj的API实现与Solr搜索服务器进行交互,实现对Solr的基本管理功能.如果你的应用比较复杂,可以扩展Solrj来满足需要. 使用 SolrJ操作索引库: package com.hcm.solr.test; import java.io.IOExce

solr4使用solrj操作索引库

solr配套有好多的客户端用于操作索引库,下面我们来讲如何用solrj去操作solr索引库. 一.认识solrj solrj是solr的java客户端,用于访问solr索引库.它提供了添加.删除.查询.优化等功能. 二.下载 百度.google以下solrj下载,你会发现根本就没有,那么我们该到哪儿下载呢?其实,它是集成到solr压缩包里的,解压文件后,有个目录/dist/solrj-lib,里面就存放了solrj所用到的jar,你把这些jar都添加到你的classpath就ok. 如果你是使用

使用solrj操作solr索引库

(solrj)初次使用solr的开发人员总是很郁闷,不知道如何去操作solr索引库,以为只能用<五分钟solr4.5教程(搭建.运行)>中讲到的用xml文件的形式提交数据到索引库,其实没有那么麻烦,solr配套有好多的客户端用于操作索引库,下面我们来讲如何用solrj去操作solr索引库. 一.认识solrj solrj是solr的java客户端,用于访问solr索引库.它提供了添加.删除.查询.优化等功能. 二.下载 百度.google以下solrj下载,你会发现根本就没有,那么我们该到哪儿

solr在使用solrj操作中的各个操作大全(在solrcores中测试)

package com.fjsh.SearchJobsFirst; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.GregorianCalendar; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import jav