第五章 服务熔断(hystrix)+ retrofit底层通信(AsyncHttpclient)

一、集群容错

技术选型:hystrix。(就是上图中熔断器

熔断的作用

第一个作用:

假设有两台服务器server1(假设可以处理的请求阈值是1W请求)和server2,在server1上注册了三个服务service1、service2、service3,在server2上注册了一个服务service4,假设service4服务响应缓慢,service1调用service4时,一直在等待响应,那么在高并发下,很快的server1处很快就会达到请求阈值(server1很快就会耗尽处理线程)之后可能宕机,这时候,不只是service1不再可用,server1上的service2和service3也不可用了。

如果我们引入了hystrix,那么service1调用service4的时候,当发现service4超时,立即断掉不再执行,执行getFallback逻辑。这样的话,server1就不会耗尽处理线程,server1上的其他服务也是可用的。当然,这是在合理的配置了超时时间的情况下,如果超时时间设置的太长的话,还是会出现未引入hystrix之前的情况。

第二个作用:

当被调服务经常失败,比如说在10min(可配)中之内调用了20次,失败了15次(可配),那么我们认为这个服务是失败的,先关闭该服务,等一会儿后再自动重新启动该服务!(这是真正的熔断!)

二、实现

1、framework

1.1、pom.xml

 1         <!-- converter-jackson -->
 2         <dependency>
 3             <groupId>com.squareup.retrofit</groupId>
 4             <artifactId>converter-jackson</artifactId>
 5             <version>1.9.0</version>
 6         </dependency>
 7         <!-- async-http-client -->
 8         <dependency>
 9             <groupId>com.ning</groupId>
10             <artifactId>async-http-client</artifactId>
11             <version>1.9.31</version>
12         </dependency>
13
14         <!-- hystrix -->
15         <dependency>
16             <groupId>com.netflix.hystrix</groupId>
17             <artifactId>hystrix-core</artifactId>
18             <version>1.5.3</version>
19         </dependency>
20         <dependency>
21             <groupId>com.netflix.hystrix</groupId>
22             <artifactId>hystrix-metrics-event-stream</artifactId>
23             <version>1.5.3</version>
24         </dependency>

说明:

  • 添加retrofit的Jackson转换器,默认为GSON
  • 添加AsyncHttpClient
  • 添加hystrix及其metrics包(后者用于展示hystrix的图表信息,以后会在优化部分完成)

1.2、服务通信(retrofit)+集群容错(hystrix)

1.2.1、RestAdapterConfig

 1 package com.microservice.retrofit;
 2
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Component;
 5
 6 import com.microservice.loadBalancer.MyLoadBalancer;
 7 import com.microservice.loadBalancer.ServerAddress;
 8
 9 import retrofit.RestAdapter;
10 import retrofit.converter.JacksonConverter;
11
12 @Component
13 public class RestAdapterConfig {
14
15     @Autowired
16     private MyLoadBalancer myLoadBalancer;
17
18     /**
19      * 负载均衡并且创建传入的API接口实例
20      */
21     public <T> T create(Class<T> tclass, String serviceName) {
22         String commandGroupKey = tclass.getSimpleName();// 获得简单类名作为groupKey
23
24         ServerAddress server = myLoadBalancer.chooseServer(serviceName);// 负载均衡
25         RestAdapter restAdapter = new RestAdapter.Builder()
26                                   .setConverter(new JacksonConverter())
27                                   .setErrorHandler(new MyErrorHandler())
28                                   .setClient(new MyHttpClient(server, commandGroupKey))
29                                   .setEndpoint("/").build();
30         T tclassInstance = restAdapter.create(tclass);
31         return tclassInstance;
32     }
33 }

说明:这里我们定义了自己的retrofit.Client和自己的retrofit.ErrorHandler

1.2.2、MyHttpClient(自定义retrofit的Client)

 1 package com.microservice.retrofit;
 2
 3 import java.io.IOException;
 4
 5 import com.microservice.hystrix.HttpHystrixCommand;
 6 import com.microservice.loadBalancer.ServerAddress;
 7 import com.netflix.hystrix.HystrixCommand.Setter;
 8 import com.netflix.hystrix.HystrixCommandGroupKey;
 9
10 import retrofit.client.Client;
11 import retrofit.client.Request;
12 import retrofit.client.Response;
13
14 public class MyHttpClient implements Client {
15     private ServerAddress server;
16     private String        commandGroupKey;
17
18     public MyHttpClient(ServerAddress server, String commandGroupKey) {
19         this.server = server;
20         this.commandGroupKey = commandGroupKey;
21     }
22
23     @Override
24     public Response execute(Request request) throws IOException {
25         Setter setter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(commandGroupKey));
26         return new HttpHystrixCommand(setter, server, request).execute();// 同步执行
27     }
28 }

说明:在execute()中引入了hystrix

  • 定义了hystrix的commandGroupKey是服务名(eg.myserviceA,被调用服务名
  • 没有定义commandKey(通常commandKey是服务的一个方法名,例如myserviceA的client的getProvinceByCityName),通常该方法名是被调用服务的client中的被调用方法名
  • 手动设置hystrix的属性
    • setter.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000));
    • 实际上,直接配置在consul上就好了,根据上一节archaius的自动拉取配置,hystrix会自动从pollResult中取配置并设置到实例中去。
  • 查看hystrix的属性
    • command.getProperties().executionTimeoutInMilliseconds().get(),这里的command就是下边的HttpHystrixCommand实例

1.2.3、HttpHystrixCommand(hystrix核心类)

  1 package com.microservice.hystrix;
  2
  3 import java.io.ByteArrayOutputStream;
  4 import java.io.IOException;
  5 import java.util.ArrayList;
  6 import java.util.List;
  7 import java.util.concurrent.Future;
  8
  9 import org.apache.commons.lang3.StringUtils;
 10 import org.apache.tomcat.util.http.fileupload.IOUtils;
 11 import org.slf4j.Logger;
 12 import org.slf4j.LoggerFactory;
 13
 14 import com.microservice.loadBalancer.ServerAddress;
 15 import com.netflix.hystrix.HystrixCommand;
 16 import com.ning.http.client.AsyncHttpClient;
 17 import com.ning.http.client.FluentCaseInsensitiveStringsMap;
 18 import com.ning.http.client.RequestBuilder;
 19
 20 import retrofit.client.Header;
 21 import retrofit.client.Request;
 22 import retrofit.client.Response;
 23 import retrofit.mime.TypedByteArray;
 24 import retrofit.mime.TypedOutput;
 25
 26 public class HttpHystrixCommand extends HystrixCommand<Response> {
 27     private static final Logger LOGGER = LoggerFactory.getLogger(HttpHystrixCommand.class);
 28
 29     private ServerAddress       server;
 30     private Request             request;
 31     private String              requestUrl;
 32     private AsyncHttpClient     asyncHttpClient;
 33
 34     public HttpHystrixCommand(Setter setter, ServerAddress server, Request request) {
 35         super(setter);
 36         this.server = server;
 37         this.request = request;
 38
 39         //        AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder().setRequestTimeout(5000);//5s
 40         //        this.asyncHttpClient = new AsyncHttpClient(builder.build());
 41         this.asyncHttpClient = new AsyncHttpClient();
 42     }
 43
 44     @Override
 45     public Response run() throws Exception {
 46         com.ning.http.client.Request asyncReq = retroReq2asyncReq(request, server);
 47         Future<com.ning.http.client.Response> asyncResFuture = asyncHttpClient.executeRequest(asyncReq);
 48         com.ning.http.client.Response asyncRes = asyncResFuture.get();
 49         return asynRes2RetroRes(asyncRes);
 50     }
 51
 52     /**
 53      * 1、设置方法请求类型,例如:GET/POST
 54      * 2、转换请求头header(包括mime。这个需要根据请求体的情况进行掌握)
 55      * 3、转换请求体
 56      * 4、设置请求URL
 57      */
 58     public com.ning.http.client.Request retroReq2asyncReq(Request request, ServerAddress server) {
 59         RequestBuilder requestBuilder = new RequestBuilder(request.getMethod());//传入方法请求类型,例如:GET/POST
 60         List<Header> headers = request.getHeaders();
 61         headers.forEach(x -> requestBuilder.addHeader(x.getName(), x.getValue()));
 62
 63         if (request.getBody() != null) {
 64             String mimeType = StringUtils.EMPTY;
 65             if (StringUtils.isNotEmpty(mimeType)) {
 66                 requestBuilder.addHeader("Content-Type", mimeType);
 67             } else {
 68                 requestBuilder.addHeader("Content-Type", "application/json");
 69             }
 70
 71             TypedOutput body = request.getBody();
 72             ByteArrayOutputStream outPutStream = new ByteArrayOutputStream();
 73             try {
 74                 body.writeTo(outPutStream);//将body内容写入到ByteArrayOutputStream里
 75                 requestBuilder.setBody(outPutStream.toByteArray());
 76             } catch (IOException e) {
 77                 e.printStackTrace();
 78             } finally {
 79                 IOUtils.closeQuietly(outPutStream);
 80             }
 81         }
 82         String url = new StringBuilder("http://").append(server.getIp())
 83                                                  .append(":")
 84                                                  .append(server.getPort())
 85                                                  .append("/")
 86                                                  .append(request.getUrl()).toString();
 87         requestUrl = url;
 88         requestBuilder.setUrl(url);
 89         return requestBuilder.build();
 90     }
 91
 92     public Response asynRes2RetroRes(com.ning.http.client.Response asyncRes) throws IOException {
 93         return new Response(asyncRes.getUri().toUrl(),
 94                             asyncRes.getStatusCode(),
 95                             asyncRes.getStatusText(),
 96                             getHeaders(asyncRes.getHeaders()),
 97                             new TypedByteArray(asyncRes.getContentType(), asyncRes.getResponseBodyAsBytes()));
 98     }
 99
100     private List<Header> getHeaders(FluentCaseInsensitiveStringsMap asyncHeaders) {
101         List<Header> retrofitHeaders = new ArrayList<>();
102         asyncHeaders.keySet().forEach(key -> retrofitHeaders.add(new Header(key, asyncHeaders.getFirstValue(key))));
103         return retrofitHeaders;
104     }
105
106     /**
107      * 超时后的一些操作,或者如果缓存中有信息,可以从缓存中拿一些,具体的要看业务,也可以打一些logger
108      */
109     @Override
110     public Response getFallback() {
111         LOGGER.error("请求超时了!requestUrl:‘{}‘", requestUrl);
112         /**
113          * 想要让自定义的ErrorHandler起作用以及下边的404和reason有意义,就一定要配置requestUrl和List<header>
114          * 其实这里可以看做是定义自定义异常的状态码和状态描述
115          * 其中状态码用于自定义异常中的判断(见HystrixRuntimeException)
116          */
117         return new Response(requestUrl, 404, //定义状态码
118             "execute getFallback because execution timeout", //定义消息
119             new ArrayList<Header>(), null);
120     }
121 }

说明:首先调用run(),run()失败或超时候调用getFallback()

  • run()--这里是一个定制口,我使用了AsyncHttpClient,还可以使用其他的网络调用工具,例如:okhttp

    • 首先将Retrofit的请求信息Request转化为AsyncHttpClient的Request(在这里调用了负载均衡,将请求负载到选出的一台机器)
    • 之后调用AsyncHttpClient来进行真正的http调用,并返回AsyncHttpClient型的相应Response
    • 最后将AsyncHttpClient型的响应Response转换为Retrofit型的Response
  • getFallback()
    • 直接抛异常是不行的(该接口不让),只能采取以下的方式
    • 返回一个Response对象,该对象封装了status是404+错误的原因reason+请求的url+相应的Header列表+响应体(这里的status和reason会被用在ErrorHandler中去用于指定执行不同的逻辑,具体看下边的MyErrorHandler)
    • 如果想让MyErrorHandler起作用,Response对象必须有"请求的url+相应的Header列表",其中Header列表可以使一个空List实现类,但是不可为null
  • 在构建AsyncHttpClient实例时可以设置相关的http参数,例如:注释部分的设置请求超时时间。
    • 值得注意的是我们在配置请求超时时间时,要结合hystrix的超时时间来设置,程序会以二者的最小值作为请求超时时间

1.2.4、MyErrorHandler(自定义retrofit的错误处理器)

 1 package com.microservice.retrofit;
 2
 3 import com.microservice.exception.HystrixRuntimeException;
 4
 5 import retrofit.ErrorHandler;
 6 import retrofit.RetrofitError;
 7 import retrofit.client.Response;
 8
 9 public class MyErrorHandler implements ErrorHandler{
10     @Override
11     public Throwable handleError(RetrofitError cause) {
12         Response response = cause.getResponse();
13         /**
14          * 这里是一个可以定制的地方,自己可以定义所有想要捕获的异常
15          */
16         if(response!=null && response.getStatus()==404){
17             return new HystrixRuntimeException(cause);
18         }
19         return cause;
20     }
21 }

说明:当发生了retrofit.error时(不只是上边的getFallback()返回的Response),我们可以在该ErrorHandler的handleError方法来进行相应Response的处理。这里我们指定当404时返回一个自定义异常。

1.2.5、HystrixRuntimeException(自定义异常)

 1 package com.microservice.exception;
 2
 3 /**
 4  * 自定义异常
 5  */
 6 public class HystrixRuntimeException extends RuntimeException {
 7     private static final long serialVersionUID = 8252124808929848902L;
 8
 9     public HystrixRuntimeException(Throwable cause) {
10         super(cause);//只有这样,才能将异常信息抛给客户端
11     }
12 }

说明:自定义异常只能通过super()来向客户端抛出自己指定的异常信息(上边的Response的reason,但是抛到客户端时还是一个500错误,因为run()错误或超时就是一个服务端错误)。

整个流程:

当myserviceB调用myserviceA的一个方法时,首先会执行自定义的MyHttpClient的execute()方法,在该execute()方法中我们执行了自定义的HttpHystrixCommand的execute()方法,此时就会执行执行HttpHystrixCommand的run()方法,如果该方法运行正常并在超时时间内返回数据,则调用结束。

如果run()方法调用失败或该方法超时,就会直接运行HttpHystrixCommand的getFallback()方法。该方法返回一个retrofit.Response对象,该对象的status是404,错误信息也是自定义的。之后该对象会被包装到RetrofitError对象中,之后RetrofitError对象会由MyErrorHandler的handleError()进行处理:从RetrofitError对象中先取出Response,之后根据该Response的status执行相应的操作,我们这里对404的情况定义了一个自定义异常HystrixRuntimeException。

注意点:

  • retrofit的Response最好不要是null
  • retrofit的Jackson转换器无法转化单纯的String(因为Jackson转换器会将一个json串转化为json对象),这一点缺点可以看做没有,因为我们的接口都是restful的,那么我们都是使用json格式来通信的。

三、配置与测试

1、配置

在consul上配置service/myserviceA/dev/config的配置内容和service/myserviceB/dev/config的内容。其中,myserviceB配置了hystrix的超时时间:

1 hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=1000

说明:关于hystrix的配置参数,查看http://www.cnblogs.com/java-zhao/p/5524584.html

2、测试

最后,启动consul,启动服务A和B,swagger测试就好了!!!(在测试过程中,可以动态的去改变consul中hystrix的超时时间值,来测试archaius的动态读取)

时间: 2024-12-11 11:16:13

第五章 服务熔断(hystrix)+ retrofit底层通信(AsyncHttpclient)的相关文章

第五章 服务容错保护:Spring Cloud Hystrix

在微服务架构中,我们将系统拆分为很多个服务,各个服务之间通过注册与订阅的方式相互依赖,由于各个服务都是在各自的进程中运行,就有可能由于网络原因或者服务自身的问题导致调用故障或延迟,随着服务的积压,可能会导致服务崩溃.为了解决这一系列的问题,断路器等一系列服务保护机制出现了. 断路器本身是一种开关保护机制,用于在电路上保护线路过载,当线路中有电器发生短路时,断路器能够及时切断故障电路,防止发生过载.发热甚至起火等严重后果. 在分布式架构中,断路器模式的作用也是类似的. 针对上述问题,Spring

第五章 服务容错保护: Spring Cloud Hystrix

在微服务架构中, 存在着那么多的服务单元, 若一个单元出现故障, 就很容易因依赖关系而引发故障的蔓延,最终导致整个系统的瘫痪,这样的架构相较传统架构更加不稳定.为了解决这样的问题, 产生了断路器等一系列的服务保护机制 Spring Cloud Hystrix实现了断路器. 线程隔离等一系列服务保护功能.它也是基于Netflix的开源框架Hystrix实现的, 该框架的目标在于通过控制那些访问远程系统. 服务和第三方库的节点, 从而对延迟和故障提供更强大的容错能力.Hystrix具备服务降级. 服

服务熔断Hystrix高级

Hystrix的监控平台 了实现容错功能,Hystrix还提供了近乎实时的监控,HystrixCommand和HystrixObservableCommand在执行时,会生成执行结果和运行指标.比如每秒的请求数量,成功数量等.这些状态会暴露在Actuator提供的/health端点中.只需为项目添加 spring -boot-actuator 依赖,重启项目,访问 http://localhost:9012/actuator/hystrix.stream ,即可看到实时的监控数据. 搭建Hyst

服务熔断(Hystrix、Turbine)

1.雪崩效应 在微服务架构中,服务众多,通常会涉及多个服务层级的调用,一旦基础服务发生故障,很可能会导致级联故障,进而造成整个系统不可用,这种现象被称为服务雪崩效应.服务雪崩效应是一种因"服务提供者"的不可用导致"服务消费着"的不可用并将这种不可用逐渐放大的过程.比如在一个系统中,A是服务提供者:B是A的服务消费着,C和D又是B的服务消费者.如果此时A发生故障,则会引起B的不可用,而B的不可用又将导致C和D的不可用,当这种不可用像滚雪球一样逐渐放大的时候,雪崩效应就

# 第五章 服务:让客户端发现pod并与之通信 ##

1.什么是服务 简单来说服务就是为一组功能相同的pod提供单一不变的接入点的资源: 1.因为Pod是不稳定的,所以不能固定一个Pod的IP,但是服务是稳定的,服务的IP固定,通过服务来连接Pod也是稳定的 2.服务只是连通了集群内部所有IP访问pod的问题,但是不解决外部访问pod问题 2.创建服务 1.创建一个rc控制器,启动三个pod,暴露容器端口8080 apiVersion: v1 kind: ReplicationController metadata: name: kubia spe

《linux内核设计与实现》第五章

第五章 系统调用 一.与内核通信 系统调用在用户空间进程和硬件设备之间添加了一个中间层.作用: 为用户空间提供了一种硬件的抽象接口. 系统调用保证了系统的稳定和安全. 每个进程都运行在虚拟系统中,而在用户空间和系统的其余部分提供这样一层公共接口,也是出于这种考虑. 在Linux中,系统调用是用户空间访问内核的唯一手段:除异常和陷入外,它们是内核唯一的合法入口. 二.API.POSIX和C库 一般情况下,应用程序通过在用户空间实现的应用编程接口(API)而不是直接通过系统调用来编程. 一个API定

Linux内核分析——第五章 系统调用

第五章 系统调用 5.1 与内核通信 1.系统调用在用户空间进程和硬件设备之间添加了一个中间层,该层主要作用有三个: (1)为用户空间提供了一种硬件的抽象接口 (2)系统调用保证了系统的稳定和安全 (3)每个进程都运行在虚拟系统中,而在用户空间和系统的其余部分提供这样一层公共接口. 2.在Linux中,系统调用是用户空间访问内核的唯一手段:除异常和陷入外,它们是内核唯一的合法入口. 5.2 API.POSIX和C库 1.一般情况下,应用程序通过在用户空间实现的应用编程接口(API)而不是直接通过

SpringCloud 基础教程(五) 服务熔断机制(Eureka + Ribbon + Hystrix)

1.启动[服务中心]集群,即 Eureka Server 参考 SpringCloud 基础教程(一) 服务中心及集群(Eureka Server) 2.启动[服务提供者]集群,即 Eureka Client 参考 SpringCloud 基础教程(二) 服务注册及集群(Eureka Client) 3.启动[服务消费者],即 Eureka Discovery Client 参考 SpringCloud 基础教程(三) 服务发现及负载均衡(Eureka Discovery Client + Ri

Spring Cloud实战之初级入门(四)— 利用Hystrix实现服务熔断与服务监控

目录 1.环境介绍 2.服务监控 2.1 加入依赖 2.2 修改配置文件 2.3 修改启动文件 2.4 监控服务 2.5 小结 3. 利用hystrix实现消费服务熔断 3.1 加入服务熔断 3.2 测试服务熔断 4. 利用turbine监控所有应用 4.1 创建工程 4.2 修改配置文件 4.3 修改启动文件 4.4 启动 5.一点点重要的事情 1.环境介绍 本篇文章涉及到前面文章的工程,mirco-service-provider.mirco-service-consumer以及需要另外新建