背景:
我们的组件(简称A),在业务链中属于数据支撑节点。其中与组件B存在接口同步数据的直接关系(API接口直接调用进行数据交互)
问题:
我们的上游有另一个组件C(带有界面),调用A(us)进行数据的变更操作,此时需要A调用B服务接口进行同步,问题出在这里,C调用
A通常速度比较快,比较稳定,但是A调用B经常超时或者失败,网络原因or 组件B自己的设计原因吧,反正是推不动
方案:经沟通考察,这条数据的变更在可接受的时间范围只要最终一致即可,于是首先,我们先将事物中的调用B服务的一系列逻辑抽出来,
做成异步的,让C操作数据后能及时返回,在这个异步调用B服务接口同步过程中,出现异常时自动记录这次接口调用失败的日志,再开一个
Worker线程任务去轮询调用
设计:
1、第三方接口调用中,涉及增,删,改的逻辑脱离事物管理,异步执行
2、接口调用后出现异常,记录下该方法调用的详细日志到数据库表中
3、开启一个单独的任务轮询改表中待重试状态的记录,依次重试,重试成功或失败,均更改状态,对于重试若干次仍然失败的,界面上展示,通知排查
实现:
接口的异步调用就不讲了,springboot的异步方案很多,这里主要讲异常日志如何自动入库,并提供补偿
1、日志获取思路
(1)调用B服务的api接口异常时,需要抛出具体的异常,这里假设叫BusinessException,该异常继承RuntimeException,异常中可以带出错误码,错误描述等信息
(2)自定义收集日志的注解,作用在方法上,收集日志
(3)异常信息入库,注意使用摘要加密保证唯一性
2、单独开启一个线程处理收集的状态为待重试的记录,对调用失败的进行retry
编码:
1、自定义注解
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 自定义注解-处理调用第三方接口数据交互异常时日志收集 * <p> * RetentionPolicy.RUNTIME JVM在运行期也保留注解,可以通过反射机制读取注解信息 * ElementType.METHOD 方法上生效 */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface ExceptionCollect { String value() default ""; String beanName() default ""; }自定义的属性可自行调整
2、定义注解的方法签名
import org.aspectj.lang.annotation.Pointcut; /** * description 公用的pointCut * 定义各个pointCut方法签名*/ public class PointCuts {/** * 切入点:注解@ExceptionCollect */ @Pointcut("@annotation(com.xxx.config.aop.ExceptionCollect)") public void exceptionPointCut() { } }
3、定义具体的切面类执行逻辑
/** * description 切面类 * 打印日志,采集异常日志入库 * @see ExceptionCollect*/ @Slf4j @Aspect @Component public class AspectService { @Autowired private ExceptionLogService exceptionLogService;/** * 方法上注解@ExceptionCollect,抛出异常后将收集日志信息入库 * * @param point 切面 * @param e 抛出的异常 */ @AfterThrowing(value = "com.xxx.config.aop.PointCuts.exceptionPointCut()", throwing = "e") public void afterThrowing(JoinPoint point, BusinessException e) { MethodInfo methodInfo = new MethodInfo(point); ExceptionLog exceptionLog = convert(methodInfo, e); String beanName = methodInfo.getMethod().getAnnotation(ExceptionCollect.class).beanName(); exceptionLog.setBeanName(beanName); //保证幂等性 ExceptionLog oldLog = exceptionLogService.findById(exceptionLog.getId()); //新记录为待重试 if (oldLog == null) { exceptionLog.setRetryCount(1); exceptionLog.setStatus(ExceptionStatusEnum.RETRY.getType()); exceptionLogService.create(exceptionLog); } else { //已有记录,说明是重试后仍失败 oldLog.setRetryCount(oldLog.getRetryCount() + 1); oldLog.setStatus(ExceptionStatusEnum.FAIL.getType()); oldLog.setUpdateTime(CommonUtils.localDateTimeNow()); exceptionLogService.update(oldLog); } } /** * @param methodInfo * @param e * @return */ private ExceptionLog convert(MethodInfo methodInfo, BusinessException e) { methodInfo.init(); ExceptionLog exceptionLog = new ExceptionLog(); exceptionLog.setClassName(methodInfo.getClassName()); exceptionLog.setMethodName(methodInfo.getMethodName()); exceptionLog.setJsonArgs(methodInfo.getJsonArgs()); exceptionLog.setErrorCode(e.getCode()); exceptionLog.setErrorMsg(e.getMessage()); exceptionLog.setCreateTime(CommonUtils.localDateTimeNow()); exceptionLog.setUpdateTime(CommonUtils.localDateTimeNow()); //唯一键 String id = Md5Util.getStrMD5(methodInfo.getClassName() + methodInfo.getMethodName() + methodInfo.getJsonArgs()); exceptionLog.setId(id); return exceptionLog; } }
4、补上异常日志的实体
import lombok.Data; import java.io.Serializable; import java.time.LocalDateTime; /** * description 接口异常日志实体 */ @Data public class ExceptionLog implements Serializable { private static final long serialVersionUID = 1L; /** * 根据类名,方法名,入参生成一个摘要值 */ private String id; /** * 类型(定义处理方式,定时补偿或人工补偿) */ private Integer type; /** * 处理状态 */ private Integer status; /** * 重试次数 */ private Integer retryCount; /** * 错误码 */ private String errorCode; /** * 错误描述 */ private String errorMsg; /** * 完整类名 */ private String className; /** * bean名 */ private String beanName; /** * 方法名 */ private String methodName; /** * 方法入参 */ private String jsonArgs; /** * 备注 */ private String remark; /** * 创建时间 */ private LocalDateTime createTime; /** * 修改时间 */ private LocalDateTime updateTime; }
5、辅助类
package com.hikvision.idatafusion.dglist.config.aop; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import lombok.Data; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.reflect.MethodSignature; import java.lang.reflect.Method; /** * description 接口方法信息 * aop中获取某个方法的参数信息*/ @Data public class MethodInfo { /** * 切入点信息 */ private JoinPoint joinPoint; /** * 方法签名 */ private MethodSignature signature; /** * 方法信息 */ private Method method; /** * 类信息 */ private Class<?> targetClass; /** * 参数信息 */ private Object[] args; /** * 参数信息String */ private String jsonArgs; /** * 类名 */ private String className; /** * 方法名 */ private String methodName; public MethodInfo(JoinPoint joinPoint) { this.joinPoint = joinPoint; } public void init() { this.signature = (MethodSignature) joinPoint.getSignature(); this.method = signature.getMethod(); this.methodName = method.getName(); this.targetClass = method.getDeclaringClass(); this.className = targetClass.getName(); this.args = joinPoint.getArgs(); this.jsonArgs = JSONObject.toJSONString(args, SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullStringAsEmpty); } }
接下来,我们在具体的调用外部API接口的方法上加上注解@ExceptionCollect(beanName = "xxxService")
6、api接口调用方法
@Override @ExceptionCollect(beanName = "xxxService") @Retryable(value = {BusinessException.class}, maxAttempts = 5, backoff = @Backoff(delay = 5000, multiplier = 2)) public void test(TestBean person) { String url= getUrl();//api完整请求路径 String result; try { result = HttpUtils.post(url, param, header).body(); } catch (HttpException e) { log.error("post API test failed!", e); throw new BusinessException(ErrorCodeEnum.API_INTERFACE_EXCEPTION.getCode()); } //解析请求结果的逻辑简化如下 Result result = JSON.parseObject(result, new TypeReference<Result>() { }); if (ErrorCodeEnum.SUCCESS.getCode().equals(result.getCode())) { log.info("XFACE add person success!"); } else { log.info("call API:test exception,code={},msg={}", result.getCode(), result.getMsg()); throw new BusinessException(ErrorCodeEnum.ADD_PERSON_EXCEPTION.getCode()); } }
在这里,标注了@ExceptionCollect的方法test()会在exceptionPointCut()方法签名的切入点被切入
如果执行中抛出异常,由AspectService 类中,标注了@AfterThrowing的afterThrowing()方法来处理异常要做的逻辑
这里我们对异常的执行做了四种状态:-100(失败),-1(取消),0(待重试),100(成功)
初次入库的记录均为待重试(0),在重试了若干次仍失败后改为-100,成功改为100
@Retryable,定义改方法如果抛出异常,自动重试,最大重试5次,下一次重试执行与上一次间隔按倍数(2)增加,5s,10s,20s.......重试
7、ExceptionWorker线程轮询补偿调用
/** * description 接口调用异常work线程补偿 * 服务启动后定时扫描t_exception_log表status=0的记录 * 间隔5分钟*/ @Component @Slf4j public class ExceptionWorkerTask { @Autowired private ExceptionLogService exceptionLogService; @Autowired private xxxService xxxService; /** * 任务只重试status=0的 * 每隔5分钟一次,每次每条记录重试1次 */ @Scheduled(initialDelay = 10000, fixedDelay = 300000) public void retry() { List<ExceptionLog> list = exceptionLogService.getRetryList(); for (ExceptionLog e : list) { String methodName = e.getMethodName(); String jsonArgs = e.getJsonArgs(); JSONObject argsJo = JSON.parseObject(jsonArgs); xxxService.test(argsJo);//如果调用成功,更新状态为 e.setRetryCount(e.getRetryCount()+1); e.setStatus(ExceptionStatusEnum.CONFIRM.getType()); e.setUpdateTime(CommonUtils.localDateTimeNow()); exceptionLogService.update(e); } } }
设计缺点:
1、不通用,业务耦合比较强
2、ExceptionLog的定义还待改进
3、重试的机制还可以设计得更复杂点,初步设计是有人工重试的情景
原文地址:https://www.cnblogs.com/yb38156/p/12013309.html