Quartz定时向阿里云MQ发送数据(TCP模式)

针对公司业务逻辑,向阿里云MQ发送指定数据,消费端根据数据来做具体的业务,分两个项目,一个生产端(Producer)、一个消费端(Consumer)

生产端通过定时任务执行sql向阿里云MQ发送数据,消费端消费指定Topic上的数据

1:定时任务列表:

2:生产端表结构:

aliasName:定时任务别名;

cronExpression:定时任务轮询规则;

jobGroup:定时任务分组;

jobName:定时任务名称;

jobTrigger:定时任务触发器;

packageUrl:定时任务扫描具体封装类;

excuteSql:扫描类中执行的获取数据的脚本;

lastPramaryKey:最后一次获取数据时最大的主键;

topic:阿里云MQ的topic;

producerId:生产端的Id;

accessKey、securityKey:账号跟秘钥

dataBaseType:操作数据库类型(公司数据库类型比较多,执行脚本时,需要根据类型来指定具体的Service)

3:Java端核心代码,定时任务扫描如下配置的任务类来向阿里云MQ发送数据

public class SendPrimaryKeyListToMqTask implements Job{

    private final Logger logger = LoggerFactory.getLogger(SendPrimaryKeyListToMqTask.class);

    public void execute(JobExecutionContext context) throws JobExecutionException{
        JobDataMap data = context.getJobDetail().getJobDataMap();
        ScheduleJob scheduleJob = (ScheduleJob)data.get("jobParam");

        //最后一次获取数据时最大的主键
        int lastPramaryKey = scheduleJob.getLastPramaryKey();

        //执行sql
        String excuteSql = scheduleJob.getExcuteSql();
        excuteSql = excuteSql.replace("lastPramaryKey", String.valueOf(lastPramaryKey));

        //操作数据库类型(数据库配置)
        int dataBaseType = scheduleJob.getDataBaseType();

        //从游戏库获取数据
        LinkedList<ExcuteResultData> resultData = new LinkedList<ExcuteResultData>();
        if( dataBaseType == 1 ){
            GameService gameService = (GameService)SpringBeanFactory.getBean(GameService.class);
            resultData = gameService.getExcuteResultData(excuteSql);
        //从网站库获取数据
        }else if( dataBaseType == 2 ){
             SiteService siteService = (SiteService)SpringBeanFactory.getBean(SiteService.class);
             resultData = siteService.getExcuteResultData(excuteSql);
        }

        if ( resultData.size() > 0 ){
            scheduleJob.setPrimaryKeyList(resultData);
            QuartzService quartzService = (QuartzService)SpringBeanFactory.getBean(QuartzService.class);
            //将数据集中最大的主键更新
            scheduleJob.setLastPramaryKey(resultData.getLast().getPrimaryKey());
            quartzService.updateLastPramaryKey(scheduleJob);

            String topic = scheduleJob.getTopic();
            String producerId = scheduleJob.getProducerId();
            String ak = scheduleJob.getAccessKey();
            String sk = scheduleJob.getSecurityKey();

            //添加日志
            ScheduleJobLog scjLog = new ScheduleJobLog();
            scjLog.setDataSize(resultData.size());
            scjLog.setJobName(scheduleJob.getJobName());
            scjLog.setTopic(topic);
            int scjLogId = quartzService.addMqScheduleJobLog(scjLog);
            //消费端根据此日志主键更新日志状态
            scheduleJob.setScjLogId(scjLogId);

            Properties properties = new Properties();
            properties.put("ProducerId", producerId);
            properties.put("AccessKey", ak);
            properties.put("SecretKey", sk);
            Producer producer = ONSFactory.createProducer(properties);
            producer.start();

            Message msg = new Message(topic, "PRIMARY_KEY_" + String.valueOf(scjLogId), ObjectsTranscoder.serialize(scheduleJob));
            msg.setKey("PRIMARY_KEY_" + String.valueOf(scjLogId));
            SendResult sendResult = producer.send(msg);
            if ( ( sendResult != null ) && ( sendResult.getMessageId() != null ) ){
                scjLog.setMessageId(sendResult.getMessageId());
                scjLog.setStatus(2);
                quartzService.updateMqScheduleJobLog(scjLog);
            }

            producer.shutdown();

            logger.debug("=====>任务名称:" + scheduleJob.getJobName());
            logger.debug("=====>发送条数:" + resultData.size());
            logger.debug("=====>发送主键内容:" + resultData.toString());

        }
    }
}

4:消费端表结构:

5:消费端Java核心代码(通过监听器来做):

import java.util.List;
import java.util.Properties;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.odao.common.utils.ObjectsTranscoder;
import com.odao.entity.ScheduleJob;
import com.odao.entity.ScheduleJobLog;
import com.odao.service.consumer.ConsumerService;
import com.odao.service.message.MessageService;

/**
 * 阿里云游戏、网站 主键数据集消费监听器
 */
public class ConsumePrimaryKeyFromMqListener implements ServletContextListener {

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        WebApplicationContext appctx = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());
        MessageService messageService = (MessageService) appctx.getBean(MessageService.class);
        List<ScheduleJob> consumeList = messageService.getScheduleJobList();
        for(final ScheduleJob sjc : consumeList){

            String topic = sjc.getTopic();
            String consumerId= sjc.getConsumerId();
            String ak = sjc.getAccessKey();
            String sk = sjc.getSecurityKey();

            Properties properties = new Properties();
            properties.put(PropertyKeyConst.ConsumerId,consumerId);
            properties.put(PropertyKeyConst.AccessKey,ak);
            properties.put(PropertyKeyConst.SecretKey,sk);
            //properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");

            Consumer consumer = ONSFactory.createConsumer(properties);

            consumer.subscribe(topic, "*", new MessageListener() {
                @Override
                public Action consume(Message message, ConsumeContext context) {
                    ScheduleJob scheduleJob = (ScheduleJob) ObjectsTranscoder.deserialize(message.getBody());
                    if( scheduleJob !=null ){
                        //更新消息状态为3:消费消息成功
                        ScheduleJobLog scjLog = new ScheduleJobLog();
                        scjLog.setStatus(3);
                        scjLog.setMqScheduleJobLogId(scheduleJob.getScjLogId());
                        messageService.updateMqScheduleJobLog(scjLog);
                        try {
                            ConsumerService consumerService = (ConsumerService) Class.forName(sjc.getImplementClass()).newInstance();
                            boolean isSuccess = consumerService.consume(scheduleJob.getPrimaryKeyList());
                            if(isSuccess){
                                //更新消息状态为4:业务逻辑处理成功
                                scjLog.setStatus(4);
                                messageService.updateMqScheduleJobLog(scjLog);
                            }
                        } catch (InstantiationException e) {
                            e.printStackTrace();
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        }
                    }
                    return Action.CommitMessage;
                }
            });

            consumer.start();
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {

    }
}
时间: 2024-11-03 12:26:18

Quartz定时向阿里云MQ发送数据(TCP模式)的相关文章

[SQL]阿里云RDS设置MSSQL恢复模式为“简单”

-- 取消数据库镜像ALTER DATABASE <database_name> SET PARTNER OFF-- 设置数据库镜像RESTORE DATABASE <database_name> WITH RECOVERY-- 设置恢复模式为简单alter database <database_name> set recovery simple   [SQL]阿里云RDS设置MSSQL恢复模式为"简单"

阿里云批量发送短信功能测试

package com.yongjie.ZhiJianSbpt.sms; import java.text.SimpleDateFormat; import java.util.Date; import com.aliyuncs.DefaultAcsClient; import com.aliyuncs.IAcsClient; import com.aliyuncs.dysmsapi.model.v20170525.QuerySendDetailsRequest; import com.aliy

使用mysqldump命令从阿里云备份数据库数据至本地

注:因为需要从阿里云云服务器备份数据库数据至本地(个人认为如非迁移服务器,此项操作显得有些多余) 又注:谢谢阿里云的帮助文档 1, 安装mysql命令行客户端 2,打开命令行提示符窗口,输入并执行如下命令: mysqldump -hhmd-021.my3w.com -uhdm0215246 -phdmxx00101 --default-character-set=utf8 bdm0254685_db > e:\backup\bdm0388542_db_utf8.sql

阿里云数据库表数据误删恢复

在日常对数据库的直接操作中,稍微有一点不小心,就可能造成数据的丢失!此时数据能否恢复以及如何恢复就成了至关重要的问题,本文讨论恢复阿里云数据库数据的几种方法 环境: 阿里云数据库RDS版 数据库market 被误删的表market_user 方法一: 进入到阿里云RDS管理控制台,点击左侧栏的"备份与恢复" 选择最近的一次备份恢复 后续操作参考阿里云文档操作即可! 方法二: 进入阿里云RDS管理控制台,登录到market数据库中.如果上次对误删的表进行过查询,并且没有关掉sql窗口,再

解决阿里云邮件发送不能使用25端口问题

本地测试发邮件功能很流畅,部署到阿里云上以后发现总是NOT FIND,这就很奇怪.开始以为是url写错了导致的,检查N多遍发现完全一毛一样的.后来各种百度,发现是因为阿里云禁用了25端口导致的.查看各种资料,解决的办法五花八门.试了几种解决方案,都解决了问题.现在整理如下:(不用去尝试申请解禁25端口的,可以很认真负责的告诉你,完全没有卵用) 首先,是阿里大大给的官方的解决方案,用SMTP发送邮件:代码如下 # -*- coding:utf-8 -*- import urllib, urllib

阿里云服务器 发送邮箱 STMP 25端口 465端口问题 Javamail 25被禁用

我们传统使用的比较简单的是 STMP 25端口收发邮件 今天发现刚购买的阿里云服务器不能作为客户端通过STMP 25端口发送邮件 开始在网上有说发现是JDK1.8的原因,然后自己也把JDK1.8换到了JDK1.7 还是不行,所以这里排除了JDK的原因. 那么问题来了,是否25端口不能连接适用喃? 然后在终端输入命令行 ,可测试25端口是否可连接. telnet smtp.163.com 25 返回成功 说明你的服务器 是没有封掉25端口的.上面的图是我自己的电脑,当然能连通. 当换到服务器测试的

在阿里云配置URL的REWRITE模式

今天买了阿里云的虚拟主机,然后把我的项目上传. 网站首页能够访问,但是点开任何页面,都是提示No input file specified,猜测应该是URL重写没有生效,我在浏览器地址把index.php加上去,就可以访问子页面了. 解决方法是: 1.关于URL重写,与ThinkPHP手册不同个地方,是入口文件的同级.htaccess文件中,index.php后面多个斜杠,如下面的代码 <IfModule mod_rewrite.c> Options +FollowSymlinks Rewri

阿里云 linux 挂载数据盘

1,查看数据盘 命令: fdisk -l 2,进行分区 命令:fdisk /dev/xvdb 注释:根据提示,依次输入"n"."p"."1".两次回车,"w",分区开始. 3,查看新的分区 命令:fdisk -l 4,格式化分区 命令: mkfs.ext3 /dev/xvdb1 注意: 格式化时间根据数据盘大小而定 5,添加分区信息 命令:echo '/dev/xvdb1              /data        e

阿里云RDS恢复数据到本地上

1.先从RDS下载实例备份到本地 下载外网和内网根据你实际的需求环境来.同个内网服务器下载就少点下载流量花费. 2.解压备份文件执行如下命令,下载数据备份文件. wget -c '<数据备份文件外网下载地址>' -O <自定义文件名>.tar.gz参数说明: -c:启用断点续传模式. -O:将下载的结果保存为指定的文件(使用URL中包含的文件名后缀 .tar.gz 或者 .xb.gz). 说明:若提示显示100%进度,则表示文件下载完成. 将下载的数据备份恢复到本地MySQL数据库