elasticsearch持有者类

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * <p></p>
 *
 * @author
 * @version V1.0
 * @modificationHistory=========================逻辑或功能性重大变更记录
 * @modify by user: $author$ $date$
 * @modify by reason: {方法名}:{原因}
 */
public class ESHolder implements Serializable,Closeable{
    private static final Logger LOG = LoggerFactory.getLogger(ESHolder.class);

    private String esClusterName = null;
    private String esClusterAddress = null;
    // ES客户端
    private Client ESClient = null;

    public ESHolder(String esClusterName, String esClusterAddress) {
        this.esClusterName = esClusterName;
        this.esClusterAddress = esClusterAddress;
    }

    public Client getESClient() {
        if (ESClient == null) {
            initESClient(esClusterName, esClusterAddress);
        }
        return ESClient;
    }

    /**
     * 批量建立ES索引
     *
     * @param list
     * @return
     * @author
     */
    public boolean addIndex(String indexName, String typeName, List<Map<String, Object>> list) {
        long t = System.currentTimeMillis();
        try {
            ObjectMapper mapper = new ObjectMapper();
            BulkRequestBuilder bulkRequest = getESClient().prepareBulk();
            for(Map<String, Object> data : list){
                byte[] json = mapper.writeValueAsBytes(data);
                bulkRequest.add(new IndexRequest(indexName, typeName).source(json));
            }

            BulkResponse response = bulkRequest.execute().actionGet();
            if(response.hasFailures()){
                BulkItemResponse[] itemResponses = response.getItems();
                for(BulkItemResponse itemResponse : itemResponses){
                    // TODO Must do something to handle failures.
                    LOG.error("Add ES Index failed! DOC_ID: {}, Reason: {}", itemResponse.getId(), itemResponse.getFailureMessage());
                }
            }
        } catch (JsonProcessingException e) {
            LOG.error("Build index fail.", e);
            return false;
        }
        LOG.debug("build index complete,num:{}, cost:{}", list.size(), System.currentTimeMillis() - t);
        return true;
    }

    /**
     * 批量删除ES索引
     *
     * @param docIds
     *
     *
     */
    public void deleteIndex(String indexName, String typeName, List<String> docIds){
        BulkRequestBuilder bulkRequest = getESClient().prepareBulk();
        for(String docId : docIds){
            bulkRequest.add(new DeleteRequest(indexName, typeName, docId));
        }
        BulkResponse response = bulkRequest.execute().actionGet();
        if(response.hasFailures()){
            BulkItemResponse[] itemResponses = response.getItems();
            for(BulkItemResponse itemResponse : itemResponses){
                // TODO Must do something to handle failures.
                LOG.error("ES Index delete failed! DOC_ID: {}, Reason: {}", itemResponse.getId(), itemResponse.getFailureMessage());
            }
        }
    }

    /**
     * 删除ES索引
     *
     * @param indexName
     * @param typeName
     * @param data
     * @return
     */
    public boolean deleteIndex(String indexName, String typeName, Map<String, Object> data){
        DeleteRequestBuilder requestBuilder = getESClient().prepareDelete(indexName, typeName,
                (String) data.get(“rowkey”));
        DeleteResponse response = requestBuilder.execute().actionGet();
        if(!response.isFound()){
            LOG.error("ES Index not found! DOC_ID: {}", response.getId());
            return false;
        }
        return true;
    }

    /**
     * 从ES查询数据
     *
     * @param query
     * @return
     *
     */
    public SearchHits queryWithES(SearchRequestBuilder query){
        SearchHits response = query.execute().actionGet().getHits();
        return response;
    }

    /**
     * 构造查询对象
     *
     * @param index
     * @param type
     * @param queryBuilder
     * @param retField
     * @param sortField
     * @param start
     * @param rows
     * @return
     */
    public SearchRequestBuilder buildSearch(String index, String type, QueryBuilder queryBuilder, String retField, String sortField, SortOrder sortOrder, int start, int rows){

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(queryBuilder).from(start).size(rows);

        if(StringUtils.isNotEmpty(retField)){
            searchSourceBuilder.field(retField);
        }

        if(StringUtils.isNotEmpty(sortField)){
            searchSourceBuilder.sort(sortField, sortOrder);
        }

        LOG.debug("ES Query string: " + searchSourceBuilder.toString());

        return getESClient().prepareSearch().setIndices(index).setTypes(type)
                .setExtraSource(searchSourceBuilder.buildAsBytes(Requests.CONTENT_TYPE));
    }

    /**
     * 统计数据量
     *
     * @return 符合条件的数据量
     */
    public long countWithQuery(String indexName, String typeName, QueryBuilder queryBuilder){
        SearchRequestBuilder builder = getESClient().prepareSearch(indexName).setTypes(typeName)
                .setQuery(queryBuilder).setFrom(0).setSize(0);
        return countWithQuery(builder);
    }

    /**
     * 统计数据量
     *
     * @param query
     * @return
     *
     */
    public long countWithQuery(SearchRequestBuilder query){
        return query.execute().actionGet().getHits().getTotalHits();
    }

    /**
     * 初始化ES客户端
     *
     * @return
     */
    private void initESClient(String esClusterName, String esClusterAddress) {
        int esClientTimeout = 180000;
        LOG.info("init ES Client...");
        try {
            String[] hostPair = esClusterAddress.split(“,”);
            TransportAddress[] addrs = new TransportAddress[hostPair.length];

            int i = 0;
            String[] keyValuePair;
            for (String t : hostPair) {
                keyValuePair = t.split(":");
                if (2 != keyValuePair.length) {
                    throw new IOException("ES‘s host is not correct:" + Arrays.toString(keyValuePair));
                }
                addrs[i] = new InetSocketTransportAddress(InetAddress.getByName(keyValuePair[0]), Integer.valueOf(keyValuePair[1]));
                i++;
            }

            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", esClusterName)
                    .put("client.transport.sniff", true)
                    .put("client.transport.ping_timeout", esClientTimeout + "s").build();

            ESClient = TransportClient.builder().settings(settings).build().addTransportAddresses(addrs);
        } catch (Exception e) {
            LOG.error("Address error!", e);
        }
    }

    @Override
    public void close() throws IOException {
        if(this.ESClient != null){
            LOG.info("closing esclient....");
            this.ESClient.close();
            this.ESClient = null;
        }
    }
}
时间: 2025-01-18 00:58:08

elasticsearch持有者类的相关文章

zookeeper持有者类

1 import org.apache.curator.RetryPolicy; 2 import org.apache.curator.framework.CuratorFramework; 3 import org.apache.curator.framework.CuratorFrameworkFactory; 4 import org.apache.curator.framework.imps.CuratorFrameworkState; 5 import org.apache.cura

分布式搜索引擎Elasticsearch PHP类封装 使用原生api

<?php class ElasticSearch { public $index; function __construct($server = 'http://localhost:9200'){ $this->server = $server; } function call($path, $http = array()){ if (!$this->index) throw new Exception('$this->index needs a value'); return 

第三百七十节,Python分布式爬虫打造搜索引擎Scrapy精讲—elasticsearch(搜索引擎)用Django实现搜索结果分页

第三百七十节,Python分布式爬虫打造搜索引擎Scrapy精讲-elasticsearch(搜索引擎)用Django实现搜索结果分页 逻辑处理函数 计算搜索耗时 在开始搜索前:start_time = datetime.now()获取当前时间 在搜索结束后:end_time = datetime.now()获取当前时间 last_time = (end_time-start_time).total_seconds()结束时间减去开始时间等于用时,转换成秒 from django.shortcu

ElasticSearch的ik分词插件开发

摘要 本文主要介绍如何开发ElasticSearch的ik分词插件.很多时候,网上开源的分词插件不能满足业务需求,只能自己定义开发一套ik分词,let's go! ik插件,说白了,就是通过封装ik分词器,与ElasticSearch对接,让ElasticSearch能够驱动该分词器.那么,具体怎么与ElasticSearch对接呢?从下往上走,总共3步: 一.封装IK分析器 与ElasticSearch集成,分词器的配置均从ElasticSearch的配置文件读取,因此,需要重载IKAnaly

elasticsearch 中文API 索引(三)

索引API 索引API允许开发者索引类型化的JSON文档到一个特定的索引,使其可以被搜索. 生成JSON文档 有几种不同的方式生成JSON文档 利用byte[]或者作为一个String手动生成 利用一个Map将其自动转换为相应的JSON 利用第三方库如Jackson去序列化你的bean 利用内置的帮助函数XContentFactory.jsonBuilder() 手动生成 需要注意的是,要通过Date Format编码日期. String json = "{" + "\&qu

ElasticSearch Java Api -创建索引

ElasticSearch JAVA API官网文档:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-index.html 一.生成JSON 创建索引的第一步是要把对象转换为JSON字符串.官网给出了四种创建JSON文档的方法: 1.1手写方式生成 String json = "{" + "\"user\":\"kimchy\"

ElasticSearch安装和使用

ElasticSearch是开源搜索平台的新成员,实时数据分析的神器.可以理解为作为搜索的数据库,可以提供搜索功能.对比关系型数据库,具有以下的相似关系: 关系型数据库 数据库 表 行 列 ElasticSearch 索引 类型 文档 字段 一个ES集群可以包含多个索引(数据库),每个索引又包含了很多类型(表),类型中包含了很多文档(行),每个文档又包含了很多字段(列). 如果要实现对关系型数据库数据的搜索功能,需要将关系型数据库中的数据导入到ElasticSearch中,网上有解决方案.但是好

Elasticsearch 的安装和基本使用

Elasticsearch 是一个搜索服务器,特点:分布式.易于扩展.全文检索.索引速度快. 本篇文章主要介绍 Elasticsearch 的安装和基本使用,假定你有一定的Linux基础(所有命令均在命令行中执行). Elasticsearch 版本:2.2.0 csdn下载 服务器:CentOS 6.4 (win7 下的虚拟机) 一.安装 因为 Elasticsearch 是 Java 开发的,所以要先安装 Java(下载) 可用 java -version来查 看是否已安装Java 若没有安

四十九 Python分布式爬虫打造搜索引擎Scrapy精讲—elasticsearch(搜索引擎)用Django实现搜索结果分页

逻辑处理函数 计算搜索耗时 在开始搜索前:start_time = datetime.now()获取当前时间 在搜索结束后:end_time = datetime.now()获取当前时间 last_time = (end_time-start_time).total_seconds()结束时间减去开始时间等于用时,转换成秒 from django.shortcuts import render # Create your views here. from django.shortcuts imp