简易版的生产者消费者实现业务异步事务分离

定义一个model类

/**
 * 版权所有:
 * 项目名称:
 * 创建者:
 * 创建日期: 2018年5月10日
 * 文件说明: 见类描述
 */
package com.huaxin.acws.component.message.model;

import java.util.Date;
import java.util.Map;

/**
 * @author
 *
 */
public class MessageModel {
    /**
     * 事件ID
     */
    private String eventId;
    /**
     * 事件类型
     */
    private String eventType;
    /**
     * 事件内容,保存数据库json格式,
     */
    private Map<String, Object> payload;
    /**
     * 预留,同步发送的失败消息,定时发送重试次数
     */
    private int retryCount = 0;
    /**
     * 预留,定时最后发送事件
     */
    private Date lastRetryTime = null;
    /**
     * 事件开始的事件
     */
    private Date startTime = null;

    /**
     * 有效性校验
     * @return
     */
    public boolean isValid() {
        if (this.getEventType() == null || this.payload == null) {
            return false;
        }

        return true;
    }
    /**
     * @return the eventId
     */
    public String getEventId() {
        return eventId;
    }
    /**
     * @param eventId the eventId to set
     */
    public void setEventId(String eventId) {
        this.eventId = eventId;
    }
    /**
     * @return the eventType
     */
    public String getEventType() {
        return eventType;
    }
    /**
     * @param eventType the eventType to set
     */
    public void setEventType(String eventType) {
        this.eventType = eventType;
    }
    /**
     * @return the payload
     */
    public Map<String, Object> getPayload() {
        return payload;
    }
    /**
     * @param payload the payload to set
     */
    public void setPayload(Map<String, Object> payload) {
        this.payload = payload;
    }
    /**
     * @return the retryCount
     */
    public int getRetryCount() {
        return retryCount;
    }
    /**
     * @param retryCount the retryCount to set
     */
    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }
    /**
     * @return the lastRetryTime
     */
    public Date getLastRetryTime() {
        return lastRetryTime;
    }
    /**
     * @param lastRetryTime the lastRetryTime to set
     */
    public void setLastRetryTime(Date lastRetryTime) {
        this.lastRetryTime = lastRetryTime;
    }
    /**
     * @return the startTime
     */
    public Date getStartTime() {
        return startTime;
    }
    /**
     * @param startTime the startTime to set
     */
    public void setStartTime(Date startTime) {
        this.startTime = startTime;
    }
}

定义一个接口

/**
 * 版权所有:
 * 项目名称:
 * 创建者:
 * 创建日期: 2018年5月10日
 * 文件说明: 见类描述
 */
package com.huaxin.acws.component.message.service;

import com.huaxin.acws.component.message.model.MessageModel;

/**
 * 事件消息实际发送类
 *
 * @author dengxf
 */
public interface MessageHandler {
    /**
     * 是否支持消息类型处理
     *
     * @param eventType
     * @return
     */
    boolean isSupportEventType(String eventType);
    /**
     * 消息处理
     * @param message
     */
    void hander(MessageModel message);

}

定义一个抽象基类---通过2张表来做消息处理,实现补发等操作

/**
 * 版权所有:
 * 项目名称:
 * 创建者:
 * 创建日期: 2018年5月10日
 * 文件说明: 见类描述
 */
package com.huaxin.acws.component.message.service;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import com.alibaba.fastjson.JSONObject;
import com.huaxin.acws.common.exception.AcwsGenerelException;
import com.huaxin.acws.common.util.DateUtils;
import com.huaxin.acws.component.message.dao.WfoHiEventDao;
import com.huaxin.acws.component.message.dao.WfoRuEventDao;
import com.huaxin.acws.component.message.model.MessageModel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 消息生产者基类
 * @author
 */
public abstract class AbstractMessageProducerService {
    private ExecutorService threadPool = Executors.newFixedThreadPool(5);

    /**
     * 日志记录对象
     */
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessageProducerService.class);

    /**
     * 未发送事件消息
     */
    @Resource
    private WfoRuEventDao wfoRuEventDao;
    /**
     * 已发送事件消息
     */
    @Resource
    private WfoHiEventDao wfoHiEventDao;

    /**
     * 发送消息
     *
     * @param message 消息对象
     * @param handler 消息实际发送对象
     */
    protected void sendMessage(final MessageModel message, final MessageHandler handler) {
        if (!message.isValid()) {
            throw new AcwsGenerelException("消息类型或内容为空,发送失败");
        }

        message.setStartTime(new Date());

        //1、MessageModel 保存
        final Map<String, Object> record = new HashMap<String, Object>();
        record.put("EVENT_TYPE", message.getEventType());
        record.put("EVENT_PAYLOAD", JSONObject.toJSONString(message.getPayload(), true));
        record.put("RETRY_COUNT", 0);
        record.put("START_TIME", message.getStartTime());

        String messageId = wfoRuEventDao.save(record);
        message.setEventId(messageId);
        record.put("WFO_RU_EVENT_ID", messageId);

        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                //https://segmentfault.com/a/1190000004235193
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //1、发送事件
                            handler.hander(message);
                            logger.info("messageId={},发送成功", message.getEventId());

                            //2、处理成功事件迁移历史表
                            record.put("END_TIME", new Date());
                            wfoHiEventDao.save(record);
                            wfoRuEventDao.deleteByPrimaryKey(message.getEventId());
                        } catch (RuntimeException e) {
                            logger.error("发送消息错误,EventId=" + message.getEventId(), e);
                        }
                    }
                });
//                try {
//                    //1、发送事件
//                    handler.hander(message);
//                    logger.info("messageId={},发送成功", message.getEventId());
//
//                    //2、处理成功事件迁移历史表
//                    record.put("END_TIME", new Date());
//                    wfoHiEventDao.save(record);
//                    wfoRuEventDao.deleteByPrimaryKey(message.getEventId());
//                } catch (RuntimeException e) {
//                    logger.error("发送消息错误,EventId=" + message.getEventId(), e);
//                }
            }
        });
    }

    /**
     * 发送成功归档
     *
     * @param messageId
     */
    protected void archiveForSuccess(String messageId) {
        Map<String, String> ruRecord = wfoRuEventDao.getByPrimaryKey(messageId);
        ruRecord.put("END_TIME", DateUtils.formatDate(new Date(), DateUtils.DATE_FORMAT_YMDHMS));

        wfoHiEventDao.save(ruRecord);
        wfoRuEventDao.deleteByPrimaryKey(messageId);
    }

    /**
     * 发送失败更新重发计数
     *
     * @param messageId
     */
    protected void updateCountForFailed(String messageId) {
        Map<String, String> messageMap = wfoRuEventDao.getByPrimaryKey(messageId);
        String retryCount = messageMap.get("RETRY_COUNT");

        Map<String, Object> record = new HashMap<String, Object>();
        record.put("WFO_RU_EVENT_ID", messageId);
        record.put("RETRY_COUNT", Integer.valueOf(retryCount) + 1);
        record.put("LAST_RETRY_TIME", new Date());

        wfoRuEventDao.save(record);
    }
}

业务实现类

/**
 * 版权所有:
 * 项目名称:
 * 创建者:
 * 创建日期: 2019年2月12日
 * 文件说明: demo
 */
package com.huaxin.gxgc.gxprocess.service;

import com.alibaba.fastjson.JSONObject;
import com.huaxin.acws.bpm.event.ProcessCompletedEvent;
import com.huaxin.acws.common.exception.AcwsGenerelException;
import com.huaxin.acws.component.message.model.MessageModel;
import com.huaxin.acws.component.message.service.AbstractMessageProducerService;
import com.huaxin.acws.component.message.service.MessageHandler;
import com.huaxin.gxgc.mq.constant.MqEventTypeEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

/**
 *
 * @author diaoby
 *
 */
@Service
public class GxMqProducerService extends AbstractMessageProducerService implements MessageHandler{

    /** 日志实例 */
    private static final Logger logger = LoggerFactory.getLogger(GxMqProducerService.class);

    /* (non-Javadoc)
     * @see com.huaxin.acws.component.message.service.MessageHandler#isSupportEventType(java.lang.String)
     * @author diaoby
     */
    @Override
    public boolean isSupportEventType(String eventType) {
        return MqEventTypeEnum.MqEventType.PROCESS_COMPLETED_EVENT_GXPROCESSDEMO_MQ.getValue().equals(eventType);
    }

    /* (non-Javadoc)
     * @see com.huaxin.acws.component.message.service.MessageHandler#hander(com.huaxin.acws.component.message.model.MessageModel)
     * @author diaoby
     */
    @Override
    public void hander(MessageModel message) {
        Map<String, Object> payload = message.getPayload();
        ProcessCompletedEvent processCompletedEvent = null;
        Object object = payload.get("PROCESS_COMPLETED_EVENT");
        if( object instanceof JSONObject) {
            //ProcessCompletedEvent中 source 不能为空,随便设置一个值
            ((JSONObject) object).put("source", "1");
            JSONObject jsonobject = (JSONObject) object;
            processCompletedEvent = JSONObject.toJavaObject(jsonobject,ProcessCompletedEvent.class);
        } else {
            processCompletedEvent = (ProcessCompletedEvent) object;
        }
        //假设前10次都不成功
        if(message.getRetryCount() >= 10) {
            //业务操作
            logger.info("appId={}", processCompletedEvent.getAppId());
            logger.info("instanceId={}" +processCompletedEvent.getInstanceId());
        }else {
            logger.error("发送消息错误,EventId=" + message.getEventId());
            throw new AcwsGenerelException("GxMqProducerService流程结束事件失败");
        }
    }
    /**
     * 流程结束后触发完成后触发mq
     * @param processCompletedEvent
     * @author diaoby
     */
    public void completedProcess(ProcessCompletedEvent processCompletedEvent){
        Map<String, Object> payload = new HashMap<String, Object>();
        payload.put("PROCESS_COMPLETED_EVENT", processCompletedEvent);
        MessageModel message = new MessageModel();
        message.setEventType(MqEventTypeEnum.MqEventType.PROCESS_COMPLETED_EVENT_GXPROCESSDEMO_MQ.getValue());
        message.setPayload(payload);
        super.sendMessage(message, this);
    }

}

本例子中 ProcessCompletedEvent 是定义的一个 流程完成事件,流程完成事件后触发后续业务,通过流程也业务分离

业务调用处

    /* (non-Javadoc)
     * @see com.huaxin.gxgc.process.service.IProcessCompleted#completedProcess(com.huaxin.acws.bpm.event.ProcessCompletedEvent)
     * @author diaoby
     */
    @Override
    public void completedProcess(ProcessCompletedEvent processCompletedEvent) {
        gxMqProducerService.completedProcess(processCompletedEvent);
    }

原文地址:https://www.cnblogs.com/diaobiyong/p/11350766.html

时间: 2024-08-16 09:16:44

简易版的生产者消费者实现业务异步事务分离的相关文章

springboot学习入门简易版七---springboot2.0使用@Async异步执行方法(17)

1启动类开启异步调用注解 @SpringBootApplication @EnableAsync //开启异步调用 public class StartApplication { 不开启则异步调用无效 2编写异步调用方法 @RestController public class AsyncController { private final static Logger logger=LoggerFactory.getLogger(WebLogAspect.class); @Autowired p

生产者消费者模型实现多线程异步交互

[Python之旅]第六篇(五):生产者消费者模型实现多线程异步交互 消息队列 生产者消费者模型 多线程异步交互 摘要:  虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应该还包括Python的消息队列,因为这里多线程异步交互是通过Python的消息队列来实现的,因此主要内容如下: 1 2 3 4 1.生产者消费者模型:厨师做包子与顾客吃包子 2.Python的消息队列 3.利用... 虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应

生产者消费者问题——C++ windows版 多生产者多消费者的队列实现

最进要写一个多线程加载资源的资源管理器(ResourceManager)和多线程音频解码器(MultiThread Decoder).因为距最近一次用到多线程放下好久了,所以今天把生产者消费者问题练一下手. 为什么选择生产者消费者问题,因为他比较接近资源管理器和多线程音频解码器的原型. 比如,对于音频解码器,音频线程去流式的解码一段MP3格式的内存,就类似生产者生产产品的过程;而音频播放API(如OpenAL,OpenSL)通常需要的是PCM数据,也就是生产者生产的产品,所以播放逻辑充当消费者的

MVC5+EF6 简易版CMS(非接口) 第三章:数据存储和业务处理

目录 简易版CMS后台管理系统开发流程 MVC5+EF6 简易版CMS(非接口) 第一章:新建项目 MVC5+EF6 简易版CMS(非接口) 第二章:建数据模型 MVC5+EF6 简易版CMS(非接口) 第三章:数据存储和业务处理 MVC5+EF6 简易版CMS(非接口) 第四章:使用业务层方法,以及关联表解决方案 先来了解下各项的引用关系 FytCms.DALMSSQL=>Domain.Entity.EntityFramework BusinessLogic.Server=>FytCms.D

MVC5+EF6 简易版CMS(非接口) 第四章:使用业务层方法,以及关联表解决方案

目录 简易版CMS后台管理系统开发流程 MVC5+EF6 简易版CMS(非接口) 第一章:新建项目 MVC5+EF6 简易版CMS(非接口) 第二章:建数据模型 MVC5+EF6 简易版CMS(非接口) 第三章:数据存储和业务处理 MVC5+EF6 简易版CMS(非接口) 第四章:使用业务层方法,以及关联表解决方案 上一章介绍了,如何建数据层和业务,以及各层之间的引用过关系 这章主要讲解怎么使用业务层的方法. 以及普遍遇到的EF关联查询的问题解决方案 1.在FytMsys.Web文件夹下建Fyt

使用 LinkedBlockingQueue 实现简易版线程池

前一阵子在做联系人的导入功能,使用POI组件解析Excel文件后获取到联系人列表,校验之后批量导入.单从技术层面来说,导入操作通常情况下是一个比较耗时的操作,而且如果联系人达到几万.几十万级别,必须拆分成为子任务来执行.综上,可以使用线程池来解决问题.技术选型上,没有采用已有的 ThreadPoolExecutor 框架,而使用了自制的简易版线程池.该简易版的线程池,其实也是一个简易版的[生产者-消费者]模型,任务的加入就像是生产的过程,任务的处理就像是消费的过程.我们在这里不去讨论方案的合理性

生产者/消费者问题的多种Java实现方式--转

实质上,很多后台服务程序并发控制的基本原理都可以归纳为生产者/消费者模式,而这是恰恰是在本科操作系统课堂上老师反复讲解,而我们却视而不见不以为然的.在博文<一种面向作业流(工作流)的轻量级可复用的异步流水开发框架的设计与实现>中将介绍一种生产者/消费者模式的具体应用. 生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品.解决生产者/消费者问题的方法可分为两类:(1)采用某种机制保护生产者和消费者之间的同步

Android学习之路——简易版微信为例(二)

1 概述 从这篇博文开始,正式进入简易版微信的开发.深入学习前,想谈谈个人对Android程序开发一些理解,不一定正确,只是自己的一点想法.Android程序开发不像我们在大学时候写C控制台程序那样,需要从main开始写代码逻辑,大部分逻辑控制代码都由自己来实现.事实上,Android已经为我们提供了一个程序运行的框架,我们只需要往框架中填入我们所需的内容即可,这里的内容主要是:四大组件——Activity.Service.ContentProvider.BroadCast.在这四大组件中,可以

生产者/消费者问题的多种Java实现方式

实质上,很多后台服务程序并发控制的基本原理都可以归纳为生产者/消费者模式,而这是恰恰是在本科操作系统课堂上老师反复讲解,而我们却视而不见不以为然的.在博文<一种面向作业流(工作流)的轻量级可复用的异步流水开发框架的设计与实现>中将介绍一种生产者/消费者模式的具体应用. 生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品.解决生产者/消费者问题的方法可分为两类:(1)采用某种机制保护生产者和消费者之间的同步