execution.isolation.thread.interruptOnCancel可以设置当cancellation发生时是否需要中断。通过Future的cancel方法和线程的中断方法来实现是否需要中断。
public Future<R> queue() { /* * The Future returned by Observable.toBlocking().toFuture() does not implement the * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true; * thus, to comply with the contract of Future, we must wrap around it. */ final Future<R> delegate = toObservable().toBlocking().toFuture(); final Future<R> f = new Future<R>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { if (delegate.isCancelled()) { return false; } if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) { /* * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are * issued by different threads, it‘s unclear about what value would be used by the time mayInterruptOnCancel is checked. * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption, * than that interruption request cannot be taken back. */ interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning); } final boolean res = delegate.cancel(interruptOnFutureCancel.get()); if (!isExecutionComplete() && interruptOnFutureCancel.get()) { final Thread t = executionThread.get(); if (t != null && !t.equals(Thread.currentThread())) { t.interrupt(); } } return res; } @Override public boolean isCancelled() { return delegate.isCancelled(); } @Override public boolean isDone() { return delegate.isDone(); } @Override public R get() throws InterruptedException, ExecutionException { return delegate.get(); } @Override public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return delegate.get(timeout, unit); } };
execution.isolation.thread.interruptOnTimeout可以设置当发生timeout时,是否需要中断。通过getScheduler实现。
threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })
private static class FutureCompleterWithConfigurableInterrupt implements Subscription { private final FutureTask<?> f; private final Func0<Boolean> shouldInterruptThread; private final ThreadPoolExecutor executor; private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) { this.f = f; this.shouldInterruptThread = shouldInterruptThread; this.executor = executor; } @Override public void unsubscribe() { executor.remove(f); if (shouldInterruptThread.call()) { f.cancel(true); } else { f.cancel(false); } } @Override public boolean isUnsubscribed() { return f.isCancelled(); } }
原文地址:https://www.cnblogs.com/zhangwanhua/p/8116997.html
时间: 2024-11-04 16:10:58