在学习了Redis做为消息队列之后研究 了redis聊天的功能。
其实用关系型数据库也可以实现消息功能,自己就曾经用mysql写过一个简单的消息的功能。RDB中思路如下:
** 在实际中可以完全借助mysql数据库实现聊天功能,建立一个表,保存接收人的username、message、isConsumed等信息,用户登录之后采用心跳机制不停的检测数据库并消费消息。 心跳可以做好多事,比如检测检测当前用户是否已经登录,如果已经登录剔除之前已经登录的用户,实现一个用户一次登录的功能。 心跳可以采用JS的周期函数不停的向后台发起异步请求,后台查询未消息的消息 **
1.Redis实现一对一的聊天功能(基于lpush和brpop实现)
简单的实现一个用户向另一个用户发送多条信息,实现的思路是:
一对一聊天的思路:(采用Lpush和Brpop实现)
1.消息生产者生产消息到redis中:生产消息的时候根据接收人的userName与消息的类型发送到对应的key,采用lpush发送消息(根据userName生成key)
2.消息的消费者根据userName,从userName的key中消费对应的消息。如果有必要可以将消息写到RDB中避免数据的丢失。(根据userName生成key的规则获取用户对应的消息)
3.消息的内容头部加入发送者,例如原来消息内容是:hello,为了知道消息的发送者可以改为:张三*-*hello(为了获取消息的发送者)
下面直接上代码:
User.java(只有一个userName有用)
package cn.xm.jwxt.bean.system; import java.util.List; import java.util.Set; public class User { private String username;//用户姓名 public String getUsername() { return username; } public void setUsername(String username) { this.username = username == null ? null : username.trim(); } }
redis-chat.properties
redis.url=127.0.0.1 redis.port=6379 redis.maxIdle=30 redis.minIdle=10 redis.maxTotal=100 redis.maxWait=20000
Jedis工具类:(返回Jedis连接)
package cn.xm.redisChat.util; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import java.io.IOException; import java.io.InputStream; import java.util.Properties; /** * @Author: qlq * @Description * @Date: 21:32 2018/10/9 */ public class JedisPoolUtils { private static JedisPool pool = null; static { //加载配置文件 InputStream in = JedisPoolUtils.class.getClassLoader().getResourceAsStream("redis-chat.properties"); Properties pro = new Properties(); try { pro.load(in); } catch (IOException e) { e.printStackTrace(); } //获得池子对象 JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxIdle(Integer.parseInt(pro.get("redis.maxIdle").toString()));//最大闲置个数 poolConfig.setMaxWaitMillis(Integer.parseInt(pro.get("redis.maxWait").toString()));//最大闲置个数 poolConfig.setMinIdle(Integer.parseInt(pro.get("redis.minIdle").toString()));//最小闲置个数 poolConfig.setMaxTotal(Integer.parseInt(pro.get("redis.maxTotal").toString()));//最大连接数 pool = new JedisPool(poolConfig, pro.getProperty("redis.url"), Integer.parseInt(pro.get("redis.port").toString())); } //获得jedis资源的方法 public static Jedis getJedis() { return pool.getResource(); } }
消息生产者:(处理消息头部加上消息的发送者,并且根据接受者的userName生成key)
package cn.xm.redisChat.one2one; import cn.xm.jwxt.bean.system.User; import cn.xm.redisChat.util.JedisPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; /** * @Author: qlq * @Description 消息生产者(根据消息的) * @Date: 23:02 2018/10/13 */ public class RedisMessageProducer { private static final Logger log = LoggerFactory.getLogger(RedisMessageProducer.class); /** * 发送消息的方法 * * @param sendUser 发送消息的用户 * @param sendToUser 接收消息的用户 * @param messages 可变参数返送多条消息 * @return */ public static boolean sendMessage(User sendUser, User sendToUser, String... messages) { Jedis jedis = JedisPoolUtils.getJedis(); try { String key = sendToUser.getUsername() + ":msg"; //将消息的内容加上消息的发送人以 *-* 分割,不能用增强for循环 for (int i = 0, length_1 = messages.length; i < length_1; i++) { messages[i] = sendUser.getUsername() + "*-*" + messages[i]; } Long lpush = jedis.lpush(key, messages);//返回值是还有多少消息未消费 log.debug("user {} send message [{}] to {}", sendUser.getUsername(), messages, sendToUser.getUsername()); log.debug("user {} has {} messages ", sendToUser.getUsername(), lpush); } catch (Exception e) { log.error("sendMessage error", e); } finally { jedis.close(); } return true; } }
消息的消费者:(采用线程池获取消息,根据接收消息的userName从对应的key中获取对应的消息,并解析消息的key和发送者和内容)
package cn.xm.redisChat.one2one; import cn.xm.jwxt.bean.system.User; import cn.xm.redisChat.util.JedisPoolUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @Author: qlq * @Description 消息的消费者 * @Date: 23:44 2018/10/13 */ public class RedisMessageConsumer { private static final Logger log = LoggerFactory.getLogger(RedisMessageConsumer.class); /** * 参数是初始化线程池子的大小 */ private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2); /** * 消费消息 * * @param consumerUser 接收消息的用户 */ public static void consumerMessage(final User consumerUser) { final Jedis jedis = JedisPoolUtils.getJedis(); //新建一个线程,线程池获取消息 Runnable runnable = new Runnable() { @Override public void run() { while (true){ List<String> messages = jedis.brpop(0, consumerUser.getUsername() + ":msg");//0是timeout,返回的是一个集合,第一个是消息的key,第二个是消息的内容 String key = messages.get(0);//第一个是key String message = messages.get(1);//第二个是消息 String sendUserName = message.substring(0, message.indexOf("*-*"));//获取消息的发送者 message = message.substring(message.indexOf("*-*")+3);//获取消息内容 log.debug("ThreadName is {},user {} consumer message {} ,sended by {}", Thread.currentThread().getName(),consumerUser.getUsername(), message, sendUserName); } } }; //线程池中获取消息 //第一个参数是需要执行的任务,第二个参数是第一次的延迟时间,第三个参数是两次执行的时间间隔,第四个参数是时间的单位 batchTaskPool.scheduleWithFixedDelay(runnable, 3,5, TimeUnit.SECONDS); } }
测试类:(lisi和wangwu消费消息)
package cn.xm.redisChat.one2one; import cn.xm.jwxt.bean.system.User; /** * @Author: qlq * @Description 消息消息 * @Date: 0:04 2018/10/14 */ public class ConsumerMessageApp { public static void main(String[] args) { User sndToUser = new User(); sndToUser.setUsername("lisi"); User sndToUser2 = new User(); sndToUser2.setUsername("wangwu"); RedisMessageConsumer.consumerMessage(sndToUser); RedisMessageConsumer.consumerMessage(sndToUser2); } }
zhangsan给lisi和wangwu发送消息
package cn.xm.redisChat.one2one; import cn.xm.jwxt.bean.system.User; /** * @Author: qlq * @Description 生产消息测试 * @Date: 23:59 2018/10/13 */ public class ProducerMessageApp { public static void main(String[] args) { User sndUser = new User(); sndUser.setUsername("zhangsan"); User sndToUser = new User(); sndToUser.setUsername("lisi"); User sndToUser2 = new User(); sndToUser2.setUsername("wangwu"); RedisMessageProducer.sendMessage(sndUser, sndToUser, "给李四的消息一", "给李四的消息二"); RedisMessageProducer.sendMessage(sndUser, sndToUser2, "给王五的消息一", "给王五的消息二"); } }
1.先启动消费者
2.启动消费者之后
消费者控制台如下:
生产者控制台如下:
3.再次启动消费者之后
消费者控制台:
生产者控制台:
至此实现了简单的一对一聊天,实际上就是简单的一个用户给另一个用户发送消息。上面采用这种方式实现的即使用户上线也会接受之前未接受的消息。只有BRPOP之后消息才会消失。
实际中可以根据需求进行实际的开发,实际中有消息类型、内容等。
有时间的话可以用kindeditor实现一个简单的一对一web聊天系统,这个功能待完成。==============
原文地址:https://www.cnblogs.com/qlqwjy/p/9784956.html