Elasticsearch High Level REST Client: How a Request Is Sent

This article walks through how the Java High Level REST Client sends requests (index, update, delete, and so on) to Elasticsearch 6.3.2, focusing on how the REST client decides which Elasticsearch server each request is sent to.

The Maven dependency is as follows:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.3.2</version>
</dependency>

The High Level REST Client provides two sets of interfaces for these requests: synchronous and asynchronous, the asynchronous methods ending with Async. Taking the update request as an example:
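The two variants look roughly like this (signatures paraphrased from the 6.3.x client, so treat them as a sketch rather than the exact source):

    // synchronous: blocks until the response arrives or an exception is thrown
    public final UpdateResponse update(UpdateRequest updateRequest, Header... headers) throws IOException

    // asynchronous: returns immediately and delivers the result to the listener
    public final void updateAsync(UpdateRequest updateRequest, ActionListener<UpdateResponse> listener, Header... headers)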

The official documentation provides detailed examples of how to use these APIs (java-rest-high): you first initialize a RestHighLevelClient and then develop against the API reference. Under the hood, RestHighLevelClient wraps an HTTP connection pool. When an update, index, or delete operation is executed, a connection is taken from the pool and an HTTP request is sent to the Elasticsearch server, which receives the request via Netty.

The high-level client will internally create the low-level client used to perform requests based on the provided builder. That low-level client maintains a pool of connections 

The goal of this article is to trace how an index/update/delete request is built step by step and sent to the Elasticsearch server, paying particular attention to the round-robin algorithm that chooses which Elasticsearch server the request goes to.

Taking the update request as an example: after building the UpdateRequest, calling esClient.update(updateRequest) sends the request:

    updateRequest.doc(XContentFactory.jsonBuilder().startObject().field(fieldName, val).endObject());
    UpdateResponse response = esClient.update(updateRequest);
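For completeness, here is a minimal, self-contained version of the same flow; the host, index, type, id, and field values are hypothetical and only serve to make the snippet runnable:

    import java.io.IOException;

    import org.apache.http.HttpHost;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.action.update.UpdateResponse;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.common.xcontent.XContentFactory;

    public class UpdateDemo {
        public static void main(String[] args) throws IOException {
            // hypothetical single-node client; production code would list every coordinating node
            RestHighLevelClient esClient =
                    new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));

            // hypothetical index/type/id and field values
            UpdateRequest updateRequest = new UpdateRequest("user_index", "doc", "1");
            updateRequest.doc(XContentFactory.jsonBuilder()
                    .startObject()
                    .field("nickname", "hapjin")
                    .endObject());

            UpdateResponse response = esClient.update(updateRequest);   // synchronous call
            System.out.println(response.getResult());

            esClient.close();
        }
    }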

Execution eventually reaches performRequest(); index and delete requests also end up in this method:

    /**
     * Sends a request to the Elasticsearch cluster that the client points to. Blocks until the request is completed and returns
     * its response or fails by throwing an exception. Selects a host out of the provided ones in a round-robin fashion. Failing hosts
     * are marked dead and retried after a certain amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times
     * they previously failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead
     * nodes that deserve a retry) are retried until one responds or none of them does, in which case an {@link IOException} will be thrown.
     *
     */
    public Response performRequest(String method, String endpoint, Map<String, String> params,
                                   HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                   Header... headers) throws IOException {
        SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
        performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory,
            listener, headers);
        return listener.get();
    }

According to the method's Javadoc, it sends a request to the Elasticsearch cluster and waits for the response. The waiting works like this: a SyncResponseListener is created, performRequestAsyncNoCatch sends the HTTP request asynchronously, and then listener.get() blocks until the HTTP response is available.
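This "send asynchronously, then block for the result" pattern can be illustrated with a small sketch. This is not the real SyncResponseListener implementation, only the idea behind it, built on a CountDownLatch:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Minimal sketch of a blocking listener: one thread waits in get() while an
    // I/O thread later delivers either a response or an exception.
    class BlockingListener<T> {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile T response;
        private volatile Exception exception;

        void onSuccess(T response) { this.response = response; latch.countDown(); }
        void onFailure(Exception e) { this.exception = e; latch.countDown(); }

        // blocks the calling thread until one of the callbacks fires, similar to listener.get()
        T get(long timeoutMillis) throws Exception {
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new RuntimeException("timed out waiting for response");
            }
            if (exception != null) {
                throw exception;
            }
            return response;
        }
    }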

Inside performRequestAsyncNoCatch(), the relevant call is:

    client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse httpResponse) {
                // ... the response is handed to the listener here ...
            }
            // failed() and cancelled() callbacks omitted
    });

In other words, it is CloseableHttpAsyncClient's execute() method that sends the HTTP request to the Elasticsearch server. (This is the underlying HTTP connection pool that the rest-high-level client wraps.)

That is the execution path of a synchronous call in the Elasticsearch Java High Level client. It boils down to two sentences: performRequestAsyncNoCatch sends the request asynchronously, and SyncResponseListener blocks to obtain the response. The asynchronous methods work in a similar way.
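For reference, calling the asynchronous variant looks roughly like this (a sketch that assumes the updateRequest and esClient from the earlier example; the listener type is org.elasticsearch.action.ActionListener, and the method name follows the 6.3.x API):

    esClient.updateAsync(updateRequest, new ActionListener<UpdateResponse>() {
        @Override
        public void onResponse(UpdateResponse updateResponse) {
            // invoked on a client I/O thread once the HTTP response has been parsed
            System.out.println("updated, result = " + updateResponse.getResult());
        }

        @Override
        public void onFailure(Exception e) {
            // invoked if the request could not be sent or the server returned an error
            e.printStackTrace();
        }
    });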

As mentioned in an earlier article, every node in an Elasticsearch cluster is a coordinating node by default and can accept client requests. That is why, when creating the Elasticsearch Java High Level client, you usually configure several addresses; the example below configures three:

    // by default, every Elasticsearch node is a coordinating node
    String[] nodes = clusterNode.split(",");
    HttpHost host_0 = new HttpHost(nodes[0].split(":")[0], Integer.parseInt(nodes[0].split(":")[1]), "http");
    HttpHost host_1 = new HttpHost(nodes[1].split(":")[0], Integer.parseInt(nodes[1].split(":")[1]), "http");
    HttpHost host_2 = new HttpHost(nodes[2].split(":")[0], Integer.parseInt(nodes[2].split(":")[1]), "http");
    restHighLevelClient = new RestHighLevelClient(RestClient.builder(host_0, host_1, host_2));
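The same setup can be written so that it handles any number of nodes instead of exactly three; a sketch, assuming clusterNode looks like "10.0.0.1:9200,10.0.0.2:9200,10.0.0.3:9200":

    String[] nodes = clusterNode.split(",");
    HttpHost[] httpHosts = new HttpHost[nodes.length];
    for (int i = 0; i < nodes.length; i++) {
        String[] hostAndPort = nodes[i].split(":");
        httpHosts[i] = new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]), "http");
    }
    // RestClient.builder accepts a varargs array of HttpHost
    RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHosts));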

So when the client sends an HTTP request, which Elasticsearch server does the request actually go to? That is the question this article wants to answer.

Sending the request is handled by RestClient. Its class-level Javadoc already states that when sending a request, a host gets selected out of the provided ones in a round-robin fashion:

/**
 * Client that connects to an Elasticsearch cluster through HTTP.
 * The hosts that are part of the cluster need to be provided at creation time, but can also be replaced later
 * The method {@link #performRequest(String, String, Map, HttpEntity, Header...)} allows to send a request to the cluster. When
 * sending a request, a host gets selected out of the provided ones in a round-robin fashion. Failing hosts are marked dead and
 * retried after a certain amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times they previously
 * failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead nodes that
 * deserve a retry) are retried until one responds or none of them does, in which case an {@link IOException} will be thrown.
 * <p>
 * Requests can be either synchronous or asynchronous. The asynchronous variants all end with {@code Async}.
 * <p>
 */
public class RestClient implements Closeable {

    // ... other code omitted ...

    /**
     * {@code HostTuple} enables the {@linkplain HttpHost}s and {@linkplain AuthCache} to be set together in a thread
     * safe, volatile way.
     */
    private static class HostTuple<T> {
        final T hosts;
        final AuthCache authCache;

        HostTuple(final T hosts, final AuthCache authCache) {
            this.hosts = hosts;
            this.authCache = authCache;
        }
    }
}
    

HostTuple is a static inner class of RestClient; it wraps the IP addresses and ports of the Elasticsearch cluster machines configured for the client, together with the AuthCache.

Therefore, from the client's point of view, there are two problems:

  1. How do I pick a "reliable" machine and confidently hand my request over to it?
  2. If the client's request volume is very large, I cannot keep sending every request to one particular Elasticsearch server; load balancing has to be considered.

I have not dug into every detail of the algorithm's implementation, but once abstracted, these two problems come up in many scenarios.

When a client wants to connect to a service that exposes many hosts, which factors should it consider in order to pick a suitable host to connect to?

In the arguments passed to performRequestAsync, the RestClient method nextHost() is called to choose suitable Elasticsearch servers to connect to.

    void performRequestAsyncNoCatch(String method, String endpoint, Map<String, String> params,
                                    HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                    ResponseListener responseListener, Header... headers) {

        // other unrelated code omitted
        performRequestAsync(startTime, nextHost(), request, ignoreErrorCodes, httpAsyncResponseConsumerFactory,
                failureTrackingResponseListener);
    }
    /**
     * Returns an {@link Iterable} of hosts to be used for a request call.
     * Ideally, the first host is retrieved from the iterable and used successfully for the request.
     * Otherwise, after each failure the next host has to be retrieved from the iterator so that the request can be retried until
     * there are no more hosts available to retry against. The maximum total of attempts is equal to the number of hosts in the iterable.
     * The iterator returned will never be empty. In case there are no healthy hosts available, or dead ones to be retried,
     * one dead host gets returned so that it can be retried.
     */
    private HostTuple<Iterator<HttpHost>> nextHost() {


The rough logic of nextHost() is as follows:

do {
    // first, get the Elasticsearch hosts configured for the cluster from the HostTuple
    // ...

    if (filteredHosts.isEmpty()) {
        // last resort: if there are no good hosts to use, return a single dead one,
        // the one that's closest to being retried
        // (every host is unusable, so try the "least dead" one anyway)
        HttpHost deadHost = sortedHosts.get(0).getKey();
        nextHosts = Collections.singleton(deadHost);
    } else {
        List<HttpHost> rotatedHosts = new ArrayList<>(filteredHosts);
        // Collections.rotate() shifts the list so that a different host comes first on each call,
        // which is what produces the round-robin choice of host
        Collections.rotate(rotatedHosts, rotatedHosts.size() - lastHostIndex.getAndIncrement());
        nextHosts = rotatedHosts;
    }

} while (nextHosts.isEmpty());

Choosing which Elasticsearch host to connect to is essentially done by Collections.rotate(). That method has two internal implementations; rather than pasting their code, here is its Javadoc:

    /**
     * Rotates the elements in the specified list by the specified distance.
     * After calling this method, the element at index <tt>i</tt> will be
     * the element previously at index <tt>(i - distance)</tt> mod
     * <tt>list.size()</tt>, for all values of <tt>i</tt> between <tt>0</tt>
     * and <tt>list.size()-1</tt>, inclusive.  (This method has no effect on
     * the size of the list.)
     *
     * <p>For example, suppose <tt>list</tt> comprises<tt> [t, a, n, k, s]</tt>.
     * After invoking <tt>Collections.rotate(list, 1)</tt> (or
     * <tt>Collections.rotate(list, -4)</tt>), <tt>list</tt> will comprise
     * <tt>[s, t, a, n, k]</tt>.
     *
     * <p>Note that this method can usefully be applied to sublists to
     * move one or more elements within a list while preserving the
     * order of the remaining elements.  For example, the following idiom
     * moves the element at index <tt>j</tt> forward to position
     * <tt>k</tt> (which must be greater than or equal to <tt>j</tt>):
     * <pre>
     *     Collections.rotate(list.subList(j, k+1), -1);
     * </pre>
     * To make this concrete, suppose <tt>list</tt> comprises
     * <tt>[a, b, c, d, e]</tt>.  To move the element at index <tt>1</tt>
     * (<tt>b</tt>) forward two positions, perform the following invocation:
     * <pre>
     *     Collections.rotate(l.subList(1, 4), -1);
     * </pre>
     * The resulting list is <tt>[a, c, d, b, e]</tt>.
     *
     * <p>To move more than one element forward, increase the absolute value
     * of the rotation distance.  To move elements backward, use a positive
     * shift distance.
     *
     * <p>If the specified list is small or implements the {@link
     * RandomAccess} interface, this implementation exchanges the first
     * element into the location it should go, and then repeatedly exchanges
     * the displaced element into the location it should go until a displaced
     * element is swapped into the first element.  If necessary, the process
     * is repeated on the second and successive elements, until the rotation
     * is complete.  If the specified list is large and doesn't implement the
     * <tt>RandomAccess</tt> interface, this implementation breaks the
     * list into two sublist views around index <tt>-distance mod size</tt>.
     * Then the {@link #reverse(List)} method is invoked on each sublist view,
     * and finally it is invoked on the entire list.  For a more complete
     * description of both algorithms, see Section 2.3 of Jon Bentley's
     * <i>Programming Pearls</i> (Addison-Wesley, 1986).
     *
     */
    public static void rotate(List<?> list, int distance) {
        if (list instanceof RandomAccess || list.size() < ROTATE_THRESHOLD)
            rotate1(list, distance);
        else
            rotate2(list, distance);
    }

If you want to understand the algorithm in detail, read the source together with the thorough description in Section 2.3 of Programming Pearls.
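To make the round-robin behaviour concrete, here is a minimal, self-contained sketch (not the actual RestClient code; the host names are hypothetical) showing how Collections.rotate() combined with an ever-incrementing counter cycles through the configured hosts:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    public class RoundRobinDemo {
        // counter shared across requests, playing the role of RestClient's lastHostIndex field
        private static final AtomicInteger lastHostIndex = new AtomicInteger(0);

        // returns the hosts in the order they would be tried for one request
        static List<String> nextHosts(List<String> configuredHosts) {
            List<String> rotated = new ArrayList<>(configuredHosts);
            // same rotation as in nextHost(): each call shifts the start of the list by one,
            // so the first host to try cycles node-1, node-2, node-3, node-1, ...
            Collections.rotate(rotated, rotated.size() - lastHostIndex.getAndIncrement());
            return rotated;
        }

        public static void main(String[] args) {
            List<String> hosts = Arrays.asList("es-node-1", "es-node-2", "es-node-3");
            for (int i = 0; i < 5; i++) {
                System.out.println("request " + i + " tries: " + nextHosts(hosts));
            }
            // request 0 tries: [es-node-1, es-node-2, es-node-3]
            // request 1 tries: [es-node-2, es-node-3, es-node-1]
            // request 2 tries: [es-node-3, es-node-1, es-node-2]
            // request 3 tries: [es-node-1, es-node-2, es-node-3]
            // ...
        }
    }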

Original article: https://www.cnblogs.com/hapjin/p/10116073.html

