Overview
Now that we have covered Elasticsearch's basic concepts and usage, let's look at how to use its Java API.
This article assumes you already have a reasonably complete understanding of the basic concepts of ES.
Clients
You can do a lot with the Java client:
- Perform the standard index, get, delete, update, and search operations.
- Perform administrative tasks on a running cluster.
However, as the official documentation shows, there are currently at least three Java clients:
- Transport Client
- Java High Level REST Client
- Java Low Level REST Client
The reasons for this confusion:
- For a long time ES had no official Java client, and since the ES API could be driven from Java fairly directly, TransportClient was built first. Its drawback is obvious: rather than a RESTful interface, it transfers data over a binary protocol.
- ES then released the Java Low Level REST Client. It speaks REST and works well, but it too has an obvious drawback: migrating code from TransportClient to the Low Level REST Client takes considerable work, and the official documentation ships a whole set of migration guides just for that.
- Now ES offers the Java High Level REST Client. It is a wrapper around the Java Low Level REST Client whose API accepts the same request arguments and returns the same response objects as TransportClient, so migration is easy and the transport is RESTful: it combines the advantages of both earlier clients. Its weakness is versioning. ES releases minor versions very frequently, and ideally the client version should match the ES version (at least the major version); with mismatched minor versions, basic operations will usually still work, but newer APIs may not be supported.
- The Java High Level REST Client is strongly recommended for ES 5 and later. The examples below are based on JDK 1.8 + Spring Boot + the Java High Level REST Client for Elasticsearch 6.3.2 + Maven.
Prerequisites:
- JDK 1.8
- Elasticsearch 6.3.2 (other versions untested; full compatibility is not guaranteed)
- Maven
- Spring Boot
- 1. Maven dependencies:
<!-- elasticsearch base -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.3.2</version>
</dependency>
<!-- Java Low Level REST Client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>6.3.2</version>
</dependency>
<!-- Java High Level REST Client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.3.2</version>
</dependency>
- 2. Wiring up the rest-high-level-client
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfiguration implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfiguration.class);

    @Value("${spring.data.elasticsearch.host}")
    private String host;
    @Value("${spring.data.elasticsearch.port}")
    private int port;
    @Value("${spring.data.elasticsearch.username}")
    private String username;
    @Value("${spring.data.elasticsearch.password}")
    private String password;

    private RestHighLevelClient restHighLevelClient;

    @Override
    public void destroy() throws Exception {
        try {
            LOGGER.info("Closing elasticSearch client");
            if (restHighLevelClient != null) {
                restHighLevelClient.close();
            }
        } catch (final Exception e) {
            LOGGER.error("Error closing ElasticSearch client: ", e);
        }
    }

    @Override
    public RestHighLevelClient getObject() throws Exception {
        return restHighLevelClient;
    }

    @Override
    public Class<RestHighLevelClient> getObjectType() {
        return RestHighLevelClient.class;
    }

    @Override
    public boolean isSingleton() {
        return false;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        buildClient();
    }

    protected void buildClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });

        restHighLevelClient = new RestHighLevelClient(builder);
    }

}
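The @Value placeholders above need matching entries in your Spring configuration; a minimal application.properties sketch (host, port, and credentials are illustrative values you must adapt):

spring.data.elasticsearch.host=127.0.0.1
spring.data.elasticsearch.port=9200
spring.data.elasticsearch.username=elastic
spring.data.elasticsearch.password=changeme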
- 3. Index API
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "laimailai");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonMap);
IndexResponse indexResponse = client.index(indexRequest);
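The IndexResponse reports whether a new document was created or an existing one was overwritten. A short sketch of inspecting it (DocWriteResponse is org.elasticsearch.action.DocWriteResponse):

String id = indexResponse.getId();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // the document was indexed for the first time
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // an existing document with the same id was overwritten
}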
- 4. Get API
GetRequest getRequest = new GetRequest(
        "index",
        "type",
        "1");
GetResponse getResponse = client.get(getRequest);
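The requested id may not exist, so check isExists() before reading the source. A minimal sketch:

if (getResponse.isExists()) {
    String sourceAsString = getResponse.getSourceAsString(); // source as a JSON string
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // source as a Map
} else {
    // no document with this id
}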
- 5. Update API
UpdateRequest request = new UpdateRequest(
        "index",
        "type",
        "1");
// an update request must carry a partial document or a script,
// e.g. request.doc(jsonMap) -- see the upsert section below
UpdateResponse updateResponse = client.update(request);
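Besides a partial document, an update can also apply a script. A sketch, assuming a numeric field named field to increment (the field name and script body are illustrative):

Map<String, Object> params = Collections.singletonMap("count", 4);
Script script = new Script(ScriptType.INLINE, "painless",
        "ctx._source.field += params.count", params);
request.script(script); // a script update takes the place of a partial doc
UpdateResponse updateResponse = client.update(request);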
- 6. Delete API
DeleteRequest request = new DeleteRequest(
        "index",
        "type",
        "1");
DeleteResponse deleteResponse = client.delete(request);
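Deleting a non-existent id does not throw; the response is simply flagged NOT_FOUND. A small sketch of checking for that:

if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // there was no document with this id to delete
}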
- 7. Bulk API
As covered in an earlier article, the bulk endpoint batches index/update/delete operations.
In the Java API, a single BulkRequest carries the whole batch.
// 1. bulk
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("index", "type", "1")
        .source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest("index", "type", "2")
        .source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest("index", "type", "3")
        .source(XContentType.JSON, "field", "baz"));

// synchronous
BulkResponse bulkResponse = client.bulk(request);

// asynchronous
client.bulkAsync(request, new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {

    }

    @Override
    public void onFailure(Exception e) {

    }
});
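A bulk request can succeed overall while individual operations fail, so it is worth walking the per-item results. A minimal sketch (BulkResponse is iterable over its items):

if (bulkResponse.hasFailures()) {
    for (BulkItemResponse item : bulkResponse) {
        if (item.isFailed()) {
            // inspect or log the per-item failure
            String failureMessage = item.getFailureMessage();
        }
    }
}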
- 8. BulkProcessor (key point!)
BulkProcessor simplifies use of the bulk API and makes batching transparent.
A BulkProcessor is assembled from three parts:
- RestHighLevelClient: executes the bulk requests and fetches the responses.
- BulkProcessor.Listener: invoked before and after each bulk request executes, and when a bulk request fails.
- ThreadPool: bulk requests execute on this pool, so individual requests do not block one another and new requests can be accepted while others are in flight.
Example code:
@Service
public class ElasticSearchUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchUtil.class);

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    private BulkProcessor bulkProcessor;

    @PostConstruct
    public void init() {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // called before each bulk request is sent; here we can see
                // how many operations this batch carries
                int numberOfActions = request.numberOfActions();
                LOGGER.info("Executing bulk [{}] with {} requests", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // called after each bulk request completes; this is where we
                // learn whether any item in the batch failed
                if (response.hasFailures()) {
                    LOGGER.error("Bulk [{}] executed with failures, response = {}", executionId, response.buildFailureMessage());
                } else {
                    LOGGER.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // called when the bulk request itself could not be executed
                LOGGER.error("Failed to execute bulk", failure);
            }
        };

        // build() assembles the BulkProcessor; under the hood it uses the
        // asynchronous bulk API
        this.bulkProcessor = BulkProcessor.builder(restHighLevelClient::bulkAsync, listener)
                // flush after 1000 accumulated actions
                .setBulkActions(1000)
                // or after 5 MB of accumulated data
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                // number of concurrent requests: 0 executes flushes synchronously,
                // 1 allows one bulk to execute while new actions accumulate
                .setConcurrentRequests(0)
                // flush at least once every second regardless
                .setFlushInterval(TimeValue.timeValueSeconds(1L))
                // retry up to 5 times with a 1s backoff
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
                .build();
    }

    @PreDestroy
    public void destroy() {
        try {
            bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOGGER.error("Failed to close bulkProcessor", e);
        }
        LOGGER.info("bulkProcessor closed!");
    }

    /**
     * Update
     */
    public void update(UpdateRequest request) {
        this.bulkProcessor.add(request);
    }

    /**
     * Insert
     */
    public void insert(IndexRequest request) {
        this.bulkProcessor.add(request);
    }
}
Example usage of bulkProcessor:

// create three index requests
IndexRequest one = new IndexRequest("posts", "doc", "1")
        .source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
// add the three index requests to the bulkProcessor configured above
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
// add many more requests here...

// the bulkProcessor must be closed for the pending actions above to take effect.
// There are two ways to close it:
// 1. close() flushes and shuts down immediately:
// bulkProcessor.close();
// 2. awaitClose() flushes and waits up to the given timeout for all in-flight
//    bulk requests to finish; returns true if they all completed, false otherwise:
try {
    boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}
- 9. Upsert API
update: throws an exception when the given id does not exist:

UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap);
UpdateResponse response = restHighLevelClient.update(request);

upsert: inserts the document when the id does not exist:

UpdateRequest request = new UpdateRequest(index, type, "1").doc(jsonMap).upsert(jsonMap);
UpdateResponse response = restHighLevelClient.update(request);
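When the upsert body is the same as the partial document, UpdateRequest offers docAsUpsert(true) as a shorthand. A sketch reusing jsonMap from above:

UpdateRequest request = new UpdateRequest(index, type, "1")
        .doc(jsonMap)
        .docAsUpsert(true); // use the doc itself as the upsert document
UpdateResponse response = restHighLevelClient.update(request);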
- 10. Search API
The Search API supports both document queries and aggregation queries.
Its basic form:

SearchRequest searchRequest = new SearchRequest(); // no arguments: search across all indices
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // most query parameters live in the SearchSourceBuilder
searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // add a match_all query

SearchRequest searchRequest = new SearchRequest("posts"); // restrict the search to the posts index
searchRequest.types("doc"); // restrict to the doc type
Using SearchSourceBuilder
Most search options are controlled through SearchSourceBuilder.
A simple example:

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // create it with default settings
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy")); // set the query
sourceBuilder.from(0); // start from the first hit
sourceBuilder.size(5); // 5 hits per page
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); // set a timeout

Once configured, pass the SearchSourceBuilder to the SearchRequest:

SearchRequest searchRequest = new SearchRequest();
searchRequest.source(sourceBuilder);
// match-all search
SearchRequest searchRequest = new SearchRequest(); // or new SearchRequest("index") to target a single index
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(searchSourceBuilder);
// search on several conditions
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
for (String id : ids) {
    TermQueryBuilder termQueryBuilder = new TermQueryBuilder("id", id);
    boolQueryBuilder.should(termQueryBuilder);
}
SearchRequest searchRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse response = restHighLevelClient.search(searchRequest);
return response;
- 11. Search Scroll API

// paged search with scroll
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
SearchRequest searchRequest = new SearchRequest("posts");
searchRequest.scroll(scroll);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("title", "Elasticsearch"));
searchRequest.source(searchSourceBuilder);

SearchResponse searchResponse = client.search(searchRequest);
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();

while (searchHits != null && searchHits.length > 0) {
    // process the current page of searchHits here, then fetch the next page
    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
    scrollRequest.scroll(scroll);
    searchResponse = client.searchScroll(scrollRequest);
    scrollId = searchResponse.getScrollId();
    searchHits = searchResponse.getHits().getHits();
}

// release the scroll context when done
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest);
boolean succeeded = clearScrollResponse.isSucceeded();
- 12. Sorting
One or more SortBuilders can be added to the SearchSourceBuilder.
There are four specialized sort implementations:
- field (FieldSortBuilder)
- score (ScoreSortBuilder)
- geo distance (GeoDistanceSortBuilder)
- script (ScriptSortBuilder)

sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); // sort by score, descending
sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC)); // then by id, ascending
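The snippet above covers only the first two kinds. A hedged sketch of the geo-distance and script variants, assuming a geo_point field named location and a numeric field named rating (both illustrative, not from the sample data):

sourceBuilder.sort(SortBuilders.geoDistanceSort("location", 40.0, -70.0)
        .order(SortOrder.ASC)
        .unit(DistanceUnit.KILOMETERS)); // nearest first, assuming a geo_point field "location"
sourceBuilder.sort(SortBuilders.scriptSort(
        new Script("doc['rating'].value * 2"),
        ScriptSortBuilder.ScriptSortType.NUMBER)); // script sort over a hypothetical "rating" field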
- 13. Source filtering
By default the search response carries the document _source, and as with the REST API you can override this behavior. For example, you can switch off _source retrieval entirely:

sourceBuilder.fetchSource(false);

The method also accepts arrays of one or more wildcard patterns, for finer-grained control over which fields are included or excluded:

String[] includeFields = new String[] {"title", "user", "innerObject.*"};
String[] excludeFields = new String[] {"_type"};
sourceBuilder.fetchSource(includeFields, excludeFields);
- 14. Aggregations
Configure an appropriate AggregationBuilder, pass it to the SearchSourceBuilder, and you have an aggregation request.
In an earlier article we imported a thousand account documents with this command:
curl -H "Content-Type: application/json" -XPOST 'localhost:9200/bank/account/_bulk?pretty&refresh' --data-binary "@accounts.json"
We then showed how to group them with an aggregation query:
GET /bank/_search?pretty
{
  "size": 0,
  "aggs": {
    "group_by_state": {
      "terms": {
        "field": "state.keyword"
      }
    }
  }
}
Grouping the thousand documents by the state field yields this response:
{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 999, "max_score": 0, "hits": [] }, "aggregations": { "group_by_state": { "doc_count_error_upper_bound": 20, "sum_other_doc_count": 770, "buckets": [ { "key": "ID", "doc_count": 27 }, { "key": "TX", "doc_count": 27 }, { "key": "AL", "doc_count": 25 }, { "key": "MD", "doc_count": 25 }, { "key": "TN", "doc_count": 23 }, { "key": "MA", "doc_count": 21 }, { "key": "NC", "doc_count": 21 }, { "key": "ND", "doc_count": 21 }, { "key": "MO", "doc_count": 20 }, { "key": "AK", "doc_count": 19 } ] } } }
The Java equivalent:
@Test
public void test2() {
    // in 6.x the high-level client is built from a RestClientBuilder
    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("172.16.73.50", 9200, "http")));
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
            .field("state.keyword");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.aggregation(aggregation);
    searchSourceBuilder.size(0);
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        System.out.println(searchResponse.toString());
    } catch (IOException e) {
        e.printStackTrace();
    }
}
Search response
The SearchResponse object mirrors what the REST API returns: metadata plus the document data.
First, the top-level metadata is important; it covers the outcome of the query, the HTTP status, the timing, and whether the search terminated early or timed out:
RestStatus status = searchResponse.status();
TimeValue took = searchResponse.getTook();
Boolean terminatedEarly = searchResponse.isTerminatedEarly();
boolean timedOut = searchResponse.isTimedOut();
Second, the response contains shard statistics and lets you handle shard failures:
int totalShards = searchResponse.getTotalShards();
int successfulShards = searchResponse.getSuccessfulShards();
int failedShards = searchResponse.getFailedShards();
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
    // failures should be handled here
}
Retrieving SearchHits
To get at the document data, first pull the SearchHits object out of the search response:
SearchHits hits = searchResponse.getHits();
Retrieving the document data:
@Test
public void test2() {
    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("172.16.73.50", 9200, "http")));
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        SearchHits searchHits = searchResponse.getHits();
        SearchHit[] searchHit = searchHits.getHits();
        for (SearchHit hit : searchHit) {
            System.out.println(hit.getSourceAsString());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
Depending on your needs, the source can also be converted into other types:
String sourceAsString = hit.getSourceAsString();
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
String documentTitle = (String) sourceAsMap.get("title");
List<Object> users = (List<Object>) sourceAsMap.get("user");
Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
Retrieving aggregations
Aggregation results are read from the SearchResponse by first getting the root Aggregations object and then looking up each aggregation by name.
(The request and response are the same group_by_state example shown in section 14 above.)
The Java implementation:
@Test
public void test2() {
    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("172.16.73.50", 9200, "http")));
    SearchRequest searchRequest = new SearchRequest("bank");
    searchRequest.types("account");
    TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
            .field("state.keyword");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.aggregation(aggregation);
    searchSourceBuilder.size(0);
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = client.search(searchRequest);
        Aggregations aggs = searchResponse.getAggregations();
        Terms byStateAggs = aggs.get("group_by_state");
        Terms.Bucket b = byStateAggs.getBucketByKey("ID"); // fetch only the bucket whose key is "ID"
        System.out.println(b.getKeyAsString() + "," + b.getDocCount());
        List<? extends Terms.Bucket> aggList = byStateAggs.getBuckets(); // fetch all buckets
        for (Terms.Bucket bucket : aggList) {
            System.out.println("key:" + bucket.getKeyAsString() + ",docCount:" + bucket.getDocCount());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
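Terms buckets can also carry sub-aggregations. A hedged sketch that averages the sample dataset's balance field within each state (the aggregation names here are illustrative):

TermsAggregationBuilder aggregation = AggregationBuilders.terms("group_by_state")
        .field("state.keyword")
        .subAggregation(AggregationBuilders.avg("average_balance").field("balance"));
// execute the search as above, then read the sub-aggregation out of each bucket:
Terms byStateAggs = searchResponse.getAggregations().get("group_by_state");
for (Terms.Bucket bucket : byStateAggs.getBuckets()) {
    Avg avgBalance = bucket.getAggregations().get("average_balance"); // org.elasticsearch.search.aggregations.metrics.avg.Avg
    System.out.println(bucket.getKeyAsString() + " -> " + avgBalance.getValue());
}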
Reference: https://www.jianshu.com/p/5cb91ed22956
Reference: https://my.oschina.net/u/3795437/blog/2253366
Original post (Chinese): https://www.cnblogs.com/fnlingnzb-learner/p/10750868.html