java操作elasticsearch

Elasticsearch是一个搜索引擎,建立在Lucene之上

集群 (cluster)

  代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的。  es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体,  你与任何一个节点的通信和与整个es集群通信是等价的。

节点(node)

  每一个运行实例称为一个节点,每一个运行实例既可以在同一机器上,也可以在不同的机器上.所谓运行实例,就是一个服务器进程.  在测试环境内,可以在一台服务器上运行多个服务器进程,在生产环境建议每台服务器运行一个服务器进程

索引(index)

  这里的索引是名词不是动词,在elasticsearch里面支持多个索引。类似于关系数据库里面每一个服务器可以支持多个数据库一样。  在每一索引下面又支持多种类型,类似于关系数据库里面的一个数据库可以有多张表。但是本质上和关系数据库有很大的区别。 

分片(shards) 

  把一个索引分解为多个小的索引,每一个小的索引叫做分片。分片后就可以把各个分片分配到不同的节点中,构成分布式搜索  分片的数量只能在索引创建前指定,并且索引创建后不能更改

副本(replicas)

  副本的作用一是提高系统的容错性,当个某个节点某个分片损坏或丢失时可以从副本中恢复。二是提高es的查询效率,es会自动对搜索请求进行负载均衡

recovery

  代表数据恢复或叫数据重新分布,es在有节点加入或退出时会根据机器的负载对索引分片进行重新分配,挂掉的节点重新启动时也会进行数据恢复。

river

  代表es的一个数据源,也是其它存储方式(如:数据库)同步数据到es的一个方法。它是以插件方式存在的一个es服务,通过读取river中的数据并把它索引到es中,

   官方的river有couchDB的,RabbitMQ的,Twitter的,Wikipedia的,river这个功能将会在后面的文件中重点说到。

gateway

  代表es索引的持久化存储方式,es默认是先把索引存放到内存中,当内存满了时再持久化到硬盘。当这个es集群关闭再重新启动时就会从gateway中读取索引数据。

    es支持多种类型的gateway,有本地文件系统(默认),分布式文件系统,Hadoop的HDFS和amazon的s3云存储服务。

    ---将各种集群状态信息、索引配置信息等全部持久存放在网关中

discovery.zen

  代表es的自动发现节点机制,es是一个基于p2p的系统,它先通过广播寻找存在的节点,再通过多播协议来进行节点之间的通信,同时也支持点对点的交互。

Transport

  代表es内部节点或集群与客户端的交互方式,默认内部是使用tcp协议进行交互,同时它支持http协议(json格式)、thrift、servlet、  memcached、zeroMQ等的传输协议(通过插件方式集成)。

索引(Index)

  ElaticSearch将数据存放在一个或多个索引当中。一个索引相当于一个数据库,里面存放用户文档数据。在底层,ElasticSearch实际上还是
   使用Lucene完成读写数据的操作,ElasticSearch索引是由一个或多个Lucene索引组成,所以ES中的分片或副本实际上就是一个Lucene索引。
 

文档(Document)

  文档是ES中主要的实体,所有ES的查询都是基于存放在ES中文档资源的查询。每个文档都是由各种域(Field)组成,每个域(Field)有一个名  称和一个或多个值构成。实际上,从用户的角度看,一个ES文档就是一个JSON对象。

映射(Mapping)

  映射用于定义文档域的属性,这些属性包括分词器,字段类型,存储类型等。对于没有定义的字段类型的属性,ES可以自动通过其字段值进行识别。

类型(Type)

  ES中每个文档必须有一个类型定义。这里的类型相当于数据库当中的表,类型定义了字段映射(类似数据库表结构),  这样一来,每个索引可以包含多种文档类型,而每种文档类型定义一种映射关系。 

路由(Routing)

  ES给每个文档建索引后,通过路由可以算出所查的文档处在哪个分片上,因为在建立索引之初使用公式:shard = hash(routting) % number_of_pr  imary_shards进行文档分配。routing值是一个任意的字符串,默认是文档的ID,通过人工指定就可以控制文档存放在哪个shard的位置了。

索引别名(Index Alias)

  索引别名相当于快捷方式或软链接,可以指向一个或多个索引,甚至可以指向带路由的分片。

近实时性 near realtime (nrt)

  Elasticsearch是一个近实时性的搜索平台,所以对于刚建过的索引文件进行查询时需要一个轻微的等待时间(通常为1秒)。

java操作elastic:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <runSuite>**/MainTestSuite.class</runSuite>
        <elasticsearch.plugin.name>sql</elasticsearch.plugin.name>
        <elasticsearch.plugin.site>true</elasticsearch.plugin.site>
        <elasticsearch.plugin.jvm>true</elasticsearch.plugin.jvm>
        <elasticsearch.version>5.6.2</elasticsearch.version>
        <elasticsearch.rest.version>5.5.2</elasticsearch.rest.version>
        <slf4j.version>1.7.7</slf4j.version>
        <elasticsearch.plugin.classname>org.elasticsearch.plugin.nlpcn.SqlPlug</elasticsearch.plugin.classname>
    </properties>

    <repositories>
        <repository>
            <id>elasticsearch-releases</id>
            <url>https://artifacts.elastic.co/maven</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>4.0.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>rest</artifactId>
            <version>${elasticsearch.rest.version}</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>x-pack-transport</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>

        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.locationtech.spatial4j</groupId>
            <artifactId>spatial4j</artifactId>
            <version>0.6</version>
        </dependency>

        <dependency>
            <groupId>com.vividsolutions</groupId>
            <artifactId>jts</artifactId>
            <version>1.13</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.41</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.15</version>
        </dependency>

        <!-- LOGGING begin -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- common-logging 实际调用slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- java.util.logging 实际调用slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jul-to-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- LOGGING end -->

        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <version>2.5</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

EsQuery

public class EsQuery {

    protected static Logger logger = LoggerFactory.getLogger(EsQuery.class);

    /**
     * 集群状态
     */
    public void clusterStatus(){
        ClusterAdminClient clusterAdminClient = ElasticUtil.getClusterClient().admin().cluster();
        ClusterHealthResponse healths = clusterAdminClient.prepareHealth().get();
        String clusterName = healths.getClusterName();
        int numberOfDataNodes = healths.getNumberOfDataNodes();
        int numberOfNodes = healths.getNumberOfNodes();
        ClusterHealthStatus status = healths.getStatus();
        System.out.println("集群名称:"+clusterName);
        System.out.println("数据节点:"+numberOfDataNodes);
        System.out.println("正常节点:"+numberOfNodes);
        System.out.println("状态值:"+status.name());
    }

    /**
     * 判断索引库是否存在
     * @param indexName
     * @return
     */
    public boolean isIndexExists(String indexName) {
        IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(indexName);
        IndicesExistsResponse inExistsResponse = ElasticUtil.getClusterClient().admin().indices()
                .exists(inExistsRequest).actionGet();
        return inExistsResponse.isExists();
    }

    /**
     * 创建索引 indexName 索引名称
     * @param indexName
     * @return
     * @throws IOException
     */
    public boolean createIndex(String indexName){
        if(isIndexExists(indexName)){
            return false;
        }
        CreateIndexResponse response = ElasticUtil.getClusterClient().admin().indices().prepareCreate(indexName).execute().actionGet();
        if(response.isAcknowledged()){
            return true;
        }
        return false;
    }

    /**
     * 删除索引库
     * @param indexName
     * @return
     */
    public boolean dropIndex(String indexName) {
        if (!isIndexExists(indexName)) {
            return false;
        } else {
            DeleteIndexResponse dResponse = ElasticUtil.getClusterClient().admin().indices().prepareDelete(indexName).execute().actionGet();
            if (dResponse.isAcknowledged()) {
                return true;
            }else{
                return false;
            }
        }
    }

    /**
     * 采用standard分词器-默认*/
    public boolean addType(String indexName,String typeName){
        XContentBuilder builder=null;
        try {
            builder = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject(typeName)
                    .endObject()
                    .endObject();
        } catch (IOException e) {
            e.printStackTrace();
        }
        PutMappingRequest mappingRequest = Requests.putMappingRequest(indexName).type(typeName).source(builder);
        try {
            PutMappingResponse mappingResponse = ElasticUtil.getClusterClient().admin().indices().putMapping(mappingRequest).actionGet();
            if (mappingResponse.isAcknowledged()) {
                return true;
            }else{
                return false;
            }
        } catch (IndexNotFoundException e) {
            System.out.println("索引不存在,创建失败...");
        }
        return false;
    }

    /**
     * 采用IK分词器
     */
    public boolean addIKType(String indexName,String typeName){
        XContentBuilder builder=null;
        try {
            builder = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject(typeName)
                    .startObject("properties")
                    .startObject("poi_id").field("type","integer").endObject()
                    .startObject("poi_title").field("type","text").field("analyzer","ik_max_word").endObject()
                    .startObject("poi_address").field("type","text").field("analyzer","ik_max_word").endObject()
                    .startObject("poi_tags").field("type","text").field("analyzer","ik_max_word").endObject()
                    .startObject("poi_phone").field("type","text").endObject()
                    .endObject()
                    .endObject()
                    .endObject();
        } catch (IOException e) {
            e.printStackTrace();
        }
        PutMappingRequest mappingRequest = Requests.putMappingRequest(indexName).type(typeName).source(builder);
        try {
            PutMappingResponse mappingResponse = ElasticUtil.getClusterClient().admin().indices().putMapping(mappingRequest).actionGet();
            if (mappingResponse.isAcknowledged()) {
                return true;
            }else{
                return false;
            }
        } catch (IndexNotFoundException e) {
            System.out.println("索引不存在,创建失败...");
        }
        return false;
    }

    /**
     * 添加或修改数据  设置自己的id
     * @param id
     * @param json
     */
    public static String insertOrUpdate(String indexName,String typeName,String id,Map<String, Object> json) {
        if(json==null){
            return null;
        }
        IndexResponse response = ElasticUtil.getClusterClient().prepareIndex(indexName, typeName,id).setSource(json).execute().actionGet();
        return response.getId();
    }

    /**
     * 添加或修改数据  使用随机id
     * @param json
     */
    public String insertOrUpdate(String indexName,String typeName,Map<String, Object> json) {
        if(json==null){
            return null;
        }
        IndexResponse response = ElasticUtil.getClusterClient().prepareIndex(indexName, typeName).setSource(json).execute().actionGet();
        return response.getId();
    }

    /**
     * 通过id查询单条数据
     * @param id
     * @return
     */
    public GetResponse getResourceById(String indexName,String typeName,String id){
        GetResponse response = ElasticUtil.getClusterClient().prepareGet(indexName,typeName, id).get();
        //Map<String, Object> source = response.getSource();
        return response;
    }

    /**
     * 删除数据
     * @param id
     */
    public void deleteResourceByIds(String indexName,String typeName,String[] ids) {
        if(ids==null||ids.length<1){
            return;
        }
        for(String id :ids){
            ElasticUtil.getClusterClient().prepareDelete(indexName, typeName, id)
            .execute().actionGet();
            System.out.println("删除id: "+id);
        }
        System.out.println("delete over..");
    }

    /**
     * 查询 index/type 数据
     * @param indexName
     */
    public static void simpleQuery(String indexName,String typeName){
        SearchRequestBuilder prepareSearch = ElasticUtil.getClusterClient().prepareSearch(indexName);
        if(typeName!=null && !"".equals(typeName.trim())){
            prepareSearch.setTypes(typeName);
        }
        //        prepareSearch.setFrom(1).setSize(10);
        SearchResponse response = prepareSearch.execute().actionGet();
        SearchHit[] searchHits = response.getHits().getHits();
        for (SearchHit sh : searchHits) {
            System.out.println(sh.getId()+"==>"+sh.getSource());
        }
        System.out.println("total==> "+searchHits.length);
    }

    /**
     * 通过field字段过滤索引库
     * @param indexName
     * @param typeName
     * @param field
     * @param value
     */
    public void matchFieldQuery(String indexName,String typeName,String field,String value){
        QueryBuilder qb = QueryBuilders.matchQuery(field,value);
        SearchRequestBuilder requestBuilder = ElasticUtil.getClusterClient().prepareSearch(indexName);
        if(typeName!=null&&!"".equals(typeName.trim())){
            requestBuilder.setTypes(typeName);
        }
        SearchResponse response = requestBuilder.setQuery(qb).execute().actionGet();
        SearchHit[] searchHits = response.getHits().getHits();
        for (SearchHit sh : searchHits) {
            System.out.println(sh.getId()+"==>"+sh.getSource());
        }
        System.out.println("total==> "+searchHits.length);
    }

    /**
     * 通过多个field字段过滤索引库
     * @param indexName
     * @param typeName
     * @param field1
     * @param field2
     * @param value
     */
    public void multiFieldMatchQuery(String indexName,String typeName,String field1,String field2,String value){
        QueryBuilder qb = QueryBuilders.multiMatchQuery(value,field1, field2);
        SearchRequestBuilder requestBuilder = ElasticUtil.getClusterClient().prepareSearch(indexName);
        if(typeName!=null&&!"".equals(typeName.trim())){
            requestBuilder.setTypes(typeName);
        }
        SearchResponse response = requestBuilder.setQuery(qb) .execute().actionGet();
        SearchHit[] searchHits = response.getHits().getHits();
        for (SearchHit sh : searchHits) {
            System.out.println(sh.getId()+"==>"+sh.getSource());
        }
    }

    /**
     * 通过id 获取多条数据*/
    public void idsQuery(String indexName,String typeName,String[] ids){
        IdsQueryBuilder qb = QueryBuilders.idsQuery().addIds(ids);
        SearchRequestBuilder requestBuilder = ElasticUtil.getClusterClient().prepareSearch(indexName);
        if(typeName!=null&&!"".equals(typeName.trim())){
            requestBuilder.setTypes(typeName);
        }
        SearchResponse response = requestBuilder.setQuery(qb).execute().actionGet();
        SearchHit[] searchHits = response.getHits().getHits();
        for (SearchHit sh : searchHits) {
            System.out.println(sh.getId()+"==>"+sh.getSource());
        }
        System.out.println("total==> "+searchHits.length);
    }

}

原文地址:https://www.cnblogs.com/weishao-lsv/p/8168188.html

时间: 2025-01-10 18:35:06

java操作elasticsearch的相关文章

java操作elasticsearch实现批量添加数据(bulk)

java操作elasticsearch实现批量添加主要使用了bulk 代码如下: //bulk批量操作(批量添加) @Test public void test7() throws IOException { //1.指定es集群 cluster.name 是固定的key值,my-application是ES集群的名称 Settings settings = Settings.builder().put("cluster.name", "my-application"

java操作elasticsearch实现基本的增删改查操作

一.在进行java操作elasticsearch之前,请确认好集群的名称及对应的ES节点ip和端口 1.查看ES的集群名称 #进入elasticsearch.yml配置文件/opt/elasticsearch-6.4.3/config vim elasticsearch.yml 2.查询ip 二.根据文档id查询数据 /** * */ package com.cyb.test; import java.net.InetAddress; import java.net.UnknownHostExc

java 操作elasticsearch之搭建测试项目环境

在创建项目之前请确认maven是否安装好,在此我是以环境都搭建好的情况下进行示范,现在以eclipse开发工具为例,具体操作如下: 1.创建maven项目 File - new -other 2.在pom文件中导入对应的jar包坐标 在此我没有添加log4j等坐标,需要的根据自己需要进行添加jar包坐标 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001

使用JAVA操作ElasticSearch(Java API 和Spring Data ElasticSearch)

Java API 我的ElasticSearch集群的版本是6.2.4,导入elasticsearch相关的maven依赖也是6.2.4,不同版本的api可能会有差异 一:maven依赖 <!--elasticsearch核心依赖--> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version

elasticsearch java操作 api

默认进行了elasticsearch安装和ik安装, 超时配置, 分页压力配置等 添加maven依赖 <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>2.4.0</version> </dependency> <dependency> <group

使用python操作elasticsearch实现数据插入分析

前言: 例行公事,有些人可能不太了解elasticsearch,下面搜了一段,大家瞅一眼. Elasticsearch是一款分布式搜索引擎,支持在大数据环境中进行实时数据分析.它基于Apache Lucene文本搜索引擎,内部功能通过ReST API暴露给外部.除了通过HTTP直接访问Elasticsearch,还可以通过支持Java.JavaScript.Python及更多语言的客户端库来访问.它也支持集成Apache Hadoop环境.Elasticsearch在有些处理海量数据的公司中已经

java使用elasticsearch分组进行聚合查询(group by)

java连接elasticsearch 进行聚合查询进行相应操作 一:对单个字段进行分组求和 1.表结构图片: 根据任务id分组,分别统计出每个任务id下有多少个文字标题 1.SQL:select id, count(*) as sum from task group by taskid;   java ES连接工具类 public class ESClientConnectionUtil { public static TransportClient client=null; public f

java 操作 Excel,java导出excel

WritableWorkbook out = null; try { response.getServletResponse().reset(); ((HttpServletResponse) response.getServletResponse()).setHeader("Content-Disposition", "attachment;filename=export.xls"); response.getServletResponse().setConten

java操作hbase例子

hbase安装方法请参考:hbase-0.94安装方法详解 hbase常用的shell命令请参考:hbase常用的shell命令例子 java操作hbase,在eclipse中创建一个java项目,将hbase安装文件根目录的jar包和lib目录下jar包导入项目,然后就可以编写java代码操作hbase了.下面代码给出来一个简单的示例 /** * @date 2015-07-23 21:28:10 * @author sgl */ package com.songguoliang.hbase;