【并发】9、借助redis 实现生产消费,消息订阅发布模式队列

这个就是一个消息可以被多次消费的范例了

其实这个实现的方式可以参考我之前的设计模式,观察者模式

https://www.cnblogs.com/cutter-point/p/5249780.html

不过有一点需要注意一下啊,这个消息发布的时候,好像是不支持字节数据的,里面好像会对字节进行转换,这样的结果就是导致我最后无法吧相应的字节转换成我之前序列化的对象

不知道是不是ObjectInputStream和ObjectOutputStream实现不是很好的原因,还是什么,反正反序列化的时候,有些不可见的字符应该是被截掉了

消息发布者

package queue.redisQueue;

import queue.fqueue.vo.TempVo;
import redis.clients.jedis.Jedis;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.UUID;

/**
 * @ProjectName: cutter-point
 * @Package: queue.redisQueue
 * @ClassName: RedisQueueProducter3
 * @Author: xiaof
 * @Description: 订阅,发布模式 发布消息
 * @Date: 2019/6/12 16:47
 * @Version: 1.0
 */
public class RedisQueueProducter3 implements Runnable {

    private Jedis jedis;
    private String queueKey;

    public RedisQueueProducter3(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    public void putMessage() {
        try {
            Thread.sleep((long) (Math.random() * 1000));

            //不存在则创建,存在则直接插入
            //向redis队列中存放数据
            //生成数据
            TempVo tempVo = new TempVo();
            tempVo.setName(Thread.currentThread().getName() + ",time is:" + UUID.randomUUID());

            try {
                int i = 0;
                while(i < 10) {
                    //反馈订阅的数量
                    long num = jedis.publish(queueKey.getBytes(), tempVo.toString().getBytes());
                    if(num > 0) {
                        System.out.println("成功!num:" + num);
                        break;
                    }
                    ++i;
                }
            } catch (Exception e) {
                System.out.println("失败!");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {

        while(true) {
            putMessage();

        }

    }
}

消息消费者

package queue.redisQueue;

import queue.fqueue.vo.EventVo;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.util.SafeEncoder;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.UnsupportedEncodingException;

/**
 * @ProjectName: cutter-point
 * @Package: queue.redisQueue
 * @ClassName: RedisQueueConsume3
 * @Author: xiaof
 * @Description: 发布订阅消息,订阅线程
 * @Date: 2019/6/12 16:53
 * @Version: 1.0
 */
public class RedisQueueConsume3 implements Runnable {

    private Jedis jedis;
    private String queueKey;

    class myJedisPubSub extends JedisPubSub {
        /** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
         * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
         * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
         * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]
         **/
        @Override
        public void onMessage(String channel, String message) {
            System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message);
            //接收到exit消息后退出
            System.out.println(message);
        }
    }

    public RedisQueueConsume3(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    public void consumerMessage() {
        jedis.subscribe(new myJedisPubSub(), queueKey);
    }

    @Override
    public void run() {
        while (true) {
            consumerMessage();
        }
    }
}

测试代码:

@Test
    public void test4() throws InterruptedException {

        //读写取数据
        for(int i = 0; i < 2; ++i) {
            System.out.println("输出测试" + i);
            RedisQueueProducter3 producter = new RedisQueueProducter3(jedisPool.getResource(), "xiaof");
            Thread t = new Thread(producter);
            t.start();
        }

        while(true) {
            Thread.sleep(1000);
        }
    }

    @Test
    public void test5() throws InterruptedException {

        //读写取数据
        for(int i = 0; i < 5; ++i) {
            System.out.println("输出测试" + i);
            //切记一定要重新获取Resource,不然无法并发操作
            RedisQueueConsume3 fqueueConsume = new RedisQueueConsume3(jedisPool.getResource(), "xiaof");
            Thread t = new Thread(fqueueConsume);
            t.setDaemon(true);
            t.start();
        }

        while(true) {
            Thread.sleep(1000);
        }
    }

效果展示

同一消息被多个订阅者同步消费

原文地址:https://www.cnblogs.com/cutter-point/p/11011122.html

时间: 2024-08-17 10:15:29

【并发】9、借助redis 实现生产消费,消息订阅发布模式队列的相关文章

基于Redis的消息订阅/发布

在工业生产设计中,我们往往需要实现一个基于消息订阅的模式,用来对非定时的的消息进行监听订阅. 这种设计模式在 总线设计模式中得到体现.微软以前的WCF中实现了服务总线 ServiceBus的设计模式.然并卵.WCF已经好像是上个世纪的产物................ 基于事件订阅的模式,比如 EventBus类的组件产品.但是往往设计比较复杂. 如果依赖于 Redis做事件消息推送.那就大大简化了这种设计模式,而且性能也比较客观. Redis在 2.0之后的版本中 实现了 事件推送的  pu

redis的消息订阅发布介绍

1.redis的消息订阅发布: 进程间的一种消息通信模式:发送者(pub)发送信息,订阅者(sub)接收信息. 注: 图1为 三个客户端 client2.client5.client1 通过 subscribe 命令订阅 频道 channel1 ,图二为 当有新消息通过 publish 命令发送给频道 channel1时,这个消息就会被发送给订阅它的三个客户端. 2.消息订阅发布的相关命令: PSUBSCRIBE pattern [pattern...]: 订阅一个或者多个符合给定模式的频道 P

python redis 实现简单的消息订阅

python + redis 实现简单的消息订阅 订阅端 import redis from functools import wraps class Subscribe: def __init__(self, channel: str, **kwargs): self.coon = redis.StrictRedis(**kwargs) self.channel = channel self.registerd = list() self.course = self.coon.pubsub()

RabbitMQ下的生产消费者模式与订阅发布模式

??所谓模式,就是在某种场景下,一类问题及其解决方案的总结归纳.生产消费者模式与订阅发布模式是使用消息中间件时常用的两种模式,用于功能解耦和分布式系统间的消息通信,以下面两种场景为例: 数据接入 ??假设有一个用户行为采集系统,负责从App端采集用户点击行为数据.通常会将数据上报和数据处理分离开,即App端通过REST API上报数据,后端拿到数据后放入队列中就立刻返回,而数据处理则另外使用Worker从队列中取出数据来做,如下图所示. ??这样做的好处有:第一,功能分离,上报的API接口不关心

Redis系列(三)—— 订阅/发布

Redis 订阅/发布 参考:http://www.cnblogs.com/mushroom/p/4470006.html,http://www.tuicool.com/articles/ABry2aj,http://www.cnblogs.com/tinywan/p/5903256.html,http://www.cnblogs.com/linjiqin/p/5733183.html,http://redisbook.readthedocs.io/en/latest/feature/pubsu

关于laravel5 消息订阅/发布的理解初

laravel5.4感觉官网文档说滴不够详细...安装predis官网很详细,这里略过.... 生成命令 直接使用 Artisan 命令 make:command,该命令会在 app/Console/Commands 目录下创建一个新的命令类.如果该目录不存在,不用担心,它将会在你首次运行 Artisan 命令 make:command 时被创建.生成的命令将会包含默认的属性设置以及所有命令都共有的方法, 这里我生成一个RedisSubscribe.php类,执行下面命令: php artisa

理解 Redis(9) - Publish Subscribe 消息订阅

在窗口1开通一个名为 redis 的通道: 127.0.0.1:6379> SUBSCRIBE redis Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "redis" 3) (integer) 1 从窗口2传入信息: 127.0.0.1:6379> PUBLISH redis hi (integer) 1 此时窗口1会收到这条信息: 127.0.0.1:6379> SUB

【02】Redis for OPS:消息订阅和事务管理

写在前面的话 上一节谈了 Redis 的安装以及五种基本数据类型的一些简单的操作,本章节主要看看 Redis 的另外一些特征,虽然可能不常用,但是还是需要了解的.对于我们运维人员来讲,这些东西更像拓展的知识,可能我们工作很多年都不会用到,但是当你慢慢的需要往运维开发方向发展以后,这些东西就会成为你解决问题的又一方案.另外一种思路. 发布订阅 Redis 发布消息一般有两种方式,消息队列和发布订阅. 对于消息队列,其角色包含:生产者 --> 消息队列 --> 消费者 生产者讲需要处理的任务放到队

消息订阅发布系统Apache Kafka分布式集群环境搭建和简单测试

一.什么是kafka? kafka是LinkedIn开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目.在它的主页描述kafka为一个高吞吐量的分布式(能将消息分散到不同的节点上)MQ.Kafka仅仅由7000行Scala编写,据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB) 二.kafka的官方网站在哪里? http://kafka.apache.org/ 三.在哪里下载?需要哪些组件的支持? kafka2.9.2在下面的地址可以下载: