使用redis的发布订阅模式实现消息队列

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:mongo="http://www.springframework.org/schema/data/mongo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
    http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo-1.0.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <context:component-scan base-package=‘com.iwhere.redis.sub‘/>
    <!-- 获取配置资源 -->
<!--     <context:property-placeholder location="classpath:redis-context-config.properties" /> -->

    <!-- redis  START -->
    <bean id="propertyConfigurerRedis" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="order" value="1" />
        <property name="ignoreUnresolvablePlaceholders" value="true" />
        <property name="locations">
            <list>
                <value>classpath:redis-context-config.properties</value>
            </list>
        </property>
    </bean>

        <!-- jedis pool配置 -->
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxTotal" value="${redis.maxActive}" />
        <property name="maxIdle" value="${redis.maxIdle}" />
        <property name="maxWaitMillis" value="${redis.maxWait}" />
        <property name="testOnBorrow" value="${redis.testOnBorrow}" />
    </bean>  

    <!-- spring data redis -->
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
        <property name="usePool" value="false"></property>
        <property name="hostName" value="${redis.host}" />
        <property name="port" value="${redis.port}" />
        <property name="password" value="${redis.pass}" />
        <property name="timeout" value="${redis.timeout}" />
<!--         <property name="database" value="${redis.default.db}"></property> -->
         <constructor-arg index="0" ref="jedisPoolConfig" />
    </bean>  

    <bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
        <property name="connectionFactory" ref="jedisConnectionFactory"></property>
        <property name="keySerializer">
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
        </property>
        <property name="valueSerializer">
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
        </property>
    </bean>
    <!-- redis  END -->

    <bean id="serialization" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />  

    <bean id="messageDelegateListener" class="com.iwhere.redis.sub.MessageDelegateListenerImpl" />  

    <bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="messageDelegateListener" />
        <property name="serializer" ref="serialization" />
    </bean>  

    <bean id=‘messageContainer‘ class=‘org.springframework.data.redis.listener.RedisMessageListenerContainer‘
          destroy-method=‘destroy‘>
         <property name=‘connectionFactory‘ ref=‘jedisConnectionFactory‘ />
        <property name=‘messageListeners‘>
             <map>
                <entry key-ref=‘messageListener‘>
                    <list>
                        <ref bean=‘amap‘ />
                    </list>

                </entry>
            </map>
        </property>
    </bean>
    <!-- Channel设置 -->
<!--     <bean id=‘sendTopic‘ class=‘org.springframework.data.redis.listener.ChannelTopic‘> -->
<!--           <constructor-arg value=‘send‘ /> -->
<!--     </bean> -->
    <!-- Channel设置 -->
    <bean id=‘amap‘ class=‘org.springframework.data.redis.listener.ChannelTopic‘>
          <constructor-arg value=‘amap‘ />
    </bean>
</beans>

Demo演示:

消息发布端:

package com.iwhere.testredis;

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.data.redis.core.StringRedisTemplate;

/**
 * 测试redis做消息
 * @author 231
 *
 */
public class TestRedis {

    private StringRedisTemplate redisTemplate;
    @Before
    public void before() {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:redis-context.xml");
        redisTemplate = context.getBean(StringRedisTemplate.class);
    }

    private String channel = "amap";
    /**
     * 测试连接
     */
    @Test
    public void test1() {
        String message = "c26c4ac0-37a3-4277-9020-bfa274c058f5|526548902996869120|Success";
        redisTemplate.convertAndSend(channel, message);
        System.out.println("发送完成");
    }
}

消息接收端

package com.iwhere.redis.sub;

import java.io.UnsupportedEncodingException;

import org.aspectj.bridge.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;

/**
 *
 */
public class MessageDelegateListenerImpl implements MessageListener {

    @Autowired
    private ChannelTopic channelTopic;

    @Override
    public void onMessage(org.springframework.data.redis.connection.Message message, byte[] pattern) {

        byte[] bytes = message.getBody();// ""里面的参数为需要转化的编码,一般是ISO8859-1
        try {
            String str = new String(bytes, "utf-8");
            System.out.println("I am here" + str + ": " +  channelTopic.getTopic());
        } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(message);
    }

}

redis的资源文件

#redis.host=dev.iwhere.com
redis.host=192.168.1.110
redis.port=6379
redis.pass=redis密码, 没有密码就注释掉
redis.default.db=0
redis.timeout=100000
redis.maxActive=300
redis.maxIdle=100
redis.maxWait=1000
redis.testOnBorrow=true
时间: 2024-11-07 12:50:21

使用redis的发布订阅模式实现消息队列的相关文章

redis的发布订阅模式

概要 redis的每个server实例都维护着一个保存服务器状态的redisServer结构 struct redisServer { /* Pubsub */ // 字典,键为频道,值为链表 // 链表中保存了所有订阅某个频道的客户端 // 新客户端总是被添加到链表的表尾 dict *pubsub_channels;  /* Map channels to list of subscribed clients */ // 这个链表记录了客户端订阅的所有模式的名字 list *pubsub_pa

RabbitMQ学习第三记:发布/订阅模式(Publish/Subscribe)

工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有邮箱.手机号,那么在注册完后会向邮箱和手机号都发送注册完成信息.利用MQ实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列.注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息.但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处

观察者模式 vs 发布-订阅模式

我曾经在面试中被问道,_“观察者模式和发布订阅模式的有什么区别?” _我迅速回忆起“Head First设计模式”那本书: 发布 + 订阅 = 观察者模式 “我知道了,我知道了,别想骗我” 我微笑着回答:“没有区别,它们是一样的.” 但是面试官笑了,“不,它们不一样.” 我当时的表情: 所以是我错了吗? 之后我回到家打开google查找答案.这篇文章就是我google后的总结. 在深入探讨区别之前,我们先来讨论下“观察者模式”和“发布订阅模式”. 观察者设计模式: 我认为大多数人都会同意观察者模

Redis研究(十六)—发布/订阅模式

在上一篇中我们写了Redis的任务队列. 除了实现任务队列外,Redis还提供了一组命令可以让开发者实现"发布/订阅"(publish/subscribe)模式."发布/订阅"模式同样可以实现进程间的消息传递,其原理是这样的: "发布/订阅"模式中包含两种角色,分别是发布者和订阅者.订阅者可以订阅一个或若干个频道(channel),而发布者可以向指定的频道发送消息,所有订阅此频道的订阅者都会收到此消息. 发布者发布消息的命令是PUBLISH,用法

Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?

前言 在很多互联网应用系统中,请求处理异步化是提升系统性能一种常用的手段,而基于消息系统的异步处理由于具备高可靠性.高吞吐量的特点,因而在并发请求量比较高的互联网系统中被广泛应用.与此同时,这种方案也带来了调用链路处理上的问题,因为大部分应用请求都会要求同步响应实时处理结果,而由于请求的处理过程已经通过消息异步解耦,所以整个调用链路就变成了异步链路,此时请求链路的发起者如何同步拿到响应结果,就需要进行额外的系统设计考虑. 为了更清晰地理解这个问题,小码哥以最近正在做的共享单车的IOT系统为例,给

redis 发布/订阅 模式

发布/订阅模式的命令如下: * 进入发布订阅模式的客户端,不能执行除发布订阅模式以上命令的其他命令,否则出错.

redis(3)发布订阅

一.发布/订阅模式 在软件工程里面,发布/订阅是一种消息模式,这种模式旨在将消息发送者和消息接收者解耦.发送者不需要关心将消息发送给谁,接收者也不需要知道消息的发送者是谁.发送者将消息发布以后就结束动作,接收者可以订阅自己感兴趣的消息. 除了发布/订阅模式还有一种和它很类似的,消息队列,是一种典型的面向消息中间件的系统.许多消息系统都会同时支持发布/订阅和消息队列模型,例如Java Message Service(JMS) 参见:维基百科 二.redis的发布/订阅 我们从一个简单的示例开始,首

发布-订阅模式

1.什么是发布订阅模式 发布订阅模式 又叫观察者模式,他是定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变,所有依赖他的对象都将得到通知. 在javascript开发中,我们一般用事件模型来替代传统的发布-订阅模式. 2.Dom事件 实际上,只要我们曾经在dom节点上绑定过事件函数,那么我们就曾经使用过发布-订阅模式. document.getElementById('test').addEventListener('click',function(){ alert(2)},fasle

JavaScript设计模式与开发实践---读书笔记(8) 发布-订阅模式

发布-订阅模式又叫观察者模式,它定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都将得到通知. 发布-订阅模式可以广泛应用于异步编程中,这是一种替代传递回调函数的方案. 可以取代对象之间硬编码的通知机制,一个对象不用再显式地调用另外一个对象的某个接口. 自定义事件 首先要指定好谁充当发布者: 然后给发布者添加一个缓存列表,用于存放回调函数以便通知订阅者: 最后发布消息时,发布者会遍历这个缓存列表,依次触发里面存放的订阅者回调函数. 另外,我们还可以往回调函数里填入