hystrix源码小贴士之中断

  execution.isolation.thread.interruptOnCancel可以设置当cancellation发生时是否需要中断。通过Future的cancel方法和线程的中断方法来实现是否需要中断。

public Future<R> queue() {
        /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
        final Future<R> delegate = toObservable().toBlocking().toFuture();

        final Future<R> f = new Future<R>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }

                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    /*
                     * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
                     * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
                     * issued by different threads, it‘s unclear about what value would be used by the time mayInterruptOnCancel is checked.
                     * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                     * than that interruption request cannot be taken back.
                     */
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                }

                final boolean res = delegate.cancel(interruptOnFutureCancel.get());

                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    final Thread t = executionThread.get();
                    if (t != null && !t.equals(Thread.currentThread())) {
                        t.interrupt();
                    }
                }

                return res;
            }

            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
            }

            @Override
            public boolean isDone() {
                return delegate.isDone();
            }

            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }

        };

   execution.isolation.thread.interruptOnTimeout可以设置当发生timeout时,是否需要中断。通过getScheduler实现。

threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            })
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
        private final FutureTask<?> f;
        private final Func0<Boolean> shouldInterruptThread;
        private final ThreadPoolExecutor executor;

        private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
            this.f = f;
            this.shouldInterruptThread = shouldInterruptThread;
            this.executor = executor;
        }

        @Override
        public void unsubscribe() {
            executor.remove(f);
            if (shouldInterruptThread.call()) {
                f.cancel(true);
            } else {
                f.cancel(false);
            }
        }

        @Override
        public boolean isUnsubscribed() {
            return f.isCancelled();
        }
    }

原文地址:https://www.cnblogs.com/zhangwanhua/p/8116997.html

时间: 2024-11-04 16:10:58

hystrix源码小贴士之中断的相关文章

hystrix源码小贴士之调用异常处理

executeCommandAndObserve方法处理onerror异常. return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); handleFallback方法处理执行过程中的各种异常 final Func1<Throwable, Observable<R>> han

hystrix源码小贴士之Yammer Publisher

HystrixYammerMetricsPublisher 继承HystrixMetricsPublisher,创建HystrixYammerMetricsPublisherCommand.HystrixYammerMetricsPublisherThreadPool.HystrixYammerMetricsPublisherCollapser. @Override public HystrixMetricsPublisherCommand getMetricsPublisherForComma

hystrix源码小贴士之Servo Publisher

HystrixServoMetricsPublisher 继承HystrixMetricsPublisher,创建HystrixServoMetricsPublisherCommand.HystrixServoMetricsPublisherThreadPool.HystrixServoMetricsPublisherCollapser. @Override public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(H

ViewAnimator实例源码小Demo+Tab例子

ViewAnimator实例源码小Demo+Tab例子,仅供学习~ 例子中主要有ImageSwitcher.TextSwitcher.ViewFilpper.Tabs(ActionBar)的使用 源码下载地址:http://yunpan.cn/QacbksIx2Snme (提取码:acc0) ViewAnimator实例源码小Demo+Tab例子

Hystrix源码解析

1. Hystrix源码解析 1.1. @HystrixCommand原理 直接通过Aspect切面来做的 1.2. feign hystrix原理 它的本质原理就是对HystrixCommand的动态代理封装 1.2.1. 如何启动Feign hystrix的? 在openFeign里,已经封装了Hystrix,当feign.hystrix.enabled为true时生效 Github地址:https://github.com/tzxylao/learn-demo 原文地址:https://w

Android源码——小苏闹钟

小苏闹钟是一款非常有趣的闹钟.本闹钟和其他闹钟的不同在于独特的取消闹钟的方法,非常适合爱赖床的朋友使用. 取消闹钟的三种方式: 1.做算术题.闹钟响的同时会随机产生算术题,只有做对了指定的题才能取消闹钟,做题次数可自定义. 2.摇晃手机.根据摇晃手机的力度判断清醒程度,清醒程度达到100%取消闹钟,摇晃手机灵敏度可在设置里设置. 下载地址:http://www.devstore.cn/code/info/1136.html 运行截图:     热门源码下载: 高仿京东商城 Android快速开发

【干货分享】影音娱乐类源码小合集

Android源码仿天天动听音乐播放器,可联网下载歌词 简介 Android源码仿天天动听音乐播放器,可联网下载歌词. 下载地址:http://www.devstore.cn/code/info/203.html  源码运行截图   Android应用源码仿暴风影音安卓客户端源码 简介 本项目是一个模仿暴风影音的UI项目源码,仿照的界面有菜单页,主页,分类页等,项目内的所有数据都使用的本地模拟数据,仿照度一般在大分辨设备上布局显示会有问题,480x800的分辨率应该正合适,默认编译版本4.2.2

3.Hystrix源码-熔断策略

前言 强制开关 正常策略 流量探测 小结 请求统计方式 前言 上一篇讲解了hystrix四个接口的关系和调用流程,这一节主要讲解一下在住流程中熔断策略是怎样的,是如何判断应用是否该进行熔断的 强制开关 在流量进来的时候,会经过这个方法来判断现在的熔断开关状态,如果为true则允许流量通过,如果为false则进入fallback阶段 @Override public boolean allowRequest() {     if (properties.circuitBreakerForceOpe

Hystrix源码

HystrixInvocationHandler.invoke()--->HystrixCommand.execute()--->queue()--->toObservable().toBlocking.toFuture()--->toFuture方法中that.single().subscribe()订阅subscriber 而生成Observable的逻辑是:toObservable--->applyHystrixSemantics(cmd)--->executeC