ES6.3.2 副本失败处理

ES6.3.2 副本失败处理

副本的失败处理对理解ES的数据副本模型很有帮助。在ES6.3.2 index操作源码流程的总结中提到:ES的写操作会先写主分片,然后主分片再将操作同步到副本分片。本文给出ES中的源码片断,分析副本执行操作失败时,ES是如何处理的。

副本执行源码:replicasProxy.performOn实现了副本操作,执行正常结束回调onResponse(),异常回调onFailure()

replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
            @Override
            public void onResponse(ReplicaResponse response) {
                successfulShards.incrementAndGet();
                try {
                    primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());//执行成功回调更新检查点
                    primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
                } catch (final AlreadyClosedException e) {
                    // okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
                } catch (final Exception e) {
                    // fail the primary but fall through and let the rest of operation processing complete
                    final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
                    primary.failShard(message, e);
                }
                decPendingAndFinishIfNeeded();//不管是正常的onResponse还是异常的onFailure,都会调用这个方法,代表已经完成了一个操作,pendingActions减1
            }

            @Override
            public void onFailure(Exception replicaException) {
                logger.trace(() -> new ParameterizedMessage(
                    "[{}] failure while performing [{}] on replica {}, request [{}]",
                    shard.shardId(), opType, shard, replicaRequest), replicaException);
                // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
                if (TransportActions.isShardNotAvailableException(replicaException) == false) {
                    RestStatus restStatus = ExceptionsHelper.status(replicaException);
                    shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                        shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                }
                String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                //---> failShardIfNeeded 具体执行何种操作要看 replicasProxy的真正实现类:如果是WriteActionReplicasProxy则会报告shard错误
                replicasProxy.failShardIfNeeded(shard, message,
                    replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
                    ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
            }
        });
    }

执行正常结束回调onResponse()

successfulShards.incrementAndGet();,在返回的结果里面,_shards 字段里面就能看到 successful 数值。

更新 local checkpoint 和 global checkpoint:如果检查点更新失败,触发:replica shard engine 关闭。

/**
     * Fails the shard and marks the shard store as corrupted if
     * <code>e</code> is caused by index corruption
     *
     * org.elasticsearch.index.shard.IndexShard#failShard
     */
    public void failShard(String reason, @Nullable Exception e) {
        // fail the engine. This will cause this shard to also be removed from the node's index service.
        getEngine().failEngine(reason, e);
    }
fail engine due to some error. the engine will also be closed.
The underlying store is marked corrupted iff failure is caused by index corruption

关于检查点,可参考这篇文章:elasticsearch-sequence-ids-6-0

异常结束回调 onFailure()

replicasProxy.failShardIfNeeded(shard, message,
                    replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
                    ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());

failShardIfNeeded 可以做2件事情,具体是如何执行得看failShardIfNeeded的实现类。

  1. onPrimaryDemoted

    通知master primary stale(过时)了。index操作首先在primary shard执行成功了,然后同步给replica,但是replica发现此primary shard 的 primary term 比它知道的该索引的primary term 还小,于是replica就认为此primary shard是一个已经过时了的primary shard,因此就回调onFailure()拒绝执行,并执行onPrimaryDemoted通知master节点。

    private void onPrimaryDemoted(Exception demotionFailure) {
            String primaryFail = String.format(Locale.ROOT,
                "primary shard [%s] was demoted while failing replica shard",
                primary.routingEntry());
            // we are no longer the primary, fail ourselves and start over
            primary.failShard(primaryFail, demotionFailure);
            finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), primaryFail, demotionFailure));
        }

    ?

  2. decPendingAndFinishIfNeeded

    计数。一个请求会由ReplicationGroup中的 多个分片执行,这些分片是否都已经执行完成了?就由pendingActions计数。不管是执行正常结束onResponse还是异常结束onFailure都会调用这个方法。

    private void decPendingAndFinishIfNeeded() {
            assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
            if (pendingActions.decrementAndGet() == 0) {//当所有的shard都处理完这个请求,client收到ACK(里面允许一些replica执行失败), 或者是收到一个请求超时的响应
                finish();
            }
        }

    对于发起index操作的Client而言,该 index 操作会由primary shard 执行,也会由若干个replica执行。因此,pendingActions统计到底有多少个分片(既包括主分片也包括副本分片)执行完成(在某些副本分片上执行失败也算执行完成)了。正是由于不管是 onResponse() 还是 onFailure(),都会执行decPendingAndFinishIfNeeded()方法,每执行一次,意味着有一个分片返回了响应,这时if (pendingActions.decrementAndGet() == 0)就减1,直到减为0时,调用finish()方法给Client返回ACK响应。

    private void finish() {
        if (finished.compareAndSet(false, true)) {
            final ReplicationResponse.ShardInfo.Failure[] failuresArray;
            if (shardReplicaFailures.isEmpty()) {
                failuresArray = ReplicationResponse.EMPTY;
            } else {
                failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()];
                shardReplicaFailures.toArray(failuresArray);
            }
            primaryResult.setShardInfo(new ReplicationResponse.ShardInfo(
                    totalShards.get(),
                    successfulShards.get(),
                    failuresArray
                )
            );
            resultListener.onResponse(primaryResult);
        }
    }

Client要么收到一个执行成功的ACK(默认情况下,只要primary shard执行成功,若存在 replica执行失败,Client也会收到一个执行成功的ACK,只不过 返回的ACK里面 _shards参数下的 failed 不为0而已),如下:

{
"_index": "user",
"_type": "profile",
"_id": "10",
"_version": 1,
"result": "created",
"_shards": {
? "total": 3,
? "successful": 1,
? "failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}

另外,ES6.3.2 index操作源码流程 的总结部分,详细解释了Client收到执行成功的ACK的原因。

要么收到一个超时ACK,如下:(这篇文章提到了如何产生一个超时的ACK)

{
"statusCode": 504,
"error": "Gateway Time-out",
"message": "Client request timeout"
}

failShardIfNeeded方法一共有2个具体实现,看类图:

TransportReplicationAction.ReplicasProxy#failShardIfNeeded (默认实现)

@Override
        public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
            // This does not need to fail the shard. The idea is that this
            // is a non-write operation (something like a refresh or a global
            // checkpoint sync) and therefore the replica should still be
            // "alive" if it were to fail.
            onSuccess.run();
        }

TransportResyncReplicationAction.ResyncActionReplicasProxy#failShardIfNeeded(副本resync操作的实现)

/**
     * A proxy for primary-replica resync operations which are performed on replicas when a new primary is promoted.
     * Replica shards fail to execute resync operations will be failed but won't be marked as stale.
     * This avoids marking shards as stale during cluster restart but enforces primary-replica resync mandatory.
     */
    class ResyncActionReplicasProxy extends ReplicasProxy {
        @Override
        public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
                                      Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
            shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception,
                createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
        }
    }

TransportWriteAction.WriteActionReplicasProxy#failShardIfNeeded(index 写操作的实现)

/**
 * A proxy for <b>write</b> operations that need to be performed on the
 * replicas, where a failure to execute the operation should fail
 * the replica shard and/or mark the replica as stale.
 *
 * This extends {@code TransportReplicationAction.ReplicasProxy} to do the
 * failing and stale-ing.
 */
class WriteActionReplicasProxy extends ReplicasProxy {

    @Override
    public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
        if (TransportActions.isShardNotAvailableException(exception) == false) {
            logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);}
        shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception,
            createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
    }

原文:https://www.cnblogs.com/hapjin/p/10585555.html

原文地址:https://www.cnblogs.com/hapjin/p/10585555.html

时间: 2024-08-04 00:14:15

ES6.3.2 副本失败处理的相关文章

剖析Elasticsearch集群系列之二:分布式的三个C、translog和Lucene段

转载:http://www.infoq.com/cn/articles/anatomy-of-an-elasticsearch-cluster-part02 共识——裂脑问题及法定票数的重要性 共识是分布式系统的一项基本挑战.它要求系统中的所有进程/节点必须对给定数据的值/状态达成共识.已经有很多共识算法诸如Raft.Paxos等,从数学上的证明了是行得通的.但是,Elasticsearch却实现了自己的共识系统(zen discovery),Elasticsearch之父Shay Banon在

详解 Exchange 2013邮箱高可用

详解 Exchange 2013邮箱高可用 邮箱数据库及其包含的数据是任何 Exchange 组织最重要的组件之一.     在 Microsoft Exchange Server 2013 中,可以通过配置邮箱数据库以实现高可用性和站点恢复来保护邮箱数据库及其包含的数据.Exchange 2013 在提供更高级别的端到端可用性和支持较大的邮箱的同时,还可以减少部署具有高可用性和恢复能力的邮件解决方案的成本和复杂性.Exchange 2013 构建于 Exchange 2010 中的本机复制功能

6 C++ Boost 函数对象

6 C++ Boost 函数对象 目录: 关于bind bind2nd程序 bind与bind2nd,效果一样 bind1st 减法 bind1st 与bind 做减法 bind2nd调用仿函数 bind 不需要ptr_fun适配 std:bind2nd 与 boost:bind 当参数大于2个,std::bind已经没办法了,boost::bind限10个 bind_api[图] bind用于函数 以及 函数指针 bind用于函数对象 bind用于函数对象,(用引用避免函数对象的拷贝) bin

游戏开发中的心理学(一):认知失调有前提条件

 以下为第一篇:游戏中的心理学(一):认知失调 游戏业属于服务业,而我们服务的对象就是玩家.我们想要做好一款游戏,除了必要的专业知识,对服务对象的了解程度也非常重要. 笔者最近自学了一点心理学的皮毛,在这里尝试用心理学知识来分析玩家的心理和行为,今天先说说"认知失调理论". 认知失调会让玩家对体验差的游戏作出"好玩"的评价 "认知失调理论"是最让人诧异的理论,费斯廷格的这个理论是这样的:"当人们陷入一种很荒谬的情况时,他们就会想出一

babel吐槽

1. .babelrc文件无法复制 每次复制项目文件,.babelrc文件都会丢失,导致项目的ES6莫名的编译失败,最可能出现的错误就是Unexpected token import错误,import首先报错 记录这个坑,以后对.babelrc多加注意

分布式入门之2:Quorum机制

1.  全写读1(write all, read one) 全写读1是最直观的副本控制规则.写时,只有全部副本写成功,才算是写成功.这样,读取时只需要从其中一个副本上读数据,就能保证正确性. 这种规则需要解决一个问题:如果是一个kv系统,对某个key的第i次写如果只有部分成功,那么系统中既存在次i次写的结果,又存在着第i-1次写的结果.而根据规则,生效的仅仅是第i-1个版本.因此,需要全局性地记录某个key对应的数据目前的版本号i-1.这个元数据可能为是系统的瓶颈. 可用性:对于写操作,虽然有N

FAQ收集

FAQ 1.为什么我在Chrome下,认证上传营业执照副本失败 小码哥:可能是图片超过10M.建议图片大小控制在200K-1M,方便快速上传. 要是还是无法上传,则私信给@小码哥  ,小码哥会在两天内联系你. 2.我们公司研发部的同事在上面注册了公司信息,现在我却无法注册,这个问题该如何解决? 小码哥:请留言给小码哥,@小码哥  将为你解决这个问题.留言的时候也添加上你的 IM 联系方式:) 不同的事业群.分公司若出现已有同事注册了公司信息,而你现在无法注册,请留言给@小码哥 3.企业认证后,以

GFS浅析

1 . 简介 GFS, Big Table, Map Reduce称为Google的三驾马车,是许多基础服务的基石 GFS于2003年提出,是一个分布式的文件系统,与此前的很多分布式系统的前提假设存在很大的不同,适用于以下场景 1)认为组件失效是一种常态,提供了容错机制,自动负载均衡,使得分布式文件系统可以在廉价机器上运行 2)面向大文件存储,系统主要的工作负载是大规模的流式读取,写操作主要是追加方式写,很少有随机写 3)一次写入,多次读取,例如互联网上的网页存储 2. 架构 一个GFS集群由一

elasticsearch系列七:ES Java客户端-Elasticsearch Java client(ES Client 简介、Java REST Client、Java Client、Spring Data Elasticsearch)

一.ES Client 简介 1. ES是一个服务,采用C/S结构 2. 回顾 ES的架构 3. ES支持的客户端连接方式 3.1 REST API ,端口 9200 这种连接方式对应于架构图中的RESTful style API这一层,这种客户端的连接方式是RESTful风格的,使用http的方式进行连接 3.2 Transport 连接 端口 9300 这种连接方式对应于架构图中的Transport这一层,这种客户端连接方式是直接连接ES的节点,使用TCP的方式进行连接 4. ES提供了多种