ES bulk源码分析——ES 5.0

对bulk request的处理流程:

1、遍历所有的request,对其做一些加工,主要包括:获取routing(如果mapping里有的话)、指定的timestamp(如果没有带timestamp会使用当前时间),如果没有指定id字段,在action.bulk.action.allow_id_generation配置为true的情况下,会自动生成一个base64UUID作为id字段,并会将request的opType字段置为CREATE,因为如果是使用es自动生成的id的话,默认就是createdocument而不是updatedocument。(注:坑爹啊,我从github上面下的最新的ES代码,发现自动生成id这一段已经没有设置opType字段了,看起来和有指定id是一样的处理逻辑了,见https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java)。

2、创建一个shardId--> Operation的Map,再次遍历所有的request,获取获取每个request应该发送到的shardId,获取的过程是这样的:request有routing就直接返回,如果没有,会先对id求一个hash,这里的hash函数默认是Murmur3,当然你也可以通过配置index.legacy.routing.hash.type来决定使用的hash函数,决定发到哪个shard:

return MathUtils.mod(hash, indexMetaData.getNumberOfShards()); 注意:最新版ES代码已经改变!

即用hash对shard的总数求模来获取shardId,将shardId作为key,通过遍历的index和request组成BulkItemRequest的集合作为value放入之前说的map中(为什么要拿到遍历的index,因为在bulk response中可以看到对每个request的请求处理结果的),其实说了这么多就是要对request按shard来分组(为负载均衡)。

3、遍历上面得到的map,对不同的分组创建一个bulkShardRequest,包含配置consistencyLevel和timeout。并从集群state中获得primary shard,如果primary在本机就直接执行,如果不在会再发送到其shard所在的node。

源码位置:https://github.com/elastic/elasticsearch/blob/master/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

    void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
        final ClusterState clusterState = clusterService.state();
        // TODO use timeout to wait here if its blocked...
        clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);

        final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
        MetaData metaData = clusterState.metaData();
        for (int i = 0; i < bulkRequest.requests.size(); i++) {
            DocWriteRequest docWriteRequest = bulkRequest.requests.get(i);
            //the request can only be null because we set it to null in the previous step, so it gets ignored
            if (docWriteRequest == null) {
                continue;
            }
            if (addFailureIfIndexIsUnavailable(docWriteRequest, bulkRequest, responses, i, concreteIndices, metaData)) {
                continue;
            }
            Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
            try {
                switch (docWriteRequest.opType()) {
                    case CREATE:
                    case INDEX:
                        IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                        MappingMetaData mappingMd = null;
                        final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                        if (indexMetaData != null) {
                            mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
                        }
                        indexRequest.resolveRouting(metaData);
                        indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
                        break;
                    case UPDATE:
                        TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
                        break;
                    case DELETE:
                        TransportDeleteAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (DeleteRequest) docWriteRequest);
                        break;
                    default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
                }
            } catch (ElasticsearchParseException | RoutingMissingException e) {
                BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e);
                BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
                responses.set(i, bulkItemResponse);
                // make sure the request gets never processed again
                bulkRequest.requests.set(i, null);
            }
        }

        // first, go over all the requests and create a ShardId -> Operations mapping
        Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
        for (int i = 0; i < bulkRequest.requests.size(); i++) {
            DocWriteRequest request = bulkRequest.requests.get(i);
            if (request == null) {
                continue;
            }
            String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
            ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
            List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
            shardRequests.add(new BulkItemRequest(i, request));
        }

        if (requestsByShard.isEmpty()) {
            listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
            return;
        }

        final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
        String nodeId = clusterService.localNode().getId();
        for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
            final ShardId shardId = entry.getKey();
            final List<BulkItemRequest> requests = entry.getValue();
            BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
                    requests.toArray(new BulkItemRequest[requests.size()]));
            bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
            bulkShardRequest.timeout(bulkRequest.timeout());
            if (task != null) {
                bulkShardRequest.setParentTask(nodeId, task.getId());
            }
            shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
                @Override
                public void onResponse(BulkShardResponse bulkShardResponse) {
                    for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                        // we may have no response if item failed
                        if (bulkItemResponse.getResponse() != null) {
                            bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                        }
                        responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                    }
                    if (counter.decrementAndGet() == 0) {
                        finishHim();
                    }
                }
            });
        }
    }

路由代码:

ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();

时间: 2024-10-25 02:20:56

ES bulk源码分析——ES 5.0的相关文章

Android 资源加载Resources源码分析(8.0)

我们熟悉的资源加载代码: 1.Activity.getResources(); 2.Context.getResources(); 这2种方式获取的都是Resources对象 先看第一种获取Resources对象源码分析: 说明:(AppcompatActivity中getResource()方法与Activity.getResources()是有区别的.AppcompatActivity是new Resources(...)对象) 一:Activity.getResources()源码分析:

JQ源码分析(2.0.3)---$.extend() $.fn.extend()

$.extend():作为扩展工具方法 $.extend({ aaa:function(){ alert(123); }, bbb:function(){ alert(456); }}) $.aaa();$.bbb(); $.fn.extend():作为jQ的实例方法 $.fn.extend({ aaa:function(){ alert(123); }, bbb:function(){ alert(456); }}) $().aaa();$().bbb(); 深拷贝和浅拷贝 var a = {

[转]jQuery源码分析系列

文章转自:jQuery源码分析系列-Aaron 版本截止到2013.8.24 jQuery官方发布最新的的2.0.3为准 附上每一章的源码注释分析 :https://github.com/JsAaron/jQuery 正在编写的书 - jQuery架构设计与实现 本人在慕课网的教程(完结) jQuery源码解析(架构与依赖模块) 64课时 jQuery源码解析(DOM与核心模块)64课时 jQuery源码分析目录(完结) jQuery源码分析系列(01) : 整体架构 jQuery源码分析系列(

5 cocos2dx 3.0源码分析 渲染 render

渲染,感觉这个挺重要了,这里代入一个简单的例子 Sprite 建立及到最后的画在屏幕上, 我们描述一下这个渲染的流程: 1 sprite 初始化(纹理, 坐标,及当前元素的坐标大小信息) 2 主循环调用sprite的draw(), 把绘制命令发送到系统的render的渲染队列中. 3 Render拿到渲染队列中的渲染命令, 分别对每个命令进行处理, 我们这里的QUAD_COMMAND, 把这个命令中的坐标信息复制到自己的渲染缓冲中, 之后通过调用OpenGL命令对当前的矩形进行绘制. 1 Spr

Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7)【转】

原文地址:Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.chinaunix.net/uid-25909619-id-4938395.html 前面粗略分析start_kernel函数,此函数中基本上是对内存管理和各子系统的数据结构初始化.在内核初始化函数start_kernel执行到最后,就是调用rest_init函数,这个函数的主要使命就是创建并启动内核线

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三)

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三) 本文是SolrCloud的Recovery策略系列的第三篇文章,前面两篇主要介绍了Recovery的总体流程,以及PeerSync策略.本文以及后续的文章将重点介绍Replication策略.Replication策略不但可以在SolrCloud中起到leader到replica的数据同步,也可以在用多个单独的Solr来实现主从同步.本文先介绍在SolrCloud的leader到replica的数据同步,下一篇

Linux内核源码分析--内核启动之(6)Image内核启动(do_basic_setup函数)(Linux-3.0 ARMv7)【转】

原文地址:Linux内核源码分析--内核启动之(6)Image内核启动(do_basic_setup函数)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.chinaunix.net/uid-25909619-id-4938396.html 在基本分析完内核启动流程的之后,还有一个比较重要的初始化函数没有分析,那就是do_basic_setup.在内核init线程中调用了do_basic_setup,这个函数也做了很多内核和驱动的初始化工作,详解

Linux内核源码分析--内核启动之(3)Image内核启动(C语言部分)(Linux-3.0 ARMv7) 【转】

原文地址:Linux内核源码分析--内核启动之(3)Image内核启动(C语言部分)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.chinaunix.net/uid-25909619-id-4938390.html 在构架相关的汇编代码运行完之后,程序跳入了构架无关的内核C语言代码:init/main.c中的start_kernel函数,在这个函数中Linux内核开始真正进入初始化阶段, 下面我就顺这代码逐个函数的解释,但是这里并不会过于深入

Linux内核源码分析--内核启动之(4)Image内核启动(setup_arch函数)(Linux-3.0 ARMv7)【转】

原文地址:Linux内核源码分析--内核启动之(4)Image内核启动(setup_arch函数)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.chinaunix.net/uid-25909619-id-4938393.html 在分析start_kernel函数的时候,其中有构架相关的初始化函数setup_arch. 此函数根据构架而异,对于ARM构架的详细分析如下: void __init setup_arch(char **cmdlin