Hystrix源码

HystrixInvocationHandler.invoke()--->HystrixCommand.execute()--->queue()--->toObservable().toBlocking.toFuture()--->toFuture方法中that.single().subscribe()订阅subscriber

而生成Observable的逻辑是:toObservable--->applyHystrixSemantics(cmd)--->executeCommandAndObserve()--->executeCommandWithSpecifiedIsolation()--->getUserExecutionObservable()

--->getExecutionObservable()--->Observable.just(run())/Observable.error(ex); run方法中是feign和ribbon的请求逻辑。

去除掉一些空方法或者无用的逻辑以及defer的部分,生成的Observable就是Observable.just(run())

.doOnTerminate().doOnUnsubscribe().subscribeOn()---在executeCommandWithSpecifiedIsolation方法中

.lift(new HystrixObservableTimeoutOperator<R>(_cmd)).doOnNext(markEmits).doOnCompleted(markOnCompleted).onErrorResumeNext(handleFallback).doOnEach(setRequestContext)---executeCommandAndObserve方法中

.doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease)---applyHystrixSemantics方法中

.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook)---toObservable方法中

toObservable方法中,经过上面的十多个方法,一层一层的装饰justObservable。然后在toObservable().subscribe(原始subscriber)方法中,一层一层的剥离得到justObservable的同时,一层层终极subscriber(简称终极subscriber)。最后会师的时候,执行

justObservable.onSubscribe的call(终极的subscriber) ,调用终极的subscriber的链式的一路setProducer(WeakSingleProducer producer)下去,直到原始subscriber没有子subscriber为止(注意中间有可能

被lift给截断),就开始调用producer的request方法,因为producer里面有终极的subscriber的引用,request开始调用终极的subscriber(没被取消订阅unsubscribe的)的onNext,

onComplete方法一路再这样链式一直到原始subscriber。-------------------------------------------------------------------------------------------------------------------------------------------Hystrix的超时原理:

在上面的lift(new HystrixObservableTimeoutOperator<R>(_cmd))中,HystrixObservableTimeoutOperator的call方法中,新建一个TimerListener,开启一个定时任务,放入HystrixTimer这个线程池中,这个设计有意思,这个定时任务在

getIntervalTimeInMilliseconds也就是originalCommand.properties.executionTimeoutInMilliseconds的时间后,执行一个originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)

的操作,如果成功了,说明正常的just(run)方法的后续操作还没有到达这一步,也就是说明超时了。如果超时了,会执行两个方法,先说第一个:

CompositeSubscription s.unsubscribe,这是在新开的线程的1秒钟以后的定时任务方法中,这个方法执行之前,异步的“主”线程中已经执行了s.add(parent),这个parent是Subscriber链中的一员。

s.unsubscribe会调用parent的unsubscribe,在上面说的subscriber链式调用中,开始的时候是WeakSingleProducer的request方法,判断,终极subscriber.isUnsubscribed,如果是,返回不执行下面的onNext和onCompleted,

那么问题来了,刚才说的CompositeSubscription的unsubscribe只是解除了parent的订阅,而parent只是终极的subscriber的递归链中的一员,为什么parent的订阅的解除可以引起终极的subscriber的订阅的解除呢?

注意subscriber的装饰过程中,构造方法的shareSubscriptions总是true,也就是subscriptions是共享的,链中所有的subscriber的subscriptions都指向同一个对象,而且subscriptions的unsubscribed是volatile的,

所以是线程可见的。s.add(parent)可以被子线程的s.unsubscribe看到是因为对subscriptionList的操作都是在synchronized内部的。-------------------------------------------------------------------------------------------------------------------------------------------如果在1秒钟内完成请求,就会调用onNext一路然后再onCompleted一路,注意调用onNext的是上面说的终极subscriber,现在开始一层一层剥离了,上面14个封装里的onNext,最先被调用的是最外层的

.doOnCompleted(fireOnCompletedHook),不过它的onNext是空,直到executeCommandAndObserve方法中的execution.doOnNext(markEmits),里面有circuitBreaker.markSuccess(),就是如果断路器现在是开着

的,当前的访问是漏网之鱼,那么就把断路器关掉,这个暂时不管,然后到最后也没什么主要的逻辑。再看onCompleted,首先执行terminateCommandCleanup,handleCommandEnd,metrics.markCommandDone

,HystrixThreadEventStream.getInstance().executionDone,writeOnlyCommandCompletionSubject.onNext(event)。这个是告诉hystrix的统计功能,这次请求的结果类型是成功了还是失败了,是用来计算成功率以

决定断路器要不要开启。-------------------------------------------------------------------------------------------------------------------------------------------circuitBreaker的获取是从一个静态concurrentHashMap,key是feignClient名+方法名,也就是说断路器是方法级别的。-------------------------------------------------------------------------------------------------------------------------------------------HystrixCommand是请求级别的,每一次请求都会实例化一个-------------------------------------------------------------------------------------------------------------------------------------------判断断路器有没有开启是在applyHystrixSemantics中circuitBreaker.allowRequest(),点进去

允许请求的话是判断断路器是不是没有开启:!isOpen(),或者另外一种情况:在断路器开启的情况下,每隔circuitBreakerSleepWindowInMilliseconds的时间要试探性的访问一次

先看isOpen的逻辑:从metrics取出metrics.getHealthCounts(),判断getTotalRequests和getErrorPercentage,总访问量和失败比例,点进getHealthCounts,healthCountsStream.getLatest()。

healthCountsStream的生成是在metrics的构造方法中,HealthCountsStream.getInstance(key, properties); 点进去看,

HealthCountsStream继承了BucketedRollingCounterStream,BucketedRollingCounterStream继承BucketedCounterStream,并在构造方法中生成一个HystrixCommandCompletionStream。

BucketedRollingCounterStream的核心属性sourceStream是父类BucketedCounterStream的核心属性bucketedStream经过一系列的封装而来,而bucketedStream又是上面说的HystrixCommandCompletionStream

经过一系列的封装而来。

healthCountsStream.getLatest()调用的是counterSubject.getValue(),而new HealthCountsStream的时候调用的startCachingStreamValuesIfUnstarted方法中,BucketedRollingCounterStream的sourceStream

属性会subscribe(counterSubject),这样HealthCounts就和BucketedRollingCounterStream联系上了。

每次请求成功后,终极Subscriber的onCompleted会调用handleCommandEnd,metrics.markCommandDone,会调用HystrixThreadEventStream的writeOnlyCommandCompletionSubject.onNext(event);

HystrixThreadEventStream的构造方法中,会让writeOnlyCommandCompletionSubject.doOnNext(writeCommandCompletionsToShardedStreams),writeCommandCompletionsToShardedStreams是一个Action,

其call方法中,调用HystrixCommandCompletionStream的write方法,又知道HystrixCommandCompletionStream是BucketedRollingCounterStream的核心。这样请求成功就可以反映到HealthCounts上了。

而BucketedCounterStream中bucketedStream初始化的时候,inputEventStream.window方法中,启动一个定时任务定时获取数据进行统计。

原文地址:https://www.cnblogs.com/chuliang/p/11705028.html

时间: 2024-11-04 13:07:23

Hystrix源码的相关文章

Hystrix源码解析

1. Hystrix源码解析 1.1. @HystrixCommand原理 直接通过Aspect切面来做的 1.2. feign hystrix原理 它的本质原理就是对HystrixCommand的动态代理封装 1.2.1. 如何启动Feign hystrix的? 在openFeign里,已经封装了Hystrix,当feign.hystrix.enabled为true时生效 Github地址:https://github.com/tzxylao/learn-demo 原文地址:https://w

3.Hystrix源码-熔断策略

前言 强制开关 正常策略 流量探测 小结 请求统计方式 前言 上一篇讲解了hystrix四个接口的关系和调用流程,这一节主要讲解一下在住流程中熔断策略是怎样的,是如何判断应用是否该进行熔断的 强制开关 在流量进来的时候,会经过这个方法来判断现在的熔断开关状态,如果为true则允许流量通过,如果为false则进入fallback阶段 @Override public boolean allowRequest() {     if (properties.circuitBreakerForceOpe

hystrix源码小贴士之调用异常处理

executeCommandAndObserve方法处理onerror异常. return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); handleFallback方法处理执行过程中的各种异常 final Func1<Throwable, Observable<R>> han

hystrix源码小贴士之中断

execution.isolation.thread.interruptOnCancel可以设置当cancellation发生时是否需要中断.通过Future的cancel方法和线程的中断方法来实现是否需要中断. public Future<R> queue() { /* * The Future returned by Observable.toBlocking().toFuture() does not implement the * interruption of the executi

hystrix源码小贴士之Yammer Publisher

HystrixYammerMetricsPublisher 继承HystrixMetricsPublisher,创建HystrixYammerMetricsPublisherCommand.HystrixYammerMetricsPublisherThreadPool.HystrixYammerMetricsPublisherCollapser. @Override public HystrixMetricsPublisherCommand getMetricsPublisherForComma

hystrix源码小贴士之Servo Publisher

HystrixServoMetricsPublisher 继承HystrixMetricsPublisher,创建HystrixServoMetricsPublisherCommand.HystrixServoMetricsPublisherThreadPool.HystrixServoMetricsPublisherCollapser. @Override public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(H

turbine源码分析

turbine源码分析 1.turbine架构设计 一切从InstanceDiscovery模块开始,该模块提供所有的主机信息.它会定期的发送更新,ConnectionManager负责创建连接到主机.一旦建立起连接,数据流将源源不断的发送给Aggregator既聚合器.聚合器将数据汇聚后的数据输出到客户端或者下游监听者. 汇聚示例: {type:'weather-data-temp', name:'New York', temp:74} {type:'weather-data-temp', n

如何使用加拿大28源码下载理解线程池

平时接触过加拿大28源码下载[dashengba.com]Q3266397597多线程开发的童鞋应该都或多或少了解过线程池,之前发布的<阿里巴巴 Java 手册>里也有一条:可见线程池的重要性.简单来说使用线程池有以下几个目的:线程是稀缺资源,不能频繁的创建.解耦作用:线程的创建于执行完全分开,方便维护.应当将其放入一个池子中,可以给其他任务进行复用.线程池原理谈到线程池就会想到池化技术,其中最核心的思想就是把宝贵的资源放到一个池子中:每次使用都从里面获取,用完之后又放回池子供其他人使用,有点

SpringCloud(4)---Ribbon服务调用,源码分析

SpringCloud(4)---Ribbon 本篇模拟订单服务调用商品服务,同时商品服务采用集群部署. 注册中心服务端口号7001,订单服务端口号9001,商品集群端口号:8001.8002.8003. 各服务的配置文件这里我这边不在显示了,和上篇博客配置一样.博客地址:SpringCloud(3)---Eureka服务注册与发现 一.商品中心服务端 1.pom.xml <?xml version="1.0" encoding="UTF-8"?> &l