hystrix源码小贴士之调用异常处理

  executeCommandAndObserve方法处理onerror异常。

return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);

  handleFallback方法处理执行过程中的各种异常

final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };

   handleThreadPoolRejectionViaFallback、handleTimeoutViaFallback、handleBadRequestByEmittingError、handleFailureViaFallback最终都会调用getFallbackOrThrowException来处理各种异常。

  getFallbackOrThrowException方法执行fallback方法并返回结果,如果执行过程中异常,返回异常信息。

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
        ...
                Observable<R> fallbackExecutionChain;

                // acquire a permit
                if (fallbackSemaphore.tryAcquire()) {
                    try {
                        if (isFallbackUserDefined()) {
                            executionHook.onFallbackStart(this);
                            fallbackExecutionChain = getFallbackObservable();
                        } else {
                            //same logic as above without the hook invocation
                            fallbackExecutionChain = getFallbackObservable();
                        }
                    } catch (Throwable ex) {
                        //If hook or user-fallback throws, then use that as the result of the fallback lookup
                        fallbackExecutionChain = Observable.error(ex);
                    }

                    return fallbackExecutionChain
                            .doOnEach(setRequestContext)
                            .lift(new FallbackHookApplication(_cmd))
                            .doOnNext(markFallbackEmit)
                            .doOnCompleted(markFallbackCompleted)
                            .onErrorResumeNext(handleFallbackError)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } else {
                   return handleFallbackRejectionByEmittingError();
                }
            } else {
                return handleFallbackDisabledByEmittingError(originalException, failureType, message);
            }
        }
    }

  handleFallbackError方法,返回一次信息

final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
                    @Override
                    public Observable<R> call(Throwable t) {
                        Exception e = originalException;
                        Exception fe = getExceptionFromThrowable(t);

                        if (fe instanceof UnsupportedOperationException) {
                            long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                            logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we‘re throwing the exception and someone higher will do something with it
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);

                            /* executionHook for all errors */
                            e = wrapWithOnErrorHook(failureType, e);

                            return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe));
                        } else {
                            long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                            logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);

                            /* executionHook for all errors */
                            e = wrapWithOnErrorHook(failureType, e);

                            return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe));
                        }
                    }
                };
时间: 2024-11-04 02:22:46

hystrix源码小贴士之调用异常处理的相关文章

hystrix源码小贴士之中断

execution.isolation.thread.interruptOnCancel可以设置当cancellation发生时是否需要中断.通过Future的cancel方法和线程的中断方法来实现是否需要中断. public Future<R> queue() { /* * The Future returned by Observable.toBlocking().toFuture() does not implement the * interruption of the executi

hystrix源码小贴士之Yammer Publisher

HystrixYammerMetricsPublisher 继承HystrixMetricsPublisher,创建HystrixYammerMetricsPublisherCommand.HystrixYammerMetricsPublisherThreadPool.HystrixYammerMetricsPublisherCollapser. @Override public HystrixMetricsPublisherCommand getMetricsPublisherForComma

hystrix源码小贴士之Servo Publisher

HystrixServoMetricsPublisher 继承HystrixMetricsPublisher,创建HystrixServoMetricsPublisherCommand.HystrixServoMetricsPublisherThreadPool.HystrixServoMetricsPublisherCollapser. @Override public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(H

RxJava &amp;&amp; Agera 从源码简要分析基本调用流程(2)

版权声明:本文由晋中望原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/124 来源:腾云阁 https://www.qcloud.com/community 接上篇RxJava && Agera 从源码简要分析基本调用流程(1)我们从"1.订阅过程"."2.变换过程"进行分析,下篇文章我们继续分析"3.线程切换过程" 3.线程切换过程 从上文中我们知道了R

ViewAnimator实例源码小Demo+Tab例子

ViewAnimator实例源码小Demo+Tab例子,仅供学习~ 例子中主要有ImageSwitcher.TextSwitcher.ViewFilpper.Tabs(ActionBar)的使用 源码下载地址:http://yunpan.cn/QacbksIx2Snme (提取码:acc0) ViewAnimator实例源码小Demo+Tab例子

Hystrix源码解析

1. Hystrix源码解析 1.1. @HystrixCommand原理 直接通过Aspect切面来做的 1.2. feign hystrix原理 它的本质原理就是对HystrixCommand的动态代理封装 1.2.1. 如何启动Feign hystrix的? 在openFeign里,已经封装了Hystrix,当feign.hystrix.enabled为true时生效 Github地址:https://github.com/tzxylao/learn-demo 原文地址:https://w

Android源码——小苏闹钟

小苏闹钟是一款非常有趣的闹钟.本闹钟和其他闹钟的不同在于独特的取消闹钟的方法,非常适合爱赖床的朋友使用. 取消闹钟的三种方式: 1.做算术题.闹钟响的同时会随机产生算术题,只有做对了指定的题才能取消闹钟,做题次数可自定义. 2.摇晃手机.根据摇晃手机的力度判断清醒程度,清醒程度达到100%取消闹钟,摇晃手机灵敏度可在设置里设置. 下载地址:http://www.devstore.cn/code/info/1136.html 运行截图:     热门源码下载: 高仿京东商城 Android快速开发

Xutils源码文件下载方法的调用流程

//我主要是好奇Xutils是在哪里回调onLoading(),找老半天没找到,好不容易找到就写下来吧 前言: 1.仅对主要代码行进行摘要,提供大致流程 2.为便于理解,本文变量名不同于源码变量名,而是类名的驼峰式写法.如源码中:WorkRunnable mWorker,在本文中为 workRunnable 3.需要配合看Xutils的源码,可以让你开Xutils源代码时减少一定的难度 代码主干: HttpFragment: //HttpFragment是Xutils自带例子中的一个类 http

dubbo源码阅读笔记--服务调用时序

上接dubbo源码阅读笔记--暴露服务时序,继续梳理服务调用时序,下图右面红线流程. 整理了调用时序图 分为3步,connect,decode,invoke. 连接 AllChannelHandler.connected(Channel) line: 38 HeartbeatHandler.connected(Channel) line: 47 MultiMessageHandler(AbstractChannelHandlerDelegate).connected(Channel) line: