【Redis】jedis客户端实现redis消息的发布订阅(实时消息中间件)

发布

package com.chiwei.redis;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.Jedis;

public class RedisPublisher {

	private static final Logger log = LoggerFactory.getLogger(RedisPublisher.class);

	private final Jedis pubJedis;

	private final String[] channel;

	public RedisPublisher(Jedis pubJedis, String[] channel) {
		this.pubJedis = pubJedis;
		this.channel = channel;
	}

	public void start() {
		log.debug("Type your message (type quit to exit)");
		int channelLen = channel.length;
		try {
			BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
			while (true) {
				String line = br.readLine();
				if (!"quit".equals(line)) {
					for (int i = 0; i < channelLen; i++) {
						if (channel[i].matches("^chiwei.*")) {
							log.debug("Match...");
							pubJedis.publish(channel[i], line + "haha");
						} else {
							pubJedis.publish(channel[i], line);
						}
						log.debug("Publish to {}", channel[i]);
					}
				} else {
					break;
				}
			}
		} catch (Exception e) {
			log.error("IO fail while reading input", e);
		}
	}
}

以上发布类,发布的频道是一个数组,即同时将内容发布到多个频道中,你可以根据内容去判断,不同的内容发布到不同的频道中。

订阅

package com.chiwei.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.JedisPubSub;

public class RedisSubscriber extends JedisPubSub{

	private static final Logger log = LoggerFactory.getLogger(RedisSubscriber.class);

	//取得订阅的消息后的处理
	public void onMessage(String s, String s1) {
		// TODO Auto-generated method stub
		log.debug("Message received,Channel:{},Msg:{}",s,s1);
	}

	//取得按表达式的方式订阅的消息后的处理
	public void onPMessage(String s, String s1, String s2) {
		// TODO Auto-generated method stub
		log.debug("Pattern:{}",s);
		log.debug("Pattern Message received,Channel:{},Msg:{}",s1,s2);
	}

	//初始化按表达式的方式订阅时候的处理
	public void onPSubscribe(String s, int i) {
		// TODO Auto-generated method stub
		log.debug("Pattern Subscribe,Pattern:{},ChannelNum:{}",s,i);
	}

	//取消按表达式的方式订阅时候的处理
	public void onPUnsubscribe(String s, int i) {
		// TODO Auto-generated method stub
		log.debug("Pattern Unsubscribe,Pattern:{},ChannelNum:{}",s,i);
	}

	//初始化订阅时候的处理
	public void onSubscribe(String s, int i) {
		// TODO Auto-generated method stub
		log.debug("Subscribe,Channel:{},ChannelNum:{}",s,i);
	}

	//取消订阅时候的处理
	public void onUnsubscribe(String s, int i) {
		// TODO Auto-generated method stub
		log.debug("Unsubscribe,Channel:{},ChannelNum:{}",s,i);
	}

}

该类就是订阅的实现类,对于订阅的各项操作实现具体的处理方法。

启动主类

package com.chiwei.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisPubSubMain {

	public static final String[] CHANNEL_NAME = new String[] { "chiwei.momo", "chiwei.nono","taotao"};

	private static final Logger log = LoggerFactory.getLogger(RedisPubSubMain.class);

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		log.debug("=========================");
		JedisPoolConfig config = new JedisPoolConfig();
		config = new JedisPoolConfig();
		config.setMaxTotal(100);
		config.setMaxIdle(10);
		config.setMaxWaitMillis(1000L);
		config.setTestOnBorrow(true);
		config.setTestOnReturn(true);
		JedisPool jedisPool = new JedisPool(config, "192.168.11.176", 7379);
		final Jedis subJedis = jedisPool.getResource();
		final RedisSubscriber sub = new RedisSubscriber();
		new Thread(new Runnable() {

			public void run() {
				try {
					//subJedis.subscribe(sub, CHANNEL_NAME);
					subJedis.psubscribe(sub, "^chiwei.*");
					log.debug("Subscribe ended");
				} catch (Exception e) {
					log.error("Subscribe failed", e);
				}
			}

		}).start();

		Jedis pubJedis = jedisPool.getResource();
		new RedisPublisher(pubJedis, CHANNEL_NAME).start();
		sub.unsubscribe();
		jedisPool.returnResourceObject(subJedis);
		jedisPool.returnResourceObject(pubJedis);
		jedisPool.close();
	}

}

subJedis.psubscribe(sub, "^chiwei.*");按照正则匹配订阅的频道

由于订阅类会阻塞当前线程的执行,所以在main线程中另起一个线程来启动订阅,然后启动发布线程去发布内容。

2015-04-17 10:35:43,751 - com.chiwei.redis.RedisPubSubMain[18] -0    [main] DEBUG  - =========================

2015-04-17 10:35:43,843 - com.chiwei.redis.RedisSubscriber[29] -92   [Thread-3] DEBUG  - Pattern Subscribe,Pattern:^chiwei.*,ChannelNum:1

2015-04-17 10:35:43,848 - com.chiwei.redis.RedisPublisher[25] -97   [main] DEBUG  - Type your message (type quit to exit)

3

2015-04-17 10:35:53,132 - com.chiwei.redis.RedisPublisher[34] -9381 [main] DEBUG  - Match...

2015-04-17 10:35:53,138 - com.chiwei.redis.RedisPublisher[39] -9387 [main] DEBUG  - Publish to chiwei.momo

2015-04-17 10:35:53,140 - com.chiwei.redis.RedisPublisher[34] -9389 [main] DEBUG  - Match...

2015-04-17 10:35:53,146 - com.chiwei.redis.RedisPublisher[39] -9395 [main] DEBUG  - Publish to chiwei.nono

2015-04-17 10:35:53,153 - com.chiwei.redis.RedisPublisher[39] -9402 [main] DEBUG  - Publish to taotao

时间: 2024-10-17 03:27:54

【Redis】jedis客户端实现redis消息的发布订阅(实时消息中间件)的相关文章

《Redis设计与实现》学习笔记-发布订阅与事务

发布与订阅 Redis通过发布订阅提供一对多甚至是多对多的节点消息通信,发布订阅由PUBLISH.SUBSCRIBE.PSUBSCRIBE.PUBSUB等命令组成. SUBSCRIBE命令:订阅某频道,在redisServer结构中通过pubsub_channels字典属性保存当前服务器所有频道的订阅关系,字典键时频道名称,字典值是一个链表,记录了所有订阅这个频道的客户端. UNSUBSCRIBE命令:退订频道,调用该命令之后,会把订阅关系从pubsub_channels中删掉,如果键对应的链表

Redis瑞士军刀:慢查询,Pipeline和发布订阅

1.慢查询 1.1 慢查询的生命周期 步骤一:client通过网络向Redis发送一条命令 步骤二:由于Redis是单线程应用,可以把Redis想像成一个队列,client执行的所有命令都在排队等着server端执行 步骤三:Redis服务端按顺序执行命令 步骤四:server端把命令结果通过网络返回给client 说明: 慢查询发生在命令执行过程中,不包含网络延迟时间及排除等待执行的时间 客户端超时不一定慢查询,但慢查询是客户端超时的一个可能因素 1.2 慢查询的配置项 slowlog-max

Redis05——Redis高级运用(管道连接,发布订阅,布隆过滤器)

Redis高级运用 一.管道连接redis(一次发送多个命令,节省往返时间) 1.安装nc yum install nc -y 2.通过nc连接redis nc localhost 6379 3.通过echo向nc发送指令 echo -e "set k2 99\nincr k2\n get k2" |nc localhost 6379 二.发布订阅(pub/Sub) publish channel message subscribe channel 三.事务(transactions)

Redis学习笔记07Redis命令之(6) 发布订阅

1.1.1. subscribe 订阅一个或多个频道.当所订阅的频道上发布了消息时,本客户端连接将收到这个消息. redis.coe2coe.me:6379> subscribe abc Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "abc" 3) (integer) 1 成功订阅了一个频道. 也可以一次订阅多个频道: redis.coe2coe.me:6379> subscr

Redis_发布订阅(Spring Boot)

目录 前言 生产者和消费者 发布和订阅 Java实现 注意 转至 http://www.tianmaying.com/tutorial/springboot-redis-message 前言 利用Spring Data对Redis的支持来实现消息的发布订阅机制.使用StringRedisTemplate来发布一个字符串消息,同时基于MessageListenerAdapter使用一个POJO来订阅和响应该消息.Receiver类将会被注册为一个消息监听者时.给Receiver的构造函数通过@Au

高性能网站架构设计之缓存篇(1)- Redis C#客户端

一.什么 Redis REmote DIctionary Server,简称 Redis,是一个类似于Memcached的Key-Value存储系统.相比Memcached,它支持更丰富的数据结构,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型),并提供了数据持久化机制,在某些场景下,你完全可以把它当做非关系型数据库来使用.它是一个高性能的存储系统,能支持超过 100K+ 每秒的读写频率.同时还支持消息的发布/订阅

StackExchange.Redis 使用-发布订阅 (二)

使用Redis的发布订阅功能 redis另一个常见的用途是发布订阅功能 . 它非常的简单 ,当连接失败时 ConnectionMultiplexer 会自动重新进行订阅 . ISubscriber sub = redis.GetSubscriber(); GetSubscriber 方法返回一个 ISubscriber 类型的实例 .发布订阅功能没有数据库的概念,我们可以为其提供一个 async-state .所有的订阅都是全局的:ISubscriber 实例不是他们的生命周期 , 发布订阅的特

Redis七(发布订阅)

发布与订阅(pub/sub) 介绍 Redis 通过 PUBLISH . SUBSCRIBE 等命令实现了订阅与发布模式, 这个功能提供两种信息机制, 分别是订阅/发布到频道和订阅/发布到模式 订阅者可以订阅一个或多个频道,发布者向一个频道发送消息后,所有订阅这个频道的订阅者都将收到消息,而发布者也将收到一个数值,这个数值是收到消息的订阅者的数量.订阅者只能收到自它开始订阅后发布者所发布的消息,而之前发布的消息是收不到的. 运行原理: Redis 的 SUBSCRIBE 命令可以让客户端订阅任意

redis学习3--redis功能 发布订阅,HyperLogLog,GEO,Lua等等

1 HyperLogLog HyperLogLog是一种字符串类型的数据结构,一种利用极小的内存完成大量独立用户数据统计,但是存在误差(官方0数字.81%). 2 发布与订阅 redis 不能做消息队列堆积,就是说最新的订阅者无法收到订阅前的消息 消息队列与发布订阅,消息队列是多个客户端抢消息,发布订阅是每个客户端都可以收到消息,这是他们的区别. 3 GEO GEO 支持存储位置信息,来实现附件的人,摇一摇等功能. 原文地址:https://www.cnblogs.com/tjqBlog/p/9