Hystrix线程隔离技术解析-线程池(转)

认识Hystrix

Hystrix是Netflix开源的一款容错框架,包含常用的容错方法:线程隔离、信号量隔离、降级策略、熔断技术。

在高并发访问下,系统所依赖的服务的稳定性对系统的影响非常大,依赖有很多不可控的因素,比如网络连接变慢,资源突然繁忙,暂时不可用,服务脱机等。我们要构建稳定、可靠的分布式系统,就必须要有这样一套容错方法。

本文主要讨论线程隔离技术。

为什么要做线程隔离

比如我们现在有3个业务调用分别是查询订单、查询商品、查询用户,且这三个业务请求都是依赖第三方服务-订单服务、商品服务、用户服务。三个服务均是通过RPC调用。当查询订单服务,假如线程阻塞了,这个时候后续有大量的查询订单请求过来,那么容器中的线程数量则会持续增加直致CPU资源耗尽到100%,整个服务对外不可用,集群环境下就是雪崩。如下图

订单服务不可用.png

整个tomcat容器不可用.png

Hystrix是如何通过线程池实现线程隔离的

Hystrix通过命令模式,将每个类型的业务请求封装成对应的命令请求,比如查询订单->订单Command,查询商品->商品Command,查询用户->用户Command。每个类型的Command对应一个线程池。创建好的线程池是被放入到ConcurrentHashMap中,比如查询订单:

final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
threadPools.put(“hystrix-order”, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));

当第二次查询订单请求过来的时候,则可以直接从Map中获取该线程池。具体流程如下图:

hystrix线程执行过程和异步化.png

创建线程池中的线程的方法,查看源代码如下:

public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    ThreadFactory threadFactory = null;
    if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
        threadFactory = new ThreadFactory() {
            protected final AtomicInteger threadNumber = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }

        };
    } else {
        threadFactory = PlatformSpecific.getAppEngineThreadFactory();
    }

    final int dynamicCoreSize = corePoolSize.get();
    final int dynamicMaximumSize = maximumPoolSize.get();

    if (dynamicCoreSize > dynamicMaximumSize) {
        logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory);
    } else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory);
    }
}

执行Command的方式一共四种,直接看官方文档(https://github.com/Netflix/Hystrix/wiki/How-it-Works),具体区别如下:

  • execute():以同步堵塞方式执行run()。调用execute()后,hystrix先创建一个新线程运行run(),接着调用程序要在execute()调用处一直堵塞着,直到run()运行完成。
  • queue():以异步非堵塞方式执行run()。调用queue()就直接返回一个Future对象,同时hystrix创建一个新线程运行run(),调用程序通过Future.get()拿到run()的返回结果,而Future.get()是堵塞执行的。
  • observe():事件注册前执行run()/construct()。第一步是事件注册前,先调用observe()自动触发执行run()/construct()(如果继承的是HystrixCommand,hystrix将创建新线程非堵塞执行run();如果继承的是HystrixObservableCommand,将以调用程序线程堵塞执行construct()),第二步是从observe()返回后调用程序调用subscribe()完成事件注册,如果run()/construct()执行成功则触发onNext()和onCompleted(),如果执行异常则触发onError()。
  • toObservable():事件注册后执行run()/construct()。第一步是事件注册前,调用toObservable()就直接返回一个Observable<String>对象,第二步调用subscribe()完成事件注册后自动触发执行run()/construct()(如果继承的是HystrixCommand,hystrix将创建新线程非堵塞执行run(),调用程序不必等待run();如果继承的是HystrixObservableCommand,将以调用程序线程堵塞执行construct(),调用程序等待construct()执行完才能继续往下走),如果run()/construct()执行成功则触发onNext()和onCompleted(),如果执行异常则触发onError()

    注:

    execute()和queue()是在HystrixCommand中,observe()和toObservable()是在HystrixObservableCommand 中。从底层实现来讲,HystrixCommand其实也是利用Observable实现的(看Hystrix源码,可以发现里面大量使用了RxJava),尽管它只返回单个结果。HystrixCommand的queue方法实际上是调用了toObservable().toBlocking().toFuture(),而execute方法实际上是调用了queue().get()。

如何应用到实际代码中
package myHystrix.threadpool;

import com.netflix.hystrix.*;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.Future;

/**
 * Created by wangxindong on 2017/8/4.
 */
public class GetOrderCommand extends HystrixCommand<List> {

    OrderService orderService;

    public GetOrderCommand(String name){
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionTimeoutInMilliseconds(5000)
                )
                .andThreadPoolPropertiesDefaults(
                        HystrixThreadPoolProperties.Setter()
                                .withMaxQueueSize(10)   //配置队列大小
                                .withCoreSize(2)    // 配置线程池里的线程数
                )
        );
    }

    @Override
    protected List run() throws Exception {
        return orderService.getOrderList();
    }

    public static class UnitTest {
        @Test
        public void testGetOrder(){
//            new GetOrderCommand("hystrix-order").execute();
            Future<List> future =new GetOrderCommand("hystrix-order").queue();
        }

    }
}
总结

执行依赖代码的线程与请求线程(比如Tomcat线程)分离,请求线程可以自由控制离开的时间,这也是我们通常说的异步编程,Hystrix是结合RxJava来实现的异步编程。通过设置线程池大小来控制并发访问量,当线程饱和的时候可以拒绝服务,防止依赖问题扩散。

线程隔离.png

线程隔离的优点:

[1]:应用程序会被完全保护起来,即使依赖的一个服务的线程池满了,也不会影响到应用程序的其他部分。

[2]:我们给应用程序引入一个新的风险较低的客户端lib的时候,如果发生问题,也是在本lib中,并不会影响到其他内容,因此我们可以大胆的引入新lib库。

[3]:当依赖的一个失败的服务恢复正常时,应用程序会立即恢复正常的性能。

[4]:如果我们的应用程序一些参数配置错误了,线程池的运行状况将会很快显示出来,比如延迟、超时、拒绝等。同时可以通过动态属性实时执行来处理纠正错误的参数配置。

[5]:如果服务的性能有变化,从而需要调整,比如增加或者减少超时时间,更改重试次数,就可以通过线程池指标动态属性修改,而且不会影响到其他调用请求。

[6]:除了隔离优势外,hystrix拥有专门的线程池可提供内置的并发功能,使得可以在同步调用之上构建异步的外观模式,这样就可以很方便的做异步编程(Hystrix引入了Rxjava异步框架)。

尽管线程池提供了线程隔离,我们的客户端底层代码也必须要有超时设置,不能无限制的阻塞以致线程池一直饱和。

线程隔离的缺点:

[1]:线程池的主要缺点就是它增加了计算的开销,每个业务请求(被包装成命令)在执行的时候,会涉及到请求排队,调度和上下文切换。不过Netflix公司内部认为线程隔离开销足够小,不会产生重大的成本或性能的影响。

The Netflix API processes 10+ billion Hystrix Command executions per day using thread isolation. Each API instance has 40+ thread-pools with 5–20 threads in each (most are set to 10).

Netflix API每天使用线程隔离处理10亿次Hystrix Command执行。 每个API实例都有40多个线程池,每个线程池中有5-20个线程(大多数设置为10个)。

对于不依赖网络访问的服务,比如只依赖内存缓存这种情况下,就不适合用线程池隔离技术,而是采用信号量隔离,后面文章会介绍。

因此我们可以放心使用Hystrix的线程隔离技术,来防止雪崩这种可怕的致命性线上故障。

转载请注明出处,并附上链接 http://www.jianshu.com/p/df1525d58c20

参考资料:

https://github.com/Netflix/Hystrix/wiki


作者:新栋BOOK链接:http://www.jianshu.com/p/df1525d58c20來源:简书著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

hystrix的线程隔离技术除了线程池,还有另外一种方式:信号量。

线程池和信号量的区别

《Hystrix线程隔离技术解析-线程池》一文最后,我们谈到了线程池的缺点,当我们依赖的服务是极低延迟的,比如访问内存缓存,就没有必要使用线程池的方式,那样的话开销得不偿失,而是推荐使用信号量这种方式。下面这张图说明了线程池隔离和信号量隔离的主要区别:线程池方式下业务请求线程和执行依赖的服务的线程不是同一个线程;信号量方式下业务请求线程和执行依赖服务的线程是同一个线程

信号量和线程池的区别.png

如何使用信号量来隔离线程

将属性execution.isolation.strategy设置为SEMAPHORE ,象这样 ExecutionIsolationStrategy.SEMAPHORE,则Hystrix使用信号量而不是默认的线程池来做隔离。

public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> {

    private final int id;

    public CommandUsingSemaphoreIsolation(int id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                // since we‘re doing work in the run() method that doesn‘t involve network traffic
                // and executes very fast with low risk we choose SEMAPHORE isolation
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
        this.id = id;
    }

    @Override
    protected String run() {
        // a real implementation would retrieve data from in memory data structure
        // or some other similar non-network involved work
        return "ValueFromHashMap_" + id;
    }

}
总结

信号量隔离的方式是限制了总的并发数,每一次请求过来,请求线程和调用依赖服务的线程是同一个线程,那么如果不涉及远程RPC调用(没有网络开销)则使用信号量来隔离,更为轻量,开销更小。

转载请注明出处,并附上链接http://www.jianshu.com/p/af8dc67e5238

参考资料:https://github.com/Netflix/Hystrix/wiki


作者:新栋BOOK链接:http://www.jianshu.com/p/af8dc67e5238來源:简书著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
时间: 2025-01-07 20:55:20

Hystrix线程隔离技术解析-线程池(转)的相关文章

flask LOCAL线程隔离技术

from threading import Thread from werkzeug.local import Local local = Local()#实例化一个线程隔离对象 request = '123' class MyThread(Thread): def run(self): global request request = 'abc' print('子线程',request) mythread = MyThread() mythread.start() mythread.join(

java 线程Thread 技术--创建线程的方式

在第一节中,对线程的创建我们通过看文档,得知线程的创建有两种方式进行实现,我们进行第一种方式的创建,通过继承Thread 类 ,并且重写它的run 方法,就可以进行线程的创建,所有的程序执行都放在了run 方法里:可以说run 方法里放入的是线程执行的程序:在执行线程的时候,需要调用线程的start 方法,就可以进行线程的启动: 总之就是:代码写在run 方法里面,但是线程的执行调用start 方法,start 方法会开启一个线程去执行run 方法: 方式-: public class Thre

深入解析线程池的使用

为什么需要线程池 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短. 传 统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务.任务执行完毕后,线程退出,这就是是"即时创建,即 时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于 不停的创建线程,

池化技术——自定义线程池

目录 池化技术--自定义线程池 1.为什么要使用线程池? 1.1.池化技术的特点: 1.2.线程池的好处: 1.3.如何自定义一个线程池 2.三大方法 2.1.单个线程的线程池方法 2.2.固定的线程池的大小的方法 2.3.可伸缩的线程池的方法 2.4.完整的测试代码为: 3.为什么要自定义线程池?三大方法创建线程池的弊端分析 4.七大参数 5.如何手动的去创建一个线程池 6.四种拒绝策略 6.1.会抛出异常的拒绝策略 6.2.哪来的去哪里拒绝策略 6.3.丢掉任务拒绝策略 6.4.尝试竞争拒绝

完全解析线程池ThreadPool原理&amp;使用

目录 1. 简介 2. 工作原理 2.1 核心参数 线程池中有6个核心参数,具体如下 上述6个参数的配置 决定了 线程池的功能,具体设置时机 = 创建 线程池类对象时 传入 ThreadPoolExecutor类 = 线程池的真正实现类 开发者可根据不同需求 配置核心参数,从而实现自定义线程池 // 创建线程池对象如下 // 通过 构造方法 配置核心参数 Executor executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POO

Java并发编程与技术内幕:线程池深入理解

林炳文Evankaka原创作品.转载请注明出处http://blog.csdn.net/evankaka 摘要: 本文主要讲了Java当中的线程池的使用方法.注意事项及其实现源码实现原理,并辅以实例加以说明,对加深Java线程池的理解有很大的帮助. 首先,讲讲什么是线程池?照笔者的简单理解,其实就是一组线程实时处理休眠状态,等待唤醒执行.那么为什么要有线程池这个东西呢?可以从以下几个方面来考虑:其一.减少在创建和销毁线程上所花的时间以及系统资源的开销 .其二.2将当前任务与主线程隔离,能实现和主

10-多线程、多进程和线程池编程

一.多线程.多进程和线程池编程 1.1.Python中的GIL锁 CPython中,global interpreter lock(简称GIL)是一个互斥体,用于保护对Python对象的访问,从而防止多个线程一次执行Python字节码(也就是说,GIL锁每次只能允许一个线程工作,无法多个线程同时在CPU上工作).锁定是必要的,主要是因为CPython的内存管理不是线程安全的.(但是,由于存在GIL,因此其他功能已经变得越来越依赖于它所执行的保证.)CPython扩展必须支持GIL,以避免破坏线程

Java多线程与并发——线程生命周期和线程池

线程生命周期:  线程池:是预先创建线程的一种技术.线程池在还没有任务到来之前,创建一定数量的线程,放入空闲队列中,然后对这些资源进行复用.减少频繁的创建和销毁对象. java里面线程池的顶级接口是Executor,是一个执行线程的工具. 线程池接口是ExecutorService. java.util.concurrent包:并发编程中很常用的实用工具类 Executor接口:执行已提交的Runnable任务的对象. ExecutorService接口:Executor提供了管理终止的方法,以

进程间的数据共享、进程池的回调函数和线程初识、守护线程

一.进程的数据共享 进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的 虽然进程间数据独立,但可以通过Manager实现数据共享. 把所有实现了数据共享的比较便捷的类都重新又封装了一遍,并且在原有的multiprocessing基础上增加了新的机制 list dict等 数据共享的机制 支持数据类型非常有限 list dict都不是数据安全的,你需要自己加锁来保证数据安全 Manager用法: Manager().dict() # 创建共享的字典 Manager().lis