Using the Java High Level REST Client with Elasticsearch

Overview

Now that the basic concepts and usage of Elasticsearch are clear, let's learn how to use its Java API.
This article assumes you already have a fairly complete understanding of the basic concepts of ES.

Clients

You can do many things with a Java client:

  • Perform standard index, get, delete, update, and search operations.
  • Perform administrative tasks on a running cluster.

However, the official documentation shows that at least three Java clients currently exist:

  1. Transport Client
  2. Java High Level REST Client
  3. Java Low Level REST Client

The reasons for this confusion are:

  • For a long time, ES had no official Java client, and Java could easily call ES's internal API, so the TransportClient was built first. Its drawback is obvious: instead of a RESTful interface, it transfers data in a binary protocol.
  • ES then released the Java Low Level REST Client, which is RESTful and works well. Its drawback is also clear: migrating code from the TransportClient to the Low Level REST Client takes considerable effort, and the official documentation provides a whole set of migration guides for reference.
  • Now ES offers the Java High Level REST Client. It is a wrapper around the Java Low Level REST Client, and its API accepts the same request parameters and returns the same response objects as the TransportClient, which makes migration easy while remaining RESTful, combining the strengths of both earlier clients. It does have one drawback: versioning. ES releases minor versions very frequently, and in the ideal case the client version matches the ES version (at least the major version). With a mismatched minor version, basic operations will usually still work, but newer APIs may not be supported.
  • The Java High Level REST Client is strongly recommended for ES 5 and later. This article uses ES 6.3.2, and the examples below are based on JDK 1.8 + Spring Boot + the ES 6.3.2 Java High Level REST Client + Maven.

Prerequisites:

  • JDK 1.8
  • Elasticsearch 6.3.2 (other versions are untested; full compatibility is not guaranteed)
  • Maven
  • Spring Boot
  • 1. Maven dependencies:
        <!--elasticsearch base-->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.3.2</version>
        </dependency>
        <!-- Java Low Level REST Client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>6.3.2</version>
        </dependency>
        <!-- Java High Level REST Client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.3.2</version>
        </dependency>
  • 2. Wiring up the rest-high-level-client
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfiguration implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfiguration.class);

    @Value("${spring.data.elasticsearch.host}")
    private String host;
    @Value("${spring.data.elasticsearch.port}")
    private int port;
    @Value("${spring.data.elasticsearch.username}")
    private String username;
    @Value("${spring.data.elasticsearch.password}")
    private String password;

    private RestHighLevelClient restHighLevelClient;

    @Override
    public void destroy() throws Exception {
        try {
            LOGGER.info("Closing elasticSearch client");
            if (restHighLevelClient != null) {
                restHighLevelClient.close();
            }
        } catch (final Exception e) {
            LOGGER.error("Error closing ElasticSearch client: ", e);
        }
    }

    @Override
    public RestHighLevelClient getObject() throws Exception {
        return restHighLevelClient;
    }

    @Override
    public Class<RestHighLevelClient> getObjectType() {
        return RestHighLevelClient.class;
    }

    @Override
    public boolean isSingleton() {
        return false;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        buildClient();
    }

    protected void buildClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });

        restHighLevelClient = new RestHighLevelClient(builder);
    }

}
  • 3. Index API

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "laimailai");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonMap);
IndexResponse indexResponse = client.index(indexRequest);
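The returned IndexResponse is worth inspecting. A minimal sketch of result handling, assuming the client and indexRequest built above and a running cluster (the index/type names stay illustrative):

```java
IndexResponse indexResponse = client.index(indexRequest);
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // the document did not exist before and was created
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // a document with the same id already existed and was overwritten
}
```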
  • 4. Get API

GetRequest getRequest = new GetRequest(
        "index",
        "type",
        "1");
GetResponse getResponse = client.get(getRequest);
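A sketch of reading the returned document, again assuming the client above and a running cluster; isExists() distinguishes a found document from a missing id:

```java
GetResponse getResponse = client.get(getRequest);
if (getResponse.isExists()) {
    String sourceAsString = getResponse.getSourceAsString();        // the _source as a JSON string
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // the _source as a Map
} else {
    // no document with this id
}
```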
  • 5. Update API

UpdateRequest request = new UpdateRequest(
        "index",
        "type",
        "1")
        .doc(XContentType.JSON, "field", "value"); // an update request must carry a doc (or a script)
UpdateResponse updateResponse = client.update(request);
  • 6. Delete API

DeleteRequest request = new DeleteRequest(
        "index",
        "type",
        "1");
DeleteResponse deleteResponse = client.delete(request);
  • 7. Bulk API

As covered in an earlier article, the bulk interface performs batched index/update/delete operations.
With this API, a whole batch of requests is completed with a single bulk request.

//1. bulk
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("index", "type", "1")
        .source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest("index", "type", "2")
        .source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest("index", "type", "3")
        .source(XContentType.JSON, "field", "baz"));

// synchronous
BulkResponse bulkResponse = client.bulk(request);

// asynchronous
client.bulkAsync(request, new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {

    }

    @Override
    public void onFailure(Exception e) {

    }
});
  • 8. BulkProcessor (important!)

BulkProcessor simplifies use of the bulk API and makes batch operations transparent.
Running a BulkProcessor requires three components:

  1. RestHighLevelClient: executes the bulk requests and obtains the responses.
  2. BulkProcessor.Listener: called before and after each bulk request is executed, and when a bulk request fails.
  3. ThreadPool: bulk requests are executed on this pool, so requests do not block each other and new requests can be accepted while others are still running.

Example code:

@Service
public class ElasticSearchUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchUtil.class);

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    private BulkProcessor bulkProcessor;

    @PostConstruct
    public void init() {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Called before each bulk request is sent; here you can see how many operations this batch contains.
                int numberOfActions = request.numberOfActions();
                LOGGER.info("Executing bulk [{}] with {} requests", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // Called after each bulk request completes; here you can check whether any item failed.
                if (response.hasFailures()) {
                    LOGGER.error("Bulk [{}] executed with failures, response = {}", executionId, response.buildFailureMessage());
                } else {
                    LOGGER.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
                }
                BulkItemResponse[] responses = response.getItems();
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Called when a bulk request fails entirely.
                LOGGER.error("Failed to execute bulk", failure);
            }
        };

        // build() constructs the BulkProcessor; under the hood it uses the asynchronous bulk operation.
        this.bulkProcessor = BulkProcessor.builder(restHighLevelClient::bulkAsync, listener)
                // flush a bulk once 1000 actions have accumulated
                .setBulkActions(1000)
                // flush a bulk once 5 MB of data has accumulated
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                // number of concurrent flushes: 0 executes flushes synchronously, 1 allows one concurrent flush
                .setConcurrentRequests(0)
                // flush at a fixed interval of 1 second
                .setFlushInterval(TimeValue.timeValueSeconds(1L))
                // retry up to 5 times with a constant 1-second backoff
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
                .build();
    }

    @PreDestroy
    public void destroy() {
        try {
            bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOGGER.error("Failed to close bulkProcessor", e);
        }
        LOGGER.info("bulkProcessor closed!");
    }

    /**
     * Update
     *
     * @param request
     */
    public void update(UpdateRequest request) {
        this.bulkProcessor.add(request);
    }

    /**
     * Insert
     *
     * @param request
     */
    public void insert(IndexRequest request) {
        this.bulkProcessor.add(request);
    }
}

BulkProcessor usage example:

        // Create three index requests
        IndexRequest one = new IndexRequest("posts", "doc", "1")
                .source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
        IndexRequest two = new IndexRequest("posts", "doc", "2")
                .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
        IndexRequest three = new IndexRequest("posts", "doc", "3")
                .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
        // Add the three index requests to the bulkProcessor configured above.
        bulkProcessor.add(one);
        bulkProcessor.add(two);
        bulkProcessor.add(three);
        // add many requests here.

        // The bulkProcessor must be closed for the operations added above to take effect.
        // There are two ways to close it:
        // 1. close(): shuts it down immediately.
        //    bulkProcessor.close();
        // 2. awaitClose(): waits up to the given time for all in-flight bulk operations
        //    to complete; returns true if they all completed, false otherwise.
        try {
            boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
  • 9. Upsert API

update: throws an exception when the id does not exist:

UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap);
UpdateResponse response = restHighLevelClient.update(request);

upsert: inserts the document when the id does not exist:

UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap).upsert(jsonMap);
UpdateResponse response = restHighLevelClient.update(request);
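A related convenience worth knowing: instead of passing the same map twice, docAsUpsert(true) reuses the doc itself as the upsert document. A sketch under the same assumptions as the snippet above (index, type, and jsonMap as defined there):

```java
UpdateRequest request = new UpdateRequest(index, type, "1")
        .doc(jsonMap)
        .docAsUpsert(true); // if the id does not exist, jsonMap is indexed as a new document
UpdateResponse response = restHighLevelClient.update(request);
```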
  • 10. Search API

The Search API supports both document queries and aggregation queries.
Its basic form:

SearchRequest searchRequest = new SearchRequest();  // construct a search request; with no arguments it searches all indices
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // most query parameters go into the SearchSourceBuilder
searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // add a match_all condition

To restrict the search to an index and type:

SearchRequest searchRequest = new SearchRequest("posts"); // target the posts index
searchRequest.types("doc"); // restrict to the doc type

Using SearchSourceBuilder

Most query options can be controlled through the SearchSourceBuilder.
A simple example:

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // construct an object with default settings
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy")); // set the query
sourceBuilder.from(0); // set the starting offset
sourceBuilder.size(5); // 5 results per page
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); // set the timeout

Once the SearchSourceBuilder is configured, pass it into the SearchRequest:

SearchRequest searchRequest = new SearchRequest();
searchRequest.source(sourceBuilder);
// search everything
SearchRequest searchRequest = new SearchRequest(); // or new SearchRequest("index") to target a single index
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(searchSourceBuilder);
// search on multiple conditions
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
for (String id : ids) {
    TermQueryBuilder termQueryBuilder = new TermQueryBuilder("id", id);
    boolQueryBuilder.should(termQueryBuilder);
}
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse response = restHighLevelClient.search(searchRequest);
return response;
  • 11. Search Scroll API

// scroll-based paging search
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
SearchRequest searchRequest = new SearchRequest("posts");
searchRequest.scroll(scroll);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("title", "Elasticsearch"));
searchRequest.source(searchSourceBuilder);

SearchResponse searchResponse = client.search(searchRequest);
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();

while (searchHits != null && searchHits.length > 0) {
    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
    scrollRequest.scroll(scroll);
    searchResponse = client.searchScroll(scrollRequest);
    scrollId = searchResponse.getScrollId();
    searchHits = searchResponse.getHits().getHits();
    // process this page of hits here
}

ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest);
boolean succeeded = clearScrollResponse.isSucceeded();
  • 12. Sorting

One or more SortBuilders can be added to the SearchSourceBuilder.
There are four dedicated sort implementations:

    • field
    • score
    • geo distance
    • script

sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); // sort by score, descending
sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC)); // then by id, ascending
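Only the score and field sorts are shown above; the other two can be sketched as follows. This is a hedged example: the "location" geo_point field and the painless script are illustrative and not part of any mapping used in this article:

```java
// geo distance sort: nearest to the given point first (assumes a geo_point field named "location")
sourceBuilder.sort(SortBuilders.geoDistanceSort("location", 40.0, -70.0)
        .order(SortOrder.ASC)
        .unit(DistanceUnit.KILOMETERS));

// script sort: order by a computed numeric value (the script is illustrative)
sourceBuilder.sort(SortBuilders.scriptSort(
        new Script("doc['likes'].value * 0.1"),
        ScriptSortBuilder.ScriptSortType.NUMBER));
```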
  • 13. Source filtering

By default, a search request returns the document _source. As with the REST API, this behavior can be overridden; for example, _source retrieval can be turned off entirely:

sourceBuilder.fetchSource(false);

The method also accepts arrays of one or more wildcard patterns, for finer-grained control over which fields are included or excluded:

String[] includeFields = new String[] {"title", "user", "innerObject.*"};
String[] excludeFields = new String[] {"_type"};
sourceBuilder.fetchSource(includeFields, excludeFields);
  • 14. Aggregations

Configure an appropriate AggregationBuilder and pass it into the SearchSourceBuilder to issue an aggregation request.
In an earlier article, we imported one thousand account records with the following command:

curl -H "Content-Type: application/json" -XPOST 'localhost:9200/bank/account/_bulk?pretty&refresh' --data-binary "@accounts.json"

We then showed how to group them with an aggregation request:

GET /bank/_search?pretty
{
  "size": 0,
  "aggs": {
    "group_by_state": {
      "terms": {
        "field": "state.keyword"
      }
    }
  }
}

Grouping the thousand records by the state field yields this response:

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 999,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "group_by_state": {
      "doc_count_error_upper_bound": 20,
      "sum_other_doc_count": 770,
      "buckets": [
        {
          "key": "ID",
          "doc_count": 27
        },
        {
          "key": "TX",
          "doc_count": 27
        },
        {
          "key": "AL",
          "doc_count": 25
        },
        {
          "key": "MD",
          "doc_count": 25
        },
        {
          "key": "TN",
          "doc_count": 23
        },
        {
          "key": "MA",
          "doc_count": 21
        },
        {
          "key": "NC",
          "doc_count": 21
        },
        {
          "key": "ND",
          "doc_count": 21
        },
        {
          "key": "MO",
          "doc_count": 20
        },
        {
          "key": "AK",
          "doc_count": 19
        }
      ]
    }
  }
}

Java implementation:

@Test
public void test2() {
    // in 6.3.2 the client is built from a RestClientBuilder
    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("172.16.73.50", 9200, "http")));
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
            .field("state.keyword");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.aggregation(aggregation);
    searchSourceBuilder.size(0);
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        System.out.println(searchResponse.toString());
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Search response

The search response object mirrors the response of the REST API, returning both metadata and document data.
First, the metadata in the response object is important: it tells you the outcome of the query, the HTTP status, timing, and whether the search terminated early or timed out:

RestStatus status = searchResponse.status();
TimeValue took = searchResponse.getTook();
Boolean terminatedEarly = searchResponse.isTerminatedEarly();
boolean timedOut = searchResponse.isTimedOut();

Second, the response object contains information about the shards involved and lets you handle shard failures:

int totalShards = searchResponse.getTotalShards();
int successfulShards = searchResponse.getSuccessfulShards();
int failedShards = searchResponse.getFailedShards();
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
    // failures should be handled here
}

Retrieving SearchHits

To retrieve document data, first get the SearchHits object from the search response:

SearchHits hits = searchResponse.getHits();

Retrieve the document data:

@Test
public void test2() {
    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("172.16.73.50", 9200, "http")));
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        SearchHits searchHits = searchResponse.getHits();
        SearchHit[] searchHit = searchHits.getHits();
        for (SearchHit hit : searchHit) {
            System.out.println(hit.getSourceAsString());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

The source can also be converted to other data types as needed:

String sourceAsString = hit.getSourceAsString();
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
String documentTitle = (String) sourceAsMap.get("title");
List<Object> users = (List<Object>) sourceAsMap.get("user");
Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");

Retrieving aggregation results

Aggregation results are retrieved from the SearchResponse by getting the aggregations root and then looking up the aggregation by name.

The request and response are the same as in the terms-aggregation example shown earlier.

Java implementation:

@Test
public void test2() {
    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("172.16.73.50", 9200, "http")));
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
            .field("state.keyword");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.aggregation(aggregation);
    searchSourceBuilder.size(0);
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        Aggregations aggs = searchResponse.getAggregations();
        Terms byStateAggs = aggs.get("group_by_state");
        Terms.Bucket b = byStateAggs.getBucketByKey("ID"); // fetch only the bucket whose key is "ID"
        System.out.println(b.getKeyAsString() + "," + b.getDocCount());
        System.out.println("!!!");
        List<? extends Bucket> aggList = byStateAggs.getBuckets(); // fetch all buckets
        for (Bucket bucket : aggList) {
            System.out.println("key:" + bucket.getKeyAsString() + ",docCount:" + bucket.getDocCount());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

Reference: https://www.jianshu.com/p/5cb91ed22956

Reference: https://my.oschina.net/u/3795437/blog/2253366

Original article: https://www.cnblogs.com/fnlingnzb-learner/p/10750868.html
