Introduction
1、Where does the name come from?
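Hystrix is the genus name of the porcupine. A porcupine is covered in quills that protect it from predators, a defensive mechanism that matches what the framework itself does. The Netflix team therefore named the framework Hystrix and adopted a cartoon porcupine as its logo.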
2、What Is Hystrix?
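In a distributed system, calls to many dependencies will inevitably fail, for example through timeouts or exceptions. Hystrix's job is to make sure that a problem in one dependency does not bring down the service as a whole. It provides circuit breaking, isolation, fallback, caching, and monitoring, so that the system stays available when one or several dependencies fail at the same time.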
3、Hello Hystrix
    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import java.util.concurrent.Future;
    import rx.Observable;

    public class CommandHelloWorld extends HystrixCommand<String> {

        private final String name;

        public CommandHelloWorld(String name) {
            super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); // required
            this.name = name;
        }

        @Override
        protected String run() {
            // A network call or other business logic that may time out or throw.
            return "Hello " + name + "!";
        }
    }

    // Four ways to invoke the command:
    String s = new CommandHelloWorld("Bob").execute();
    Future<String> future = new CommandHelloWorld("Bob").queue();
    Observable<String> hot = new CommandHelloWorld("Bob").observe();
    Observable<String> cold = new CommandHelloWorld("Bob").toObservable();
Notes:
- execute(): blocks, then returns the single response received from the dependency (or throws an exception in case of an error)
- queue(): returns a Future with which you can obtain the single response from the dependency
- observe(): subscribes to the Observable that represents the response(s) from the dependency and returns an Observable that replicates that source Observable
- toObservable(): returns an Observable that, when you subscribe to it, will execute the Hystrix command and emit its responses
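The difference between the blocking and the asynchronous style can be sketched with the JDK alone. This is not Hystrix code: `InvocationModes`, `callDependency`, `queueAsync`, `executeBlocking` and `demo` are hypothetical names, and the sketch assumes the blocking call is simply the queued call followed by `get()`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InvocationModes {
    // Hypothetical stand-in for a remote dependency call.
    static String callDependency(String name) {
        return "Hello " + name + "!";
    }

    // Asynchronous style: hand the work to a pool and return a Future immediately.
    static Future<String> queueAsync(ExecutorService pool, String name) {
        Callable<String> task = () -> callDependency(name);
        return pool.submit(task);
    }

    // Blocking style: queue the work, then wait for its single result.
    static String executeBlocking(ExecutorService pool, String name) {
        try {
            return queueAsync(pool, name).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Runs one blocking call on a temporary single-thread pool.
    static String demo(String name) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return executeBlocking(pool, name);
        } finally {
            pool.shutdown();
        }
    }
}
```

The caller of `queueAsync` is free to do other work before calling `get()`, while the caller of `executeBlocking` waits on the spot.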
4、Flow Chart
The flow consists of the following steps:
- Construct a HystrixCommand or HystrixObservableCommand Object
- Execute the Command
- Is the Response Cached?
- Is the Circuit Open?
- Is the Thread Pool/Queue/Semaphore Full?
- HystrixObservableCommand.construct() or HystrixCommand.run()
- Calculate Circuit Health
- Get the Fallback
- Return the Successful Response
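The order of these checks can be illustrated with a deliberately simplified, plain-Java sketch. It is not how Hystrix is implemented: `CommandFlowSketch` and its fields are hypothetical, the circuit and pool states are plain flags set by the caller, and step 7 (calculating circuit health) is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class CommandFlowSketch {
    private final Map<String, String> cache = new HashMap<>();
    boolean circuitOpen = false; // input for step 4
    boolean poolFull = false;    // input for step 5

    String execute(String cacheKey, Supplier<String> run, Supplier<String> fallback) {
        // Step 3: return the cached response if there is one.
        String cached = cache.get(cacheKey);
        if (cached != null) {
            return cached;
        }
        // Steps 4 and 5: short-circuit or reject without running the command.
        if (circuitOpen || poolFull) {
            return fallback.get(); // step 8
        }
        try {
            // Step 6: run the command body.
            String result = run.get();
            cache.put(cacheKey, result);
            return result; // step 9
        } catch (RuntimeException e) {
            return fallback.get(); // step 8
        }
    }
}
```

Note that the cache lookup comes first, so a cached response is returned even while the circuit is open.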
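Common Features
Dependency Isolation
Serving a single user request successfully may depend on tens or even hundreds of external services. Without isolation, the failure of one dependency can affect the normal execution of the others. As shown in the figure below, each dependency is given its own thread pool.
In the figure below, when Dep I has a problem, Dep A and Dep M can still execute normally.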
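The isolation idea can be sketched with two ordinary JDK thread pools. This is an illustration only, not Hystrix code: `BulkheadSketch` and `callDepAWhileDepIHangs` are hypothetical names, and the hanging dependency is simulated with a latch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BulkheadSketch {
    // Returns the result of a call to "Dep A" made while "Dep I" is hung.
    static String callDepAWhileDepIHangs() {
        ExecutorService depIPool = Executors.newFixedThreadPool(1);
        ExecutorService depAPool = Executors.newFixedThreadPool(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Dep I hangs and occupies every thread of its own pool.
            depIPool.submit(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Dep A has a separate pool, so its call is not blocked by Dep I.
            Callable<String> depA = () -> "Dep A ok";
            return depAPool.submit(depA).get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            return "Dep A failed: " + e;
        } finally {
            release.countDown();
            depIPool.shutdown();
            depAPool.shutdown();
        }
    }
}
```

Had both dependencies shared the single-thread pool, the call to Dep A would have queued behind the hung task and timed out.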
Example of thread pool isolation

    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import com.netflix.hystrix.HystrixThreadPoolKey;
    import com.netflix.hystrix.HystrixThreadPoolProperties;
    import java.util.concurrent.TimeUnit;

    public class CommandHelloWorld extends HystrixCommand<String> {

        private final String name;

        public CommandHelloWorld(String name) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // required
                    // optional; when omitted, the thread pool key defaults to the group key
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))
                    .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)));
            this.name = name;
        }

        @Override
        protected String run() throws InterruptedException {
            System.out.println("running");
            TimeUnit.MILLISECONDS.sleep(1000);
            return "Hello " + name + "!";
        }
    }
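Common thread pool settings:
Implementation class: HystrixThreadPoolProperties
Name | Type | Description | Default |
---|---|---|---|
coreSize | Integer | Thread pool size | 10 |
maxQueueSize | Integer | Queue size; cannot be changed after initialization | -1 |
queueSizeRejectionThreshold | Integer | Queue rejection threshold; can be changed dynamically. Takes effect only when maxQueueSize > 0 and is usually set lower than maxQueueSize | 5 |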
    Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // required
            .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                    .withExecutionTimeoutInMilliseconds(500)) // timeout
            // optional; when omitted, the thread pool key defaults to the group key
            .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))
            .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                    .withCoreSize(4)
                    .withMaxQueueSize(10)
                    .withQueueSizeRejectionThreshold(7))
Q: How should the thread pool be sized?
A: Peak QPS × TP99 latency (in seconds) + a few spare threads
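A small worked calculation makes the formula concrete. The inputs used in the note after the code (30 requests per second, a TP99 of 200 ms, 4 spare threads) are made-up numbers, and `ThreadPoolSizing` and `coreSize` are hypothetical names.

```java
final class ThreadPoolSizing {
    // peakQps: requests per second at peak while healthy.
    // tp99Millis: 99th percentile latency in milliseconds.
    // spareThreads: extra headroom on top of the computed value.
    static int coreSize(double peakQps, double tp99Millis, int spareThreads) {
        // Threads that are busy at once = arrival rate × time each request holds a thread.
        double busyThreads = peakQps * tp99Millis / 1000.0;
        return (int) Math.ceil(busyThreads) + spareThreads;
    }
}
```

With those inputs, `coreSize(30, 200, 4)` gives 30 × 0.2 = 6 busy threads plus 4 spare, so a pool of 10.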
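Semaphore Isolation
With thread pool isolation, the thread that issues the request and the thread that actually executes it are different threads. With semaphore isolation they are the same thread. The figure below shows the difference between the two: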
    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import com.netflix.hystrix.HystrixCommandProperties;
    import java.util.concurrent.TimeUnit;
    import org.junit.Test;

    public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> {

        private final int id;
        private long start, end;

        public CommandUsingSemaphoreIsolation(int id) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    // since we're doing an in-memory cache lookup we choose SEMAPHORE isolation
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                            // use the semaphore isolation strategy
                            .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                            // maximum concurrent requests under semaphore isolation
                            .withExecutionIsolationSemaphoreMaxConcurrentRequests(3)
                            // maximum concurrent fallback executions
                            .withFallbackIsolationSemaphoreMaxConcurrentRequests(5)
                            // timeout
                            .withExecutionTimeoutInMilliseconds(300)));
            this.id = id;
            this.start = System.currentTimeMillis();
        }

        @Override
        protected String run() throws InterruptedException {
            // a real implementation would retrieve data from in memory data structure
            TimeUnit.MILLISECONDS.sleep(id * 30);
            System.out.println("running normal, id=" + id);
            return "ValueFromHashMap_" + id;
        }

        @Override
        protected String getFallback() {
            System.out.println(" fallback, id=" + id);
            return "fallback:" + id;
        }
    }

    @Test
    public void maxCurrentRequst() throws InterruptedException {
        int count = 10;
        while (count > 0) {
            int id = count--;
            new Thread(() -> {
                try {
                    new CommandUsingSemaphoreIsolation(id).execute();
                } catch (Exception ex) {
                    System.out.println("Exception:" + ex.getMessage() + " id=" + id);
                }
            }).start();
        }

        TimeUnit.SECONDS.sleep(100);
    }
    // Note: with semaphore isolation, run() executes sequentially even when
    // new CommandUsingSemaphoreIsolation(id).queue() is called in a loop on the same thread.

// Console output
fallback, id=10
fallback, id=9
fallback, id=5
fallback, id=8
fallback, id=1
Exception:CommandUsingSemaphoreIsolation fallback execution rejected. id=4
Exception:CommandUsingSemaphoreIsolation fallback execution rejected. id=7
running normal, id=2
running normal, id=3
running normal, id=6
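Q: When should thread pool isolation be used, and when semaphore isolation?
A: Thread pool isolation adds some overhead but does not block the request thread, so it suits IO-bound tasks.
Semaphore isolation runs on the user's request thread and has no extra thread-switching overhead, so it suits local computation whose execution time and logic are both short, such as CPU-bound tasks.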
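The semaphore approach can be sketched with `java.util.concurrent.Semaphore`. This is a minimal illustration rather than Hystrix's implementation; `SemaphoreIsolationSketch` and its `execute` method are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

final class SemaphoreIsolationSketch {
    private final Semaphore permits;

    SemaphoreIsolationSketch(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    // Runs on the caller's own thread; rejects immediately when no permit is free.
    String execute(Supplier<String> run, Supplier<String> fallback) {
        if (!permits.tryAcquire()) {
            return fallback.get(); // rejected: no thread switch, no waiting
        }
        try {
            return run.get();
        } finally {
            permits.release();
        }
    }
}
```

Because `tryAcquire()` never blocks, an overloaded dependency costs the caller only a failed permit check before the fallback runs.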
Fallback
Q1: Why is a fallback needed?
Put simply, when a dependency call fails we usually need a degraded alternative, and Hystrix provides support for this.
Degradation

    public class CommandHelloWorld extends HystrixCommand<String> {

        private final String name;

        public CommandHelloWorld(String name) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // required
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                            .withExecutionTimeoutInMilliseconds(500)) // timeout
                    // optional; when omitted, the thread pool key defaults to the group key
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))
                    .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)));
            this.name = name;
        }

        @Override
        protected String run() throws InterruptedException {
            System.out.println("running");
            TimeUnit.MILLISECONDS.sleep(1000); // longer than the 500 ms timeout
            return "Hello " + name + "!";
        }

        @Override
        protected String getFallback() {
            return "Hello " + "Fallback";
        }
    }

    @Test
    public void fallbackTest() {
        assertEquals("Hello Fallback", new CommandHelloWorld("World").execute());
    }
Q2: What triggers a fallback?
Put simply: run() throws an exception, the command times out, the thread pool or semaphore rejects the request, or the circuit is open (short-circuited).
Failure Type | Exception class | Exception.cause | Subject to fallback |
---|---|---|---|
FAILURE | HystrixRuntimeException | underlying exception (user-controlled) | YES |
TIMEOUT | HystrixRuntimeException | j.u.c.TimeoutException | YES |
SHORT_CIRCUITED | HystrixRuntimeException | j.l.RuntimeException | YES |
THREAD_POOL_REJECTED | HystrixRuntimeException | j.u.c.RejectedExecutionException | YES |
SEMAPHORE_REJECTED | HystrixRuntimeException | j.l.RuntimeException | YES |
BAD_REQUEST | HystrixBadRequestException | underlying exception (user-controlled) | NO |
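The last row of the table is the odd one out: a bad request is a caller error, so it propagates instead of falling back. A plain-Java sketch of that rule follows; `FallbackPolicySketch` and its nested `BadRequestException` are hypothetical stand-ins, not Hystrix classes.

```java
import java.util.function.Supplier;

final class FallbackPolicySketch {
    // Hypothetical stand-in for HystrixBadRequestException: signals a caller error.
    static final class BadRequestException extends RuntimeException {
        BadRequestException(String message) {
            super(message);
        }
    }

    static String execute(Supplier<String> run, Supplier<String> fallback) {
        try {
            return run.get();
        } catch (BadRequestException e) {
            throw e; // caller error: propagate, do not fall back
        } catch (RuntimeException e) {
            return fallback.get(); // any other failure: degrade
        }
    }
}
```

The design reason is that a fallback cannot repair invalid input, so hiding the error behind a default value would only mask a bug in the caller.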
The main class used by the tests below:

    public class CommandHelloFailure extends HystrixCommand<String> {

        private final String name;

        public CommandHelloFailure(String name) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // required
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                            .withExecutionTimeoutInMilliseconds(1000)) // timeout
                    .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))
                    .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(3)));
            this.name = name;
        }

        @Override
        protected String run() throws InterruptedException {
            String threadPoolName = this.getThreadPoolKey().name();
            // Note: this also reads the thread pool key, which is why the console output
            // shows the pool name for cmdKey; getCommandKey().name() returns the command key.
            String cmdKey = this.getThreadPoolKey().name();
            System.out.println("running begin , threadPool=" + threadPoolName + " cmdKey=" + cmdKey + " name=" + name);

            if ("Exception".equals(name)) {
                throw new RuntimeException("this command always fails");
            } else if ("Timeout".equals(name)) {
                TimeUnit.SECONDS.sleep(2);          // exceeds the 1000 ms timeout
            } else if ("Reject".equals(name)) {
                TimeUnit.MILLISECONDS.sleep(800);   // keeps pool threads busy
            }
            System.out.println(" run end");

            return "Hello " + name + "!";
        }

        @Override
        protected String getFallback() {
            StringBuilder sb = new StringBuilder("running fallback");
            boolean isRejected = isResponseRejected();
            boolean isException = isFailedExecution();
            boolean isTimeout = isResponseTimedOut();
            boolean isCircut = isCircuitBreakerOpen();

            sb.append(", isRejected:").append(isRejected);
            sb.append(", isException:").append(isException);
            if (isException) {
                sb.append(" msg=").append(getExecutionException().getMessage());
            }
            sb.append(", isTimeout: ").append(isTimeout);
            sb.append(", isCircut:").append(isCircut);

            sb.append(", group:").append(this.getCommandGroup().name());
            sb.append(", threadpool:").append(getThreadPoolKey().name());
            System.out.println(sb.toString());

            return "Hello Failure " + name + "!";
        }
    }
FAILURE
Test the fallback caused by an exception

    @Test
    public void expTest() {
        assertEquals("Hello Failure Exception!", new CommandHelloFailure("Exception").execute());
    }

// Console output
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Exception
running fallback, isRejected:false, isException:true msg=this command always fails, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
TIMEOUT
Test the fallback caused by a timeout

    @Test
    public void timeOutTest() {
        assertEquals("Hello Failure Timeout!", new CommandHelloFailure("Timeout").execute());
    }

// Console output
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Timeout
running fallback, isRejected:false, isException:false, isTimeout: true, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
THREAD_POOL_REJECTED
When the number of concurrently executing tasks exceeds the thread pool size plus the queue size, further tasks are rejected, which triggers the fallback.

    @Test
    public void rejectTest() throws InterruptedException {
        int count = 5;
        while (count-- > 0) {
            new CommandHelloFailure("Reject").queue();
            TimeUnit.MILLISECONDS.sleep(100);
        }
    }

// Console output
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject
running fallback, isRejected:true, isException:false, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
running fallback, isRejected:true, isException:false, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
SEMAPHORE_REJECTED is similar to THREAD_POOL_REJECTED and is not demonstrated again.
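The thread pool rejection shown above can be reproduced with a plain JDK executor. In this sketch a pool with no queue models maxQueueSize = -1, which is a modelling assumption; `PoolRejectionSketch` and `countRejections` are hypothetical names, and a latch replaces the 800 ms sleep so the result does not depend on timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolRejectionSketch {
    // Submits `tasks` blocking tasks to a pool of `coreSize` threads with no queue
    // and returns how many submissions were rejected.
    static int countRejections(int coreSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // this is the point where a fallback would run
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }
}
```

With three threads and five tasks, three tasks start and two are rejected, the same split as in the console output above.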
SHORT_CIRCUITED
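When more than a given percentage of requests fail (timeout, failure, reject) within a time window, the circuit breaker opens. Once it is open, all requests go straight to the fallback.
Settings
Name | Type | Description | Default |
---|---|---|---|
circuitBreakerEnabled | Boolean | Whether the circuit breaker is enabled | true |
circuitBreakerErrorThresholdPercentage | Integer | Error percentage at or above which the circuit breaker opens | 50 |
circuitBreakerForceClosed | Boolean | Forces the circuit breaker closed (requests are always allowed) | false |
circuitBreakerForceOpen | Boolean | Forces the circuit breaker open (all requests are rejected) | false |
circuitBreakerRequestVolumeThreshold | Integer | Minimum number of requests within 10 s before the circuit breaker can trip | 20 |
circuitBreakerSleepWindowInMilliseconds | Integer | How long after opening the circuit breaker waits before trying to close (half-open) | 5s |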
A typical configuration:

    Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) // required
            .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                    .withExecutionTimeoutInMilliseconds(50) // timeout
                    .withCircuitBreakerRequestVolumeThreshold(5)
                    .withCircuitBreakerSleepWindowInMilliseconds(1000)
                    .withCircuitBreakerErrorThresholdPercentage(50))
            // optional; when omitted, the thread pool key defaults to the group key
            .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))
            .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4));

The configuration above means: if there are at least 5 requests within 10 s and at least 50% of them fail, the circuit breaker opens; 1000 ms after opening, it tries to close again.
How the circuit breaker works, quoted from the official documentation:
The precise way that the circuit opening and closing occurs is as follows:
1. Assuming the volume across a circuit meets a certain threshold (HystrixCommandProperties.circuitBreakerRequestVolumeThreshold())...
2. And assuming that the error percentage exceeds the threshold error percentage (HystrixCommandProperties.circuitBreakerErrorThresholdPercentage())...
3. Then the circuit-breaker transitions from CLOSED to OPEN.
4. While it is open, it short-circuits all requests made against that circuit-breaker.
5. After some amount of time (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()), the next single request is let through (this is the HALF-OPEN state). If the request fails, the circuit-breaker returns to the OPEN state for the duration of the sleep window. If the request succeeds, the circuit-breaker transitions to CLOSED and the logic in 1. takes over again.
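The state transitions in the quoted description can be sketched as a small state machine. This is a simplified illustration, not Hystrix's implementation: `CircuitBreakerSketch` is a hypothetical class, counts are kept in one window instead of a rolling 10 s window, the clock is passed in as a parameter, and the class is not thread-safe.

```java
final class CircuitBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMillis;

    private State state = State.CLOSED;
    private int total;
    private int failures;
    private long openedAtMillis;

    CircuitBreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage, long sleepWindowMillis) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
    }

    State state() {
        return state;
    }

    // Returns true if a request may go through at time nowMillis.
    boolean allowRequest(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAtMillis >= sleepWindowMillis) {
            state = State.HALF_OPEN; // let exactly one trial request through
            return true;
        }
        return state == State.CLOSED;
    }

    void recordSuccess() {
        if (state == State.HALF_OPEN) {
            state = State.CLOSED; // trial succeeded: start counting afresh
            total = 0;
            failures = 0;
        } else {
            total++;
        }
    }

    void recordFailure(long nowMillis) {
        total++;
        failures++;
        boolean trialFailed = state == State.HALF_OPEN;
        boolean thresholdHit = total >= requestVolumeThreshold
                && failures * 100 / total >= errorThresholdPercentage;
        if (trialFailed || thresholdHit) {
            state = State.OPEN;
            openedAtMillis = nowMillis;
        }
    }
}
```

Passing the time in explicitly keeps the sketch deterministic; a production breaker would read a clock and use thread-safe counters.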
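Q3: What should we do in a fallback?
There are generally three strategies:
1、Do not implement getFallback: an exception is thrown directly when the dependency call fails
2、Implement getFallback and return a default value: this is a common strategy
3、Implement getFallback and switch to a degraded alternative
In addition, in a production environment a fallback should usually be recorded with a metric or log entry.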
Request Collapsing
Put simply, multiple requests made within a short time window are merged into a single request. This is commonly used for network IO, where it reduces the number of IO operations; the drawback is a higher average latency.
The main test code:

    import com.netflix.hystrix.HystrixCollapser;
    import com.netflix.hystrix.HystrixCollapserKey;
    import com.netflix.hystrix.HystrixCollapserProperties;
    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import com.netflix.hystrix.HystrixCommandKey;
    import com.netflix.hystrix.HystrixEventType;
    import com.netflix.hystrix.HystrixRequestLog;
    import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.Future;

    public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {

        private final Integer key;

        public CommandCollapserGetValueForKey(Integer key) {
            super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Collapser"))
                    .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
                            .withMaxRequestsInBatch(3)
                            .withTimerDelayInMilliseconds(10)));
            this.key = key;
        }

        @Override
        public Integer getRequestArgument() {
            return key;
        }

        @Override
        protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
            return new BatchCommand(requests);
        }

        @Override
        protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
            // Responses come back in the same order as the requests in the batch.
            int count = 0;
            for (CollapsedRequest<String, Integer> request : requests) {
                request.setResponse(batchResponse.get(count++));
            }
        }

        private static final class BatchCommand extends HystrixCommand<List<String>> {
            private final Collection<CollapsedRequest<String, Integer>> requests;

            private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
                super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
                this.requests = requests;
            }

            @Override
            protected List<String> run() {
                System.out.println("BatchCommand run " + requests.size());
                ArrayList<String> response = new ArrayList<String>();
                for (CollapsedRequest<String, Integer> request : requests) {
                    // artificial response for each argument received in the batch
                    response.add("ValueForKey: " + request.getArgument());
                }
                return response;
            }
        }
    }

    @Test
    public void testCollapser() throws Exception {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
            Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
            Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
            Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();

            assertEquals("ValueForKey: 1", f1.get());
            assertEquals("ValueForKey: 2", f2.get());
            assertEquals("ValueForKey: 3", f3.get());
            assertEquals("ValueForKey: 4", f4.get());

            // assert that the batch command 'GetValueForKey' was executed twice:
            // 4 requests with maxRequestsInBatch = 3 produce two batches
            assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
            HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
            // assert the command is the one we're expecting
            assertEquals("GetValueForKey", command.getCommandKey().name());
            // confirm that it was a COLLAPSED command execution
            assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
            // and that it was successful
            assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
        } finally {
            context.shutdown();
        }
    }

// Console output
BatchCommand run 3
BatchCommand run 1
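Execution flow:
To use this feature:
1、Extend the HystrixCollapser class
2、Implement the following methods:
getRequestArgument: returns the request argument object
createCommand: returns the BatchCommand
mapResponseToRequests: maps each response back to its request
3、Create the corresponding BatchCommand class: the concrete implementation of the batched request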
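The batching idea behind these steps can be sketched without Hystrix. The code below is an illustration under simplifying assumptions: `CollapserSketch`, `fetchBatch` and `collapse` are hypothetical names, the pending keys are already collected in a list, and the timer window is left out so only the batch-size limit is modelled.

```java
import java.util.ArrayList;
import java.util.List;

final class CollapserSketch {
    static int batchCalls = 0; // counts simulated round trips

    // Simulated batch endpoint: one call answers every key in the batch.
    static List<String> fetchBatch(List<Integer> keys) {
        batchCalls++;
        List<String> out = new ArrayList<>();
        for (Integer key : keys) {
            out.add("ValueForKey: " + key);
        }
        return out;
    }

    // Splits the pending keys into batches of at most maxRequestsInBatch
    // and returns the responses in request order.
    static List<String> collapse(List<Integer> pendingKeys, int maxRequestsInBatch) {
        List<String> responses = new ArrayList<>();
        for (int from = 0; from < pendingKeys.size(); from += maxRequestsInBatch) {
            int to = Math.min(from + maxRequestsInBatch, pendingKeys.size());
            responses.addAll(fetchBatch(pendingKeys.subList(from, to)));
        }
        return responses;
    }
}
```

Four keys with a batch limit of 3 result in two round trips, one carrying three keys and one carrying the remaining key.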
Settings:
Name | Type | Description | Default |
---|---|---|---|
maxRequestsInBatch | Integer | Maximum number of requests per batch; beyond this a new batch request is created | Integer.MAX_VALUE |
timerDelayInMilliseconds | Integer | Wait window; once it elapses a new batch request is created | 10ms |
requestCacheEnabled | Boolean | Whether the cache is enabled | true |
A typical configuration:

    Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Collapser"))
            .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
                    .withMaxRequestsInBatch(3)
                    .withTimerDelayInMilliseconds(5));
Request Cache

    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

    public class CommandUsingRequestCache extends HystrixCommand<Boolean> {
        private final int value;

        public CommandUsingRequestCache(int value) {
            super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
            this.value = value;
        }

        @Override
        public Boolean run() {
            return value == 0 || value % 2 == 0;
        }

        // This method must be implemented to use the cache.
        @Override
        public String getCacheKey() {
            return String.valueOf(value);
        }
    }

    @Test
    public void testWithCacheHits() {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            CommandUsingRequestCache command2a = new CommandUsingRequestCache(2);
            CommandUsingRequestCache command2b = new CommandUsingRequestCache(2);

            assertTrue(command2a.execute());
            // first request: nothing is cached yet
            assertFalse(command2a.isResponseFromCache());

            assertTrue(command2b.execute());
            // second request: the result comes from the cache
            assertTrue(command2b.isResponseFromCache());
        } finally {
            context.shutdown();
        }

        context = HystrixRequestContext.initializeContext();
        try {
            CommandUsingRequestCache command3b = new CommandUsingRequestCache(2);
            assertTrue(command3b.execute());
            // this is a new request context, so the earlier cache no longer applies
            assertFalse(command3b.isResponseFromCache());
        } finally {
            context.shutdown();
        }
    }
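The request-scoped behaviour in the test above can be sketched with a plain map. This is an illustration, not the Hystrix cache: `RequestCacheSketch` is a hypothetical class in which one instance plays the role of one request context.

```java
import java.util.HashMap;
import java.util.Map;

final class RequestCacheSketch {
    private final Map<String, Boolean> cache = new HashMap<>();
    private int executions = 0;

    // Computes the result once per cache key within this "request context".
    boolean isEven(int value) {
        return cache.computeIfAbsent(String.valueOf(value), key -> {
            executions++; // counts how often the real work ran
            return value % 2 == 0;
        });
    }

    int executions() {
        return executions;
    }
}
```

Creating a second instance corresponds to initializing a new request context: its map starts empty, so the first lookup runs the computation again.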
Hystrix Context
Global Context
UserRequest Context
Usage and Monitoring
1、Using Hystrix in a project
Using Hystrix is simple: just add the required dependencies. With Maven, for example:

    <!-- Hystrix dependencies -->
    <dependency>
        <groupId>com.netflix.hystrix</groupId>
        <artifactId>hystrix-core</artifactId>
        <version>1.5.9</version>
    </dependency>
    <dependency>
        <groupId>com.netflix.hystrix</groupId>
        <artifactId>hystrix-metrics-event-stream</artifactId>
        <version>1.5.9</version>
    </dependency>
2、Using the Dashboard
Configure the servlet in web.xml

    <servlet>
        <display-name>HystrixMetricsStreamServlet</display-name>
        <servlet-name>HystrixMetricsStreamServlet</servlet-name>
        <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>
    </servlet>
    <servlet-mapping>
        <servlet-name>HystrixMetricsStreamServlet</servlet-name>
        <url-pattern>/hystrix.stream</url-pattern>
    </servlet-mapping>

Download the war and jar files listed under Attachments to any directory, then run
java -jar jetty-runner-9.2.10.v20150310.jar --port 8410 hystrix-dashboard-1.5.1.war
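Then open http://localhost:8410/ in a browser, enter http://hostname:port/application/hystrix.stream in the input box, click Add Stream, and then click Monitor Stream. You will see the view shown below:
What each metric means:
In general, Thread-pool Rejections and Failures/Exceptions should be 0, and Thread timeouts should be a very small value.
Code Structure
Attachments
1、Startup script start.sh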
3、jetty-runner-9.2.10.v20150310.jar