HystrixInvocationHandler.invoke()--->HystrixCommand.execute()--->queue()--->toObservable().toBlocking.toFuture()--->toFuture方法中that.single().subscribe()订阅subscriber 而生成Observable的逻辑是:toObservable--->applyHystrixSemantics(cmd)--->executeCommandAndObserve()--->executeCommandWithSpecifiedIsolation()--->getUserExecutionObservable() --->getExecutionObservable()--->Observable.just(run())/Observable.error(ex); run方法中是feign和ribbon的请求逻辑。 去除掉一些空方法或者无用的逻辑以及defer的部分,生成的Observable就是Observable.just(run()) .doOnTerminate().doOnUnsubscribe().subscribeOn()---在executeCommandWithSpecifiedIsolation方法中 .lift(new HystrixObservableTimeoutOperator<R>(_cmd)).doOnNext(markEmits).doOnCompleted(markOnCompleted).onErrorResumeNext(handleFallback).doOnEach(setRequestContext)---executeCommandAndObserve方法中 .doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease)---applyHystrixSemantics方法中 .doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook)---toObservable方法中 toObservable方法中,经过上面的十多个方法,一层一层的装饰justObservable。然后在toObservable().subscribe(原始subscriber)方法中,一层一层的剥离得到justObservable的同时,一层层终极subscriber(简称终极subscriber)。最后会师的时候,执行 justObservable.onSubscribe的call(终极的subscriber) ,调用终极的subscriber的链式的一路setProducer(WeakSingleProducer producer)下去,直到原始subscriber没有子subscriber为止(注意中间有可能 被lift给截断),就开始调用producer的request方法,因为producer里面有终极的subscriber的引用,request开始调用终极的subscriber(没被取消订阅unsubscribe的)的onNext, onComplete方法一路再这样链式一直到原始subscriber。-------------------------------------------------------------------------------------------------------------------------------------------Hystrix的超时原理: 在上面的lift(new HystrixObservableTimeoutOperator<R>(_cmd))中,HystrixObservableTimeoutOperator的call方法中,新建一个TimerListener,开启一个定时任务,放入HystrixTimer这个线程池中,这个设计有意思,这个定时任务在 getIntervalTimeInMilliseconds也就是originalCommand.properties.executionTimeoutInMilliseconds的时间后,执行一个originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT) 的操作,如果成功了,说明正常的just(run)方法的后续操作还没有到达这一步,也就是说明超时了。如果超时了,会执行两个方法,先说第一个: CompositeSubscription s.unsubscribe,这是在新开的线程的1秒钟以后的定时任务方法中,这个方法执行之前,异步的“主”线程中已经执行了s.add(parent),这个parent是Subscriber链中的一员。 s.unsubscribe会调用parent的unsubscribe,在上面说的subscriber链式调用中,开始的时候是WeakSingleProducer的request方法,判断,终极subscriber.isUnsubscribed,如果是,返回不执行下面的onNext和onCompleted, 那么问题来了,刚才说的CompositeSubscription的unsubscribe只是解除了parent的订阅,而parent只是终极的subscriber的递归链中的一员,为什么parent的订阅的解除可以引起终极的subscriber的订阅的解除呢? 注意subscriber的装饰过程中,构造方法的shareSubscriptions总是true,也就是subscriptions是共享的,链中所有的subscriber的subscriptions都指向同一个对象,而且subscriptions的unsubscribed是volatile的, 所以是线程可见的。s.add(parent)可以被子线程的s.unsubscribe看到是因为对subscriptionList的操作都是在synchronized内部的。-------------------------------------------------------------------------------------------------------------------------------------------如果在1秒钟内完成请求,就会调用onNext一路然后再onCompleted一路,注意调用onNext的是上面说的终极subscriber,现在开始一层一层剥离了,上面14个封装里的onNext,最先被调用的是最外层的 .doOnCompleted(fireOnCompletedHook),不过它的onNext是空,直到executeCommandAndObserve方法中的execution.doOnNext(markEmits),里面有circuitBreaker.markSuccess(),就是如果断路器现在是开着 的,当前的访问是漏网之鱼,那么就把断路器关掉,这个暂时不管,然后到最后也没什么主要的逻辑。再看onCompleted,首先执行terminateCommandCleanup,handleCommandEnd,metrics.markCommandDone ,HystrixThreadEventStream.getInstance().executionDone,writeOnlyCommandCompletionSubject.onNext(event)。这个是告诉hystrix的统计功能,这次请求的结果类型是成功了还是失败了,是用来计算成功率以 决定断路器要不要开启。-------------------------------------------------------------------------------------------------------------------------------------------circuitBreaker的获取是从一个静态concurrentHashMap,key是feignClient名+方法名,也就是说断路器是方法级别的。-------------------------------------------------------------------------------------------------------------------------------------------HystrixCommand是请求级别的,每一次请求都会实例化一个-------------------------------------------------------------------------------------------------------------------------------------------判断断路器有没有开启是在applyHystrixSemantics中circuitBreaker.allowRequest(),点进去 允许请求的话是判断断路器是不是没有开启:!isOpen(),或者另外一种情况:在断路器开启的情况下,每隔circuitBreakerSleepWindowInMilliseconds的时间要试探性的访问一次 先看isOpen的逻辑:从metrics取出metrics.getHealthCounts(),判断getTotalRequests和getErrorPercentage,总访问量和失败比例,点进getHealthCounts,healthCountsStream.getLatest()。 healthCountsStream的生成是在metrics的构造方法中,HealthCountsStream.getInstance(key, properties); 点进去看, HealthCountsStream继承了BucketedRollingCounterStream,BucketedRollingCounterStream继承BucketedCounterStream,并在构造方法中生成一个HystrixCommandCompletionStream。 BucketedRollingCounterStream的核心属性sourceStream是父类BucketedCounterStream的核心属性bucketedStream经过一系列的封装而来,而bucketedStream又是上面说的HystrixCommandCompletionStream 经过一系列的封装而来。 healthCountsStream.getLatest()调用的是counterSubject.getValue(),而new HealthCountsStream的时候调用的startCachingStreamValuesIfUnstarted方法中,BucketedRollingCounterStream的sourceStream 属性会subscribe(counterSubject),这样HealthCounts就和BucketedRollingCounterStream联系上了。 每次请求成功后,终极Subscriber的onCompleted会调用handleCommandEnd,metrics.markCommandDone,会调用HystrixThreadEventStream的writeOnlyCommandCompletionSubject.onNext(event); HystrixThreadEventStream的构造方法中,会让writeOnlyCommandCompletionSubject.doOnNext(writeCommandCompletionsToShardedStreams),writeCommandCompletionsToShardedStreams是一个Action, 其call方法中,调用HystrixCommandCompletionStream的write方法,又知道HystrixCommandCompletionStream是BucketedRollingCounterStream的核心。这样请求成功就可以反映到HealthCounts上了。 而BucketedCounterStream中bucketedStream初始化的时候,inputEventStream.window方法中,启动一个定时任务定时获取数据进行统计。
原文地址:https://www.cnblogs.com/chuliang/p/11705028.html
时间: 2024-11-04 13:07:23