ES 常用java api

java rest client 有两种:

  1、Java Low Level REST Client :用于Elasticsearch的官方低层客户端。它允许通过http与Elasticsearch集群通信。叶子请求编组,响应反编组给用户。它兼容所有的Elasticsearch版本。

  2、Java High Level REST Client :Elasticsearch的官方高级客户端。它基于底层客户端,公开API特定的方法,处理请求编组和响应反编组。

一、Java Low Level REST Client

  1、先引入jar包

  <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>6.6.0</version>
    </dependency>

  2、编写代码

public class LowLevelRestClientTest {

    //RequestOptions类包含请求的一些部分,这些部分应该在同一个应用程序中的多个请求之间共享。你可以创建一个单实例,并在所有请求之间共享:
    private static final RequestOptions COMMON_OPTIONS;
    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        builder.addHeader("Authorization", "kyle " + "TOKEN");
        builder.setHttpAsyncResponseConsumerFactory(
            new HttpAsyncResponseConsumerFactory
                .HeapBufferedResponseConsumerFactory(200*1024*1024));
        COMMON_OPTIONS = builder.build();
    }

    public static RestClient buildRestClient(){
        //创建RestClientBuilder
                RestClientBuilder builder = RestClient.builder(
                        new HttpHost("localhost",9201,"http"),
                        new HttpHost("localhost",9202,"http"),
                        new HttpHost("localhost",9203,"http")
                        );
                //在创建restClient的同时,设置每个请求需要发送的默认头文件,以避免在每个请求中指定它们
                Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
                builder.setDefaultHeaders(defaultHeaders);
                //设置应该遵守的超时,以防对同一个请求进行多次尝试,默认为3000ms.
                builder.setMaxRetryTimeoutMillis(1000);
                //设置一个监听器,用来在节点发生故障的时候,采取相应的操作。
                builder.setFailureListener(new RestClient.FailureListener(){
                    @Override
                    public void onFailure(Node node) {
                        super.onFailure(node);
                        //doSomeThing();
                    }
                });
                //将节点选择器设置为用于过滤客户机将发送请求到的节点之间的节点,这些节点被设置为客户机本身。
                //这对于防止在启用嗅探时将请求发送到专用的主节点非常有用。默认情况下,客户机向每个配置的节点发送请求
                builder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
                //设置一个回调函数,允许修改默认的请求配置
                /*builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {

                    @Override
                    public Builder customizeRequestConfig(Builder arg0) {
                        return null;
                    }
                });*/

                //设置一个回调,允许修改http客户机配置
                /*builder.setHttpClientConfigCallback(new HttpClientConfigCallback(){

                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder arg0) {
                        return null;
                    }

                });*/

                //创建restClient,中间的那些配置也可以不设置
                RestClient restClient = builder.build();

                return restClient;
    }

    public static void request(RestClient restClient) throws ParseException, IOException{
        //创建一个请求组
                Request request = new Request("GET","/_search");
                //为请求添加一些参数
                request.addParameter("pretty", "true");

                //请求的主体设置为任何HttpEntity,
                //为HttpEntity指定的ContentType非常重要,因为它将用于设置content - type头部,以便Elasticsearch能够正确解析内容。
                //request.setEntity(new NStringEntity("{\"json\":\"text\"}",ContentType.APPLICATION_JSON));

                //还可以将其设置为一个字符串,该字符串将默认为application/json的ContentType。
                request.setJsonEntity("{\"query\": {\"match\": {\"address\": \"Street\"}}}");

                //非必须
                request.setOptions(COMMON_OPTIONS);

                //发送一个同步的请求,线程会阻塞
                Response response = restClient.performRequest(request);

                //发送异步请求,然后使用监听器来对返回结果进行处理
                /*restClient.performRequestAsync(request, new ResponseListener() {

                    @Override
                    public void onSuccess(Response resp) {
                        System.out.println("成功");
                    }

                    @Override
                    public void onFailure(Exception arg0) {
                        System.out.println("失败");
                    }
                });*/

                //有关已执行请求的信息
                RequestLine requestLine = response.getRequestLine();
                //返回响应的主机
                HttpHost host = response.getHost();
                //响应状态行,您可以从中检索状态代码
                int statusCode = response.getStatusLine().getStatusCode();
                //响应头
                Header[] headers = response.getHeaders();

                String responseBody = EntityUtils.toString(response.getEntity());

                System.out.println(requestLine.getUri());
                System.out.println(host);
                System.out.println(statusCode);
                System.out.println(headers);
                System.out.println(responseBody);

    }

    public static void main(String[] args) throws IOException {
        RestClient restClient = buildRestClient();
        request(restClient);
        //关闭restClient
        restClient.close();
    }

  

  3、RequestConfigCallback 和 HttpClientConfigCallback 的一些常用的配置

  RequestConfigCallback和HttpClientConfigCallback允许Apache Async Http客户机公开的任何定制。这些回调使修改客户机的某些特定行为成为可能,而无需覆盖使用RestClient初始化的所有其他默认配置。本节描述一些常见的场景,这些场景需要对底层Java REST客户机进行额外的配置。

  3.1、身份验证

final CredentialsProvider credentialsProvider =
    new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
    new UsernamePasswordCredentials("user", "password"));

RestClientBuilder builder = RestClient.builder(
    new HttpHost("localhost", 9200))
    .setHttpClientConfigCallback(new HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(
                HttpAsyncClientBuilder httpClientBuilder) {
            //httpClientBuilder.disableAuthCaching(); ①
            return httpClientBuilder
                .setDefaultCredentialsProvider(credentialsProvider);
        }
    });

  ①可以禁用抢占式身份验证,这意味着每个请求都将在没有授权头的情况下发送,以查看它是否被接受,并且在接收到HTTP 401响应后,它将使用基本身份验证头重新发送完全相同的请求。

  3.2、默认情况下,Apache Http Async客户机启动一个dispatcher线程和连接管理器使用的多个工作线程,以及本地检测到的处理器的数量(取决于Runtime.getRuntime(). availableprocessors()返回的是什么)。线程数可以修改如下:

RestClientBuilder builder = RestClient.builder(
    new HttpHost("localhost", 9200))
    .setHttpClientConfigCallback(new HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(
                HttpAsyncClientBuilder httpClientBuilder) {
            return httpClientBuilder.setDefaultIOReactorConfig(
                IOReactorConfig.custom()
                    .setIoThreadCount(1)
                    .build());
        }
    });

  3.3、设置链接超时或者socket超时

RestClientBuilder builder = RestClient.builder(
    new HttpHost("localhost", 9200))
    .setRequestConfigCallback(
        new RestClientBuilder.RequestConfigCallback() {
            @Override
            public RequestConfig.Builder customizeRequestConfig(
                    RequestConfig.Builder requestConfigBuilder) {
                return requestConfigBuilder
                    .setConnectTimeout(5000)
                    .setSocketTimeout(60000);
            }
        })
    .setMaxRetryTimeoutMillis(60000);

  3.4、加密传输

KeyStore truststore = KeyStore.getInstance("jks");
try (InputStream is = Files.newInputStream(keyStorePath)) {
    truststore.load(is, keyStorePass.toCharArray());
}
SSLContextBuilder sslBuilder = SSLContexts.custom()
    .loadTrustMaterial(truststore, null);
final SSLContext sslContext = sslBuilder.build();
RestClientBuilder builder = RestClient.builder(
    new HttpHost("localhost", 9200, "https"))
    .setHttpClientConfigCallback(new HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(
                HttpAsyncClientBuilder httpClientBuilder) {
            return httpClientBuilder.setSSLContext(sslContext);
        }
    });

 4、嗅探器(Sniffer):允许从运行的elasticsearch集群中自动发现节点,并将其设置到现有的RestClient实例中。 默认情况下,它将使用Nodes Info api来检索属于集群的节点,并使用jackson解析响应的json数据。假如集群中有100个节点,如果我们全部用手写进代码那样麻烦而且容易出错。这时候就可以使用嗅探器来自动发现节点。

  4.1、先引入jar包

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client-sniffer</artifactId>
    <version>6.6.0</version>
</dependency>

  4.2、sinffer的创建和关闭

RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9201, "http"))
    .build();
//创建Sniffer 并设置一分钟更新一次,默认为5分钟
Sniffer sniffer = Sniffer.builder(restClient)
    .setSniffIntervalMillis(60000).build();

//sniffer需要在restClient之前关闭
sniffer.close();
restClient.close();

  4.3、还可以启用故障嗅探功能,这意味着在每次故障之后,节点列表将立即更新,而不是在接下来的普通嗅探轮中更新。在这种情况下,需要首先创建一个SniffOnFailureListener,并在创建RestClient时提供它。同样,在稍后创建嗅探器之后,它需要与相同的SniffOnFailureListener实例相关联,该实例将在每次失败时得到通知,并使用嗅探器执行所述的额外嗅探。

SniffOnFailureListener sniffOnFailureListener =
    new SniffOnFailureListener();
RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9201))
    .setFailureListener(sniffOnFailureListener)
    .build();
Sniffer sniffer = Sniffer.builder(restClient)
    .setSniffAfterFailureDelayMillis(30000)
    .build();
sniffOnFailureListener.setSniffer(sniffer);

  4.4、使用https进行连接

RestClient restClient = RestClient.builder(
        new HttpHost("localhost", 9201, "http"))
        .build();
NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
        restClient,
        ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT,
        ElasticsearchNodesSniffer.Scheme.HTTPS);
Sniffer sniffer = Sniffer.builder(restClient)
        .setNodesSniffer(nodesSniffer).build();

  4.5、同样,也可以定制sniffRequestTimeout,默认值为1秒。是超时参数作为一个查询字符串参数当调用节点信息提供api,所以当超时过期在服务器端,一个有效的响应仍返回虽然可能只包含节点的一个子集,是集群的一部分,那些在那之前已经做出了回应。

RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9201, "http"))
    .build();
NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
    restClient,
    TimeUnit.SECONDS.toMillis(5),
    ElasticsearchNodesSniffer.Scheme.HTTP);
Sniffer sniffer = Sniffer.builder(restClient)
    .setNodesSniffer(nodesSniffer).build();

二、Java High Level REST Client

  Java高级REST客户机在Java低级REST客户机之上工作。它的主要目标是公开API特定的方法,这些方法接受请求对象作为参数并返回响应对象,以便请求编组和响应反编组由客户机本身处理。 可以同步或异步调用每个API。同步方法返回一个响应对象,而异步方法(名称以async后缀结尾)则需要一个侦听器参数,一旦响应或错误为r,侦听器参数(在低级客户机管理的线程池上)就会被通知

  1、兼容性

    1.1、Java高级REST客户机需要Java 1.8,并且依赖于Elasticsearch core项目。客户端版本与客户端开发的Elasticsearch版本相同

    1.2、高级客户机保证能够与运行在相同主版本和更大或更小版本上的Elasticsearch节点通信。它不需要处于与之通信的Elasticsearch节点相同的次要版本中,因为它是向前兼容的,这意味着它支持与后来版本的Elasticsearch通信。

但是向前版本通信可能会有不兼容的。例如在6.1和6.0之间,如果6.1客户机支持一些api的新请求体字段,而这些api是6.0节点所不知道的。

    1.3、建议当elasticsearch集群版本升级后,最好是将客户端版本也升级到该版本。

  2、引入jar包

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.6.0</version>
</dependency>

  3、编写代码

public class HighLeveRestClientTest {

    public static void main(String[] args) throws IOException {
        //创建高级rest客户端
        /*高级客户端将在内部创建用于基于提供的构建器执行请求的低级客户端。这个低级客户机维护一个连接池,并启动一些线程,
         * 所以当您真正完成高级客户机的操作时,应该关闭它,然后关闭内部低级客户机来释放这些资源。
         * */
        //高级rest客户端和低级客户端一样设置requestOptions,具体参考低级客户端。
        RestHighLevelClient restClient = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9201),
                        new HttpHost("127.0.0.1", 9202),
                        new HttpHost("127.0.0.1", 9203)
                        )
                );

        //构建请求,有很多种构建请求的方式,这里只列举一种
        IndexRequest request = new IndexRequest(
                "posts",
                "doc",
                "1");
        String jsonString = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        request.source(jsonString, XContentType.JSON);

        //执行请求
        IndexResponse indexResponse = restClient.index(request, RequestOptions.DEFAULT);

        //处理返回
        String index = indexResponse.getIndex();
        String type = indexResponse.getType();
        String id = indexResponse.getId();
        long version = indexResponse.getVersion();
        System.out.println(index);
        System.out.println(type);
        System.out.println(id);
        System.out.println(version);
        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
            System.out.println("创建");
        } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            System.out.println("更新");
        }
        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {

        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure :
                    shardInfo.getFailures()) {
                String reason = failure.reason();
                System.out.println(reason);
            }
        }
        //关闭客户端
        restClient.close();
    }

}

由于后续的api太多,懒得记录了,上个链接吧https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html

原文地址:https://www.cnblogs.com/kyleinjava/p/10522173.html

时间: 2024-10-09 16:03:58

ES 常用java api的相关文章

HBase的常用Java API

1. 创建HBase表的对象 HBase表的对项名字叫HTable,创建它的方法有很多,常见的有如下: org.apache.hadoop.hbase.client.HTable hTable = new HTable(org.apache.hadoop.hbase.HBaseConfiguration conf, String tableName); 或 org.apache.hadoop.hbase.client.HTable hTable = new HTable(org.apache.h

常用 Java API

常用Java API 一. java.io.BufferedReader类(用于从文件中读入一段字符:所属套件:java.io) 1. 构造函数BufferedReader(java.io.FileReader FileReader变量) 说明:新建一个BufferReader对象. 2. close方法 void close() 说明:关闭BufferReader对象. 3. readLine方法 java.lang.string readLine() 说明:从文件中读取一行字符.若为NULL

HBase 常用java api获得客户端,创建表,查询,删除

1,前期准备 (1) 本文采用的hbase是采用三台服务器搭建的集群,zookeeper也是相同服务器搭建的集群,集群ip分别是192.168.183.101: 192.168.183.102: 192.168.183.103.其中102是主节点(HMaster),101以及103都是HRegionServer (2) 这次测试安装的hbase的版本是 hbase-0.99.2.-bin.tar (3)java api引用的maven依赖路径如下 <dependency> <groupI

常用Java API

一. java.io.BufferedReader类(用于从文件中读入一段字符:所属套件:java.io) 1. 构造函数BufferedReader(java.io.FileReader FileReader变量) 说明:新建一个BufferReader对象. 2. close方法 void close() 说明:关闭BufferReader对象. 3. readLine方法 java.lang.string readLine() 说明:从文件中读取一行字符.若为NULL,代表读取至文件结尾.

kafka系列五、kafka常用java API

引入maven包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.0.0</version> </dependency> 一.同步发送消息 package com.example.demo.kafka; import org.apache.kafka.client

常用JAVA API :String 、StringBuilder、StringBuffer的常用方法和区别

摘要 本文将介绍String.StringBuilder类的常用方法. 在java中String类不可变的,创建一个String对象后不能更改它的值.所以如果需要对原字符串进行一些改动操作,就需要用StringBuilder类或者StringBuffer类,StringBuilder比StringBuffer更快一些,缺点是StringBuilder不是线程安全的,但在算法竞赛中一般我们用不到多线程.所以,主要推荐使用StringBuilder类. String: 方法概述: String 类包

ElasticSearch AggregationBuilders java api常用聚会查询

以球员信息为例,player索引的player type包含5个字段,姓名,年龄,薪水,球队,场上位置.index的mapping为: "mappings": { "player": { "properties": { "name": { "index": "not_analyzed", "type": "string" }, "age&

Hbase框架原理及相关的知识点理解、Hbase访问MapReduce、Hbase访问Java API、Hbase shell及Hbase性能优化总结

转自:http://blog.csdn.net/zhongwen7710/article/details/39577431 本blog的内容包含: 第一部分:Hbase框架原理理解 第二部分:Hbase调用MapReduce函数使用理解 第三部分:Hbase调用Java API使用理解 第四部分:Hbase Shell操作 第五部分:Hbase建表.读写操作方式性能优化总结 第一部分:Hbase框架原理理解 概述 HBase是一个构建在HDFS上的分布式列存储系统:HBase是基于Google

Java基础----Java API中的常用类

System:描述系统的一些信息 preperties();获取系统信息 Properties prop =new System.getProperties(); 是hashtable 的子类.用map的方法去除该类集合中的元素.该集合中存储的都是字符串,没有泛型定义. String calue=(String)prop.get(obj); System.out.println(obj+":"+value); //如何在系统中自定义一些特有信息? System.setProperty(