基于Redis实现分布式消息队列(4)

1、访问Redis的工具类

public class RedisManager {

    private static Pool<Jedis> pool;

    protected final static Logger logger = Logger.getLogger(RedisManager.class);

    static{
        try {
            init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void init() throws Exception {

        Properties props = ConfigManager.getProperties("redis");
        logger.debug("初始化Redis连接池。");
        if(props==null){
            throw new RuntimeException("没有找到redis配置文件");
        }
        // 创建jedis池配置实例
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        // 设置池配置项值
        int poolMaxTotal = Integer.valueOf(props.getProperty("redis.pool.maxTotal").trim());
        jedisPoolConfig.setMaxTotal(poolMaxTotal);

        int poolMaxIdle = Integer.valueOf(props.getProperty("redis.pool.maxIdle").trim());
        jedisPoolConfig.setMaxIdle(poolMaxIdle);

        long poolMaxWaitMillis = Long.valueOf(props.getProperty("redis.pool.maxWaitMillis").trim());
        jedisPoolConfig.setMaxWaitMillis(poolMaxWaitMillis);

        logger.debug(String.format("poolMaxTotal: %s , poolMaxIdle : %s , poolMaxWaitMillis : %s ",
                poolMaxTotal,poolMaxIdle,poolMaxWaitMillis));

        // 根据配置实例化jedis池
        String connectMode = props.getProperty("redis.connectMode");
        String hostPortStr = props.getProperty("redis.hostPort");

        logger.debug(String.format("host : %s ",hostPortStr));
        logger.debug(String.format("mode : %s ",connectMode));

        if(StringUtils.isEmpty(hostPortStr)){
            throw new OptimusException("redis配置文件未配置主机-端口集");
        }
        String[] hostPortSet = hostPortStr.split(",");
        if("single".equals(connectMode)){
            String[] hostPort = hostPortSet[0].split(":");
            pool = new JedisPool(jedisPoolConfig, hostPort[0], Integer.valueOf(hostPort[1].trim()));
        }else if("sentinel".equals(connectMode)){
            Set<String> sentinels = new HashSet<String>();
            for(String hostPort : hostPortSet){
                sentinels.add(hostPort);
            }
            pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig);
        }
    }

    /**
     * 使用完成后,必须调用 returnResource 还回。
     * @return 获取Jedis对象
     */
    public static Jedis getResource(){
        Jedis jedis = pool.getResource();
        if(logger.isDebugEnabled()){
            logger.debug("获得链接:" + jedis);
        }
        return jedis;
    }

    /**
     * 获取Jedis对象。
     *
     * 用完后,需要调用returnResource放回连接池。
     *
     * @param db 数据库序号
     * @return
     */
    public static Jedis getResource(int db){
        Jedis jedis = pool.getResource();
        jedis.select(db);
        if(logger.isDebugEnabled()){
            logger.debug("获得链接:" + jedis);
        }
        return jedis;
    }

    /**
     * @param jedis
     */
    public static void returnResource(Jedis jedis){
        if(jedis!=null){
            pool.returnResource(jedis);
            if(logger.isDebugEnabled()){
                logger.debug("放回链接:" + jedis);
            }
        }
    }

    /**
     * 需要通过Spring确认这个方法被调用。
     * @throws Exception
     */
    public static void destroy() throws Exception {
        pool.destroy();
    }
}

这个类没有通过技术手段强制调用returnResource和destroy,需要想想办法。

2、队列接口

public interface TaskQueue {

    /**
     * 获取队列名
     * @return
     */
    String getName();

    /**
     * 往队列中添加任务
     * @param task
     */
    void pushTask(String task);

    /**
     * 从队列中取出一个任务
     * @return
     */
    String popTask();

}

用String类型描述任务,也可以考虑byte[],要求对每个任务描述的数据尽可能短。

3、队列的Redis实现类

/**
 * 任务队列Redis实现。
 *
 * 采用每次获取Jedis并放回pool的方式。
 * 如果获得Jedis后一直不放手,反复重用,两个操作耗时可以降低1/3。
 * 暂时先忍受这种低性能,不明确Jedis是否线程安全。
 *
 */
public class TaskQueueRedisImpl implements TaskQueue {

    private final static int REDIS_DB_IDX = 9;

    private final static Logger logger = Logger.getLogger(TaskQueueRedisImpl.class);

    private final String name; 

    /**
     * 构造函数。
     *
     * @param name
     */
    public TaskQueueRedisImpl(String name) {
        this.name = name;
    }

    /* (non-Javadoc)
     * @see com.gwssi.common.mq.TaskQueue#getName()
     */
    public String getName() {
        return this.name;
    }
    /* (non-Javadoc)
     * @see com.gwssi.common.mq.TaskQueue#pushTask(String)
     */
    public void pushTask(String task) {
        Jedis jedis = null;
        try{
            jedis = RedisManager.getResource(REDIS_DB_IDX);
            jedis.lpush(this.name, task);
        }catch(Throwable e){
            logger.error(e.getMessage(),e);
        }finally{
            if(jedis!=null){
                RedisManager.returnResource(jedis);
            }
        }
    }

    /* (non-Javadoc)
     * @see com.gwssi.common.mq.TaskQueue#popTask()
     */
    public String popTask() {
        Jedis jedis = null;
        String task = null;
        try{
            jedis = RedisManager.getResource(REDIS_DB_IDX);
            task = jedis.rpop(this.name);
        }catch(Throwable e){
            logger.error(e.getMessage(),e);
        }finally{
            if(jedis!=null){
                RedisManager.returnResource(jedis);
            }
        }
        return task;
    }

}

4、获取队列实例的工具类

/**
 * <pre>
 *  // 获得队列
 *  TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
 *
 *  // 添加任务到队列
 *  String task = "task id";
 *  tq.pushTask(task);
 *
 *  // 从队列中取出任务执行
 *  String taskToDo = tq.popTask();
 * </pre>
 * @author liuhailong
 */
public class TaskQueueManager {

    protected final static Logger logger = Logger.getLogger(TaskQueueManager.class);

    private static Map<String, TaskQueueRedisImpl> queneMap = new ConcurrentHashMap<String, TaskQueueRedisImpl>();

    /**
     * 短信队列名。
     */
    public static final String SMS_QUEUE = "SMS_QUEUE";

    /**
     * 规则队列名。
     */
    public static final String RULE_QUEUE = "RULE_QUEUE";

    private static void initQueneMap() {
        logger.debug("初始化任务队列...");
        queneMap.put(RULE_QUEUE, new TaskQueueRedisImpl(RULE_QUEUE));
        logger.debug("建立队列:"+RULE_QUEUE);
        queneMap.put(SMS_QUEUE, new TaskQueueRedisImpl(SMS_QUEUE));
        logger.debug("建立队列:"+SMS_QUEUE);
    }

    static {
        initQueneMap();
    }

    public static TaskQueue get(String name){
        return getRedisTaskQueue(name);
    }

    public static TaskQueue getRedisTaskQueue(String name){
        return queneMap.get(name);
    }

}

和具体的队列过于紧耦合,但简单好用。

先跑起来再说。

5、向队列中添加任务的代码

TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
tq.pushTask(smsMessageId);

6、从队列中取出任务执行的代码

public class SmsSendTask{

    protected final static Logger logger = Logger.getLogger(SmsSendTask.class);

    protected static SmsSendService smsSendService = new SmsSendServiceUnicomImpl();
    /**
     * 入口方法。
     */
    public void execute()  {
        TaskQueue taskQueue = null;
        String task = null;
        try {
            taskQueue = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);

            // 非线程安全
            Set<Serializable> executedTaskSet = new HashSet<Serializable>();

            task = taskQueue.popTask();
            while(task!=null){
                // 判断是否把所有任务都执行一遍了,避免死循环
                if(executedTaskSet.contains(task)){
                    taskQueue.pushTask(task);
                    break;
                }

                executeSingleTask(taskQueue,task);

                task = taskQueue.popTask();
            }
        }catch(Throwable e){
            logger.error(e.getMessage(),e);
            e.printStackTrace();
        }
    }

    /**
     * 发送单条短信。
     *
     * 取出任务并执行,如果失败,放回任务列表。
     *
     * @param taskQueue
     * @param task
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private void executeSingleTask(TaskQueue taskQueue, String task) {
        try {
            // do the job
            String smsId = task;
            Map<String,String> sms = smsSendService.getSmsList(smsId);

            smsSendService.send(sms);

            smsSendService.updateSmsStatus(task,SmsSendService.STATUS_SENT);

            String opType = "2";
            TaskQueueUtil.taskLog(taskQueue.getName(), opType, task);
        } catch (Throwable e) {
            if(task!=null){
                taskQueue.pushTask(task);
                smsSendService.updateSmsStatus(task,SmsSendService.STATUS_WAIT);
                if(logger.isDebugEnabled()){
                    logger.error(String.format("任务%s执行失败:%s,重新放回队列", task, e.getMessage()));
                }
            }else {
                e.printStackTrace();
            }
        }
    }

}

这部分代码是固定模式,而且不这样做存在重大缺陷,会有任务执行失败,被丢弃,这部分代码应该写到队列实现中。

有空再改。

时间: 2024-12-17 05:08:45

基于Redis实现分布式消息队列(4)的相关文章

基于Redis实现分布式消息队列(汇总目录)

基于Redis实现分布式消息队列(1)– 缘起 http://blog.csdn.net/stationxp/article/details/45595733 基于Redis实现分布式消息队列(2)– 分布式消息队列功能设计 http://blog.csdn.net/stationxp/article/details/45596619 基于Redis实现分布式消息队列(3)– Redis功能分析 http://blog.csdn.net/stationxp/article/details/457

基于Redis实现分布式消息队列(1)

1.为什么需要消息队列? 当系统中出现"生产"和"消费"的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.1:00到4:00和ERP联通,和电商系统断开. 再举个例子,服务员点菜

[转载] 基于Redis实现分布式消息队列

转载自http://www.linuxidc.com/Linux/2015-05/117661.htm 1.为什么需要消息队列?当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.

基于Redis实现分布式消息队列(3)

1.Redis是什么鬼? Redis是一个简单的,高效的,分布式的,基于内存的缓存工具. 假设好服务器后,通过网络连接(类似数据库),提供Key-Value式缓存服务. 简单,是Redis突出的特色. 简单可以保证核心功能的稳定和优异. 2.性能 性能方面:Redis是足够高效的. 和Memecached对比,在数据量较小大情况下,Redis性能更优秀. 数据量大到一定程度的时候,Memecached性能稍好. 简单结论:但总体上讲Redis性能已经足够好. // Ref: Redis性能测试

基于Redis实现分布式消息队列(2)

1.消息队列需提供哪些功能? 在功能设计上,我崇尚奥卡姆剃刀法则. 对于消息队列,只需要两个方法: 生产 和 消费. 具体的业务场景是任务队列,代码设计如下: public abstract class TaskQueue{ private final String name ; public String getName(){return this.name;} public abstract void addTask(Serializable taskId); public abstract

基于redis的延迟消息队列设计

需求背景 用户下订单成功之后隔20分钟给用户发送上门服务通知短信 订单完成一个小时之后通知用户对上门服务进行评价 业务执行失败之后隔10分钟重试一次 类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理. 队列设计 目前可以考虑使用rabbitmq来满足需求 但是不打算使用,因为目前太多的业务使用了另外的MQ中间件. 开发前需要考虑的问题? 及时性 消费端能按时收到 同一时间消息的消费权重 可靠性 消息

灵感来袭,基于Redis的分布式延迟队列

延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. Redisson Delayed Queue 如果你项目中使用了redisson,那么恭喜你,使用延迟队列将非常的简单. 基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能.该功能可以用来实现消息传送延迟按几何增长或几何衰减的发

基于Docker搭建分布式消息队列Kafka

本文基于Docker搭建一套单节点的Kafka消息队列,Kafka依赖Zookeeper为其管理集群信息,虽然本例不涉及集群,但是该有的组件都还是会有,典型的kafka分布式架构如下图所示.本例搭建的示例包含Zookeeper + Kafka + Kafka-manger #获取镜像 ·         zookeeper镜像:zookeeper:3.4.9 ·         kafka镜像:wurstmeister/kafka:0.10.2.0 ·         kafka-manager

Netty构建分布式消息队列(AvatarMQ)设计指南之架构篇

目前业界流行的分布式消息队列系统(或者可以叫做消息中间件)种类繁多,比如,基于Erlang的RabbitMQ.基于Java的ActiveMQ/Apache Kafka.基于C/C++的ZeroMQ等等,都能进行大批量的消息路由转发.它们的共同特点是,都有一个消息中转路由节点,按照消息队列里面的专业术语,这个角色应该是broker.整个消息系统通过这个broker节点,进行从消息生产者Producer到消费者Consumer的消息路由.当然了,生产者和消费者可以是多对多的关系.消息路由的时候,可以