实际项目中使用CompletionService提升系统性能的一次实践

随着互联网应用的深入,很多传统行业也都需要接入到互联网。我们公司也是这样,保险核心需要和很多保险中介对接,比如阿里、京东等等。这些公司对于接口服务的性能有些比较高的要求,传统的核心无法满足要求,所以信息技术部领导高瞻远瞩,决定开发互联网接入服务,满足来自性能的需求。

概念

CompletionServiceExecutorBlockingQueue的功能融合在一起,将Callable任务提交给CompletionService来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时被封装为Future。对于更多的概念,请参阅其他网络文档。

线程池的设计,阿里开发手册说过不要使用Java Executors 提供的默认线程池,因此需要更接近实际的情况来自定义一个线程池,根据多次压测,采用的线程池如下:

  public ExecutorService getThreadPool(){          return new ThreadPoolExecutor(75,                  125,                  180000,                  TimeUnit.MILLISECONDS,                  new LinkedBlockingDeque<>(450),                  new ThreadPoolExecutor.CallerRunsPolicy());      }

说明:公司的业务为低频交易,对于单次调用性能要求高,但是并发压力根本不大,所以 阻塞队列已满且线程数达到最大值时所采取的饱和策略为调用者执行。

实现

业务

投保业务主要涉及这几个大的方面:投保校验、核保校验、保费试算

  • 投保校验:最主要的是要查询客户黑名单和风险等级,都是千万级的表。而且投保人和被保人都需要校验
  • 核保校验:除了常规的核保规则校验,查询千万级的大表,还需要调用外部智能核保接口获得用户的风险等级,投保人和被保人都需要校验
  • 保费试算:需要计算每个险种的保费

设计

根据上面的业务,如果串行执行的话,单次性能肯定不高,所以考虑多线程异步执行获得校验结果,再对结果综合判断

    • 投保校验:采用一个线程(也可以根据投保人和被保人数量来采用几个线程)
    • 核保校验:
      • 常规校验:采用一个线程
      • 外部调用:有几个用户(指投保人和被保人)就采用几个线程

保费计算:有几个险种就采用几个线程,最后合并得到整个的保费

代码

以下代码是样例,实际逻辑已经去掉

先创建投保、核保(常规、外部调用)、保费计算4个业务服务类:

投保服务类:InsuranceVerificationServiceImpl,假设耗时50ms

    @Service    public class InsuranceVerificationServiceImpl implements InsuranceVerificationService {        private static final Logger logger = LoggerFactory.getLogger(InsuranceVerificationServiceImpl.class);        @Override        public TaskResponseModel<Object> insuranceCheck(String key, PolicyModel policyModel) {            try {                //假设耗时50ms                Thread.sleep(50);                            return TaskResponseModel.success().setKey(key).setData(policyModel);            } catch (InterruptedException e) {                logger.warn(e.getMessage());                            return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());            }        }    }

核保常规校验服务类:UnderwritingCheckServiceImpl,假设耗时50ms

    @Service    public class UnderwritingCheckServiceImpl implements UnderwritingCheckService {        private static final Logger logger = LoggerFactory.getLogger(UnderwritingCheckServiceImpl.class);        @Override        public TaskResponseModel<Object> underwritingCheck(String key, PolicyModel policyModel) {            try {                //假设耗时50ms                Thread.sleep(50);                            return TaskResponseModel.success().setKey(key).setData(policyModel);            } catch (InterruptedException e) {                logger.warn(e.getMessage());                            return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());            }        }    }

核保外部调用服务类:ExternalCallServiceImpl,假设耗时200ms

    @Service    public class ExternalCallServiceImpl implements ExternalCallService {        private static final Logger logger = LoggerFactory.getLogger(ExternalCallServiceImpl.class);        @Override        public TaskResponseModel<Object> externalCall(String key, Insured insured) {            try {                //假设耗时200ms                Thread.sleep(200);                ExternalCallResultModel externalCallResultModel = new ExternalCallResultModel();                externalCallResultModel.setIdcard(insured.getIdcard());                externalCallResultModel.setScore(200);                return TaskResponseModel.success().setKey(key).setData(externalCallResultModel);            } catch (InterruptedException e) {                logger.warn(e.getMessage());                return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());            }        }    }

试算服务类:TrialCalculationServiceImpl,假设耗时50ms

    @Service    public class TrialCalculationServiceImpl implements TrialCalculationService {        private static final Logger logger = LoggerFactory.getLogger(TrialCalculationServiceImpl.class);        @Override        public TaskResponseModel<Object> trialCalc(String key, Risk risk) {            try {                //假设耗时50ms                Thread.sleep(50);                return TaskResponseModel.success().setKey(key).setData(risk);            } catch (InterruptedException e) {                logger.warn(e.getMessage());                return TaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());            }        }    }

统一返回接口类:TaskResponseModel, 上面4个服务的方法统一返回TaskResponseModel

  @Data  @ToString  @NoArgsConstructor  @AllArgsConstructor  @EqualsAndHashCode  @Accessors(chain = true)  public class TaskResponseModel<T extends Object> implements Serializable {      private String key;           //唯一调用标志      private String resultCode;    //结果码      private String resultMessage; //结果信息      private T data;               //业务处理结果

      public static TaskResponseModel<Object> success() {          TaskResponseModel<Object> taskResponseModel = new TaskResponseModel<>();          taskResponseModel.setResultCode("200");          return taskResponseModel;      }      public static TaskResponseModel<Object> failure() {          TaskResponseModel<Object> taskResponseModel = new TaskResponseModel<>();          taskResponseModel.setResultCode("400");          return taskResponseModel;      }  }

注:

  1. key为这次调用的唯一标识,由调用者传进来
  2. resultCode结果码,200为成功,400表示有异常
  3. resultMessage信息,表示不成功或者异常信息
  4. data业务处理结果,如果成功的话
  5. 这些服务类都是单例模式

要使用用CompletionService的话,需要创建实现了Callable接口的线程

投保Callable:

    @Data    @AllArgsConstructor    public class InsuranceVerificationCommand implements Callable<TaskResponseModel<Object>> {        private String key;        private PolicyModel policyModel;        private final InsuranceVerificationService insuranceVerificationService;        @Override        public TaskResponseModel<Object> call() throws Exception {            return insuranceVerificationService.insuranceCheck(key, policyModel);        }    }

核保常规校验Callable:

    @Data    @AllArgsConstructor    public class UnderwritingCheckCommand implements Callable<TaskResponseModel<Object>> {        private String key;        private PolicyModel policyModel;        private final UnderwritingCheckService underwritingCheckService;        @Override        public TaskResponseModel<Object> call() throws Exception {            return underwritingCheckService.underwritingCheck(key, policyModel);        }    }

核保外部调用Callable:

    @Data    @AllArgsConstructor    public class ExternalCallCommand implements Callable<TaskResponseModel<Object>> {        private String key;        private Insured insured;        private final ExternalCallService externalCallService;        @Override        public TaskResponseModel<Object> call() throws Exception {            return externalCallService.externalCall(key, insured);        }    }

试算调用Callable:

    @Data    @AllArgsConstructor    public class TrialCalculationCommand implements Callable<TaskResponseModel<Object>> {        private String key;        private Risk risk;        private final TrialCalculationService trialCalculationService;        @Override        public TaskResponseModel<Object> call() throws Exception {            return trialCalculationService.trialCalc(key, risk);        }    }

  1. 每一次调用,需要创建这4种Callable
  2. 返回统一接口TaskResopnseModel

异步执行的类:TaskExecutor

  @Component  public class TaskExecutor {      private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class);      //线程池      private final ExecutorService executorService;

      public TaskExecutor(ExecutorService executorService) {          this.executorService = executorService;      }

      //异步执行,获取所有结果后返回      public List<TaskResponseModel<Object>> execute(List<Callable<TaskResponseModel<Object>>> commands) {          //创建异步执行对象          CompletionService<TaskResponseModel<Object>> completionService = new ExecutorCompletionService<>(executorService);          for (Callable<TaskResponseModel<Object>> command : commands) {              completionService.submit(command);          }          //获取所有异步执行线程的结果          int taskCount = commands.size();          List<TaskResponseModel<Object>> params = new ArrayList<>(taskCount);          try {              for (int i = 0; i < taskCount; i++) {                  Future<TaskResponseModel<Object>> future = completionService.take();                  params.add(future.get());              }          } catch (InterruptedException | ExecutionException e) {              //异常处理              params.clear();              params.add(TaskResponseModel.failure().setKey("error").setResultMessage("异步执行线程错误"));          }          //返回,如果执行中发生error, 则返回相应的key值:error          return params;      }  }

  1. 为单例模式
  2. 接收参数为List<Callable<TaskResponseModel<Object>>>,也就是上面定义的4种Callable的列表
  3. 返回List<TaskResponseModel<Object>>,也就是上面定义4种Callable返回的结果列表
  4. 我们的业务是对返回结果统一判断,业务返回结果有因果关系
  5. 如果线程执行有异常,也返回List<TaskResponseModel>,这个时候列表中只有一个TaskResponseModelkey为error, 后续调用者可以通过这个来判断线程是否执行成功;

调用方:CompletionServiceController

  @RestController  public class CompletionServiceController {      //投保key      private static final String INSURANCE_KEY = "insurance_";      //核保key      private static final String UNDERWRITING_KEY = "underwriting_";      //外部调用key      private static final String EXTERNALCALL_KEY = "externalcall_";      //试算key      private static final String TRIA_KEY = "trial_";

      private static final Logger logger = LoggerFactory.getLogger(CompletionServiceController.class);

      private final ExternalCallService externalCallService;      private final InsuranceVerificationService insuranceVerificationService;      private final TrialCalculationService trialCalculationService;      private final UnderwritingCheckService underwritingCheckService;      private final TaskExecutor taskExecutor;

      public CompletionServiceController(ExternalCallService externalCallService, InsuranceVerificationService insuranceVerificationService, TrialCalculationService trialCalculationService, UnderwritingCheckService underwritingCheckService, TaskExecutor taskExecutor) {          this.externalCallService = externalCallService;          this.insuranceVerificationService = insuranceVerificationService;          this.trialCalculationService = trialCalculationService;          this.underwritingCheckService = underwritingCheckService;          this.taskExecutor = taskExecutor;      }

      //多线程异步并发接口      @PostMapping(value = "/async", headers = "Content-Type=application/json;charset=UTF-8")      public String asyncExec(@RequestBody PolicyModel policyModel) {          long start = System.currentTimeMillis();

          asyncExecute(policyModel);          logger.info("异步总共耗时:" + (System.currentTimeMillis() - start));          return "ok";      }

      //串行调用接口      @PostMapping(value = "/sync", headers = "Content-Type=application/json;charset=UTF-8")      public String syncExec(@RequestBody PolicyModel policyModel) {          long start = System.currentTimeMillis();          syncExecute(policyModel);          logger.info("同步总共耗时:" + (System.currentTimeMillis() - start));          return "ok";      }      private void asyncExecute(PolicyModel policyModel) {          List<Callable<TaskResponseModel<Object>>> baseTaskCallbackList = new ArrayList<>();          //根据被保人外部接口调用          for (Insured insured : policyModel.getInsuredList()) {              ExternalCallCommand externalCallCommand = new ExternalCallCommand(EXTERNALCALL_KEY + insured.getIdcard(), insured, externalCallService);              baseTaskCallbackList.add(externalCallCommand);          }          //投保校验          InsuranceVerificationCommand insuranceVerificationCommand = new InsuranceVerificationCommand(INSURANCE_KEY, policyModel, insuranceVerificationService);          baseTaskCallbackList.add(insuranceVerificationCommand);          //核保校验          UnderwritingCheckCommand underwritingCheckCommand = new UnderwritingCheckCommand(UNDERWRITING_KEY, policyModel, underwritingCheckService);          baseTaskCallbackList.add(underwritingCheckCommand);          //根据险种进行保费试算          for(Risk risk : policyModel.getRiskList()) {              TrialCalculationCommand trialCalculationCommand = new TrialCalculationCommand(TRIA_KEY + risk.getRiskcode(), risk, trialCalculationService);              baseTaskCallbackList.add(trialCalculationCommand);          }          List<TaskResponseModel<Object>> results = taskExecutor.execute(baseTaskCallbackList);          for (TaskResponseModel<Object> t : results) {              if (t.getKey().equals("error")) {                  logger.warn("线程执行失败");                  logger.warn(t.toString());              }              logger.info(t.toString());          }

      }      private void syncExecute(PolicyModel policyModel) {          //根据被保人外部接口调用          for (Insured insured : policyModel.getInsuredList()) {              TaskResponseModel<Object> externalCall = externalCallService.externalCall(insured.getIdcard(), insured);              logger.info(externalCall.toString());          }          //投保校验          TaskResponseModel<Object> insurance = insuranceVerificationService.insuranceCheck(INSURANCE_KEY, policyModel);          logger.info(insurance.toString());          //核保校验          TaskResponseModel<Object> underwriting = underwritingCheckService.underwritingCheck(UNDERWRITING_KEY, policyModel);          logger.info(underwriting.toString());          //根据险种进行保费试算          for(Risk risk : policyModel.getRiskList()) {              TaskResponseModel<Object> risktrial = trialCalculationService.trialCalc(risk.getRiskcode(), risk);              logger.info(risktrial.toString());          }

      }  }

1.为测试方便,提供两个接口调用:一个是串行执行,一个是异步并发执行

2.在异步并发执行函数asyncExecute中:

  1. 根据有多少个被保人,创建多少个外部调用的Callable实例,key值为EXTERNALCALL_KEY + insured.getIdcard(),在一次保单投保调用中,每一个被保人Callablekey是不一样的。
  2. 根据有多少个险种,创建多少个试算的Callable实例,keyTRIA_KEY + risk.getRiskcode(),在一次保单投保调用中,每一个险种的Callable的key是不一样的
  3. 创建投保校验的Callable实例,业务上只需要一个
  4. 创建核保校验的Callable实例,业务上只需要一个
  5. 将Callable列表传入到TaskExecutor执行异步并发调用
  6. 根据返回结果来判断,通过判断返回的TaskResponseModelkey值可以知道是哪类业务校验,分别进行判断,还可以交叉判断(公司的业务就是要交叉判断)

验证

验证数据:

{"insuredList":[{"idcard":"laza","name":"320106"},{"idcard":"ranran","name":"120102"}],"policyHolder":"lazasha","policyNo":"345000987","riskList":[{"mainFlag":1,"premium":300,"riskcode":"risk001","riskname":"险种一"},{"mainFlag":0,"premium":400,"riskcode":"risk002","riskname":"险种二"}]}

上面数据表明:有两个被保人,两个险种。按照我们上面的定义,会调用两次外部接口,两次试算,一次投保,一次核保。而在样例代码中,一次外部接口调用耗时为200ms, 其他都为50ms.

本地开发的配置为8C16G:

  • 同步串行接口调用计算:2 * 200 + 2 * 50 + 50 + 50 = 600ms
  • 多线程异步执行调用计算:按照多线程并发执行原理,取耗时最长的200ms

验证:同步接口

输出耗时:可以看到耗时601ms

验证:多线程异步执行接口

输出耗时:可以看到为204ms

结果:基本和我们的预期相符合。

结束

这是将实际生产中的例子简化出来,具体生产的业务比较复杂,不便于展示。

实际情况下,原来的接口需要1000ms以上才能完成单次调用,有的需要2000-3000ms。现在的接口,在生产两台8c16g的虚拟机, 经过4个小时的简单压测能够支持2000用户并发,单次返回时长为350ms左右,服务很稳定,完全能够满足公司的业务发展需求。

原文地址:https://www.cnblogs.com/itps/p/12340082.html

时间: 2024-11-07 09:54:53

实际项目中使用CompletionService提升系统性能的一次实践的相关文章

异步并发利器:实际项目中使用CompletionService提升系统性能的一次实践

场景 随着互联网应用的深入,很多传统行业也都需要接入到互联网.我们公司也是这样,保险核心需要和很多保险中介对接,比如阿里.京东等等.这些公司对于接口服务的性能有些比较高的要求,传统的核心无法满足要求,所以信息技术部领导高瞻远瞩,决定开发互联网接入服务,满足来自性能的需求. 概念 CompletionService将Executor和BlockingQueue的功能融合在一起,将Callable任务提交给CompletionService来执行,然后使用类似于队列操作的take和poll等方法来获

架构师提升篇:分布式系统中,如何提升系统性能?

在分布式系统中,平衡业务计算的压力分布,减少网络上的数据流动,是一种提升性能的手段,请看下面的例子. 1)案例背景 某"机械设计研究所"历史上在管理模式上采用传统的层次化垂直结构.但是近年来,随着用户对产品更新换代的要求越来越快.质量要求越来越高,在竞争日益剧烈.外部压力日益增大的形势下,该所在管理模型上重新定位,打破长久以来形成的垂直结构,形成一种趋向于水平集成的业务模型,使企业能更专注于自己的业务特长,在产品研发时,能更好地利用国内更先进的技术力量,以实现合作方异地协同设计. 为此

DotNet项目中的一些常用验证操作

在项目中需要对用户输入的信息,以及一些方法生成的结果进行验证,一般在项目中较多的采用js插件或js来进行有关信息的校验,但是从项目安全性的角度进行考虑,可对系统进行js注入. 如果在后台对用户输入的信息进行验证会相对的安全,在出现信息验证不合法时,可以直接在程序中抛出异常,终止程序的运行. 现在提供几种较为常用的验证方法,可以减少在项目中开发时间和错误性: 1.判断域名:         /// <summary>         /// 普通的域名         /// </summ

Android在项目中接入腾讯TBS浏览器WebView的教程与注意的地方

腾讯TBS浏览器服务 我们都知道,在Android开发中,经常会用到Webview,而且WebView是出了名的坑的,各种bug.这时候腾讯老哥站出来了,搞了一个TBS浏览器服务这个东西. 说得这么屌,其实就是一个webView控件,然后解析解析网页的内核是他自己做的,叫X5内核(系统原生的WebView用的是WebKit内核),所以我们开发者用的时候,主要就是用这个com.tencent.smtt.sdk.WebView控件 当然这个控件有很多功能,当然也有些要注意的地方. 官网地址:http

如何在程序开发项目中选择合适的 JavaScript 框架,节省时间和成本的9款极佳的JavaScript框架介绍

从技术上来看,iOS,Android 和 Windows Phone 上的移动应用是使用不同的程序语言开发的,iOS 应用使用 Objective-C,Android 应用使用 Java,而 Windows Phone 应用使用 .NET. .随着 JavaScript,CSS 和 HTML 知识技能的提升,相信你也可以构建一个超赞的移动应用.在这篇博客里,我们将会介绍一些极好的 JavaScript 移动应用程序开发框架. 说到网络开发,就不得不说 JavaScript,这是一款很有前途的程序

记一次项目中的css样式复用

本文同步至微信公众号:http://mp.weixin.qq.com/s?__biz=MzAxMzgwNDU3Mg==&mid=401616238&idx=1&sn=3c6e965283c632e9035875be43e6a305&scene=0#wechat_redirect 二维码: 一直觉得css是一个不被重视,或者说是重视不够的饭后甜点.因为它太“简单”,门槛低,不能彰显或提升广大闷骚程序猿的逼格...一直都想聊聊css相关的一些杂碎.正好借最近的一次项目实践来侃侃

NIO提升系统性能

前言 在软件系统中,I/O的速度要比内存的速度慢很多,因此I/O经常会称为系统的瓶颈.所有,提高I/O速度,对于提升系统的整体性能有很大的作用. 在java标准的I/O中,是基于流的I/O的实现,即InputStream和OutPutStream,这种基于流的实现以字节为基本单元,很容易实现各种过滤器. NIO和new I/O的简称,在java1.4纳入JDK中,具有以下特征: 1.为所有的原始类型提供(buffer)缓存支持: 2.使用Charset作为字符集编码解码解决方案: 3.增加了通道

SSH的项目中,使用getHibernateTemplate 与 getSession有什么的区别?优点与缺点是什么

SSH的项目中,使用getHibernateTemplate 与 getSession有什么的区别?优点与缺点是什么,谢谢回答! 悬赏分:0 - 解决时间:2008-10-7 09:42 SSH的项目中,使用getHibernateTemplate 与 getSession有什么的区别?优点与缺点是什么,谢谢回答! 问题补充:谢谢 451182 主要是想了解一下这两个关于事物这一块有什么不同?谢谢! 提问者: zhongbin007 - 试用期 一级 最佳答案 getHibernateTempl

【Atom】在一个中/大型项目中,那些好用而强大的atom功能

作为一个学生党,一开始使用atom时候并没有意识到atom一些小功能的巨大作用,直到自己实习参与了项目,才知道这些功能在一个项目中是能极大提高工作效率的开发利器 下面是一位不愿意透露其姓名的彭湖湾同学(其实就是我啦)的使用体会,我们姑且称之为W同学 1.通过关键字段全项目检索目标代码文件command+shift+F(mac)ctrl+shift+F(windows) [场景一]:W同学在tower(一个团队协作工具)上接到了一个小任务,他要在公司运营的一个站点上站点模块里,对一个公告栏的bug