Redis实现聊天功能

  在学习了Redis做为消息队列之后研究 了redis聊天的功能。

  其实用关系型数据库也可以实现消息功能,自己就曾经用mysql写过一个简单的消息的功能。RDB中思路如下:

**
在实际中可以完全借助mysql数据库实现聊天功能,建立一个表,保存接收人的username、message、isConsumed等信息,用户登录之后采用心跳机制不停的检测数据库并消费消息。
心跳可以做好多事,比如检测检测当前用户是否已经登录,如果已经登录剔除之前已经登录的用户,实现一个用户一次登录的功能。
心跳可以采用JS的周期函数不停的向后台发起异步请求,后台查询未消息的消息
**

1.Redis实现一对一的聊天功能(基于lpush和brpop实现)

  简单的实现一个用户向另一个用户发送多条信息,实现的思路是:

一对一聊天的思路:(采用Lpush和Brpop实现)
1.消息生产者生产消息到redis中:生产消息的时候根据接收人的userName与消息的类型发送到对应的key,采用lpush发送消息(根据userName生成key)
2.消息的消费者根据userName,从userName的key中消费对应的消息。如果有必要可以将消息写到RDB中避免数据的丢失。(根据userName生成key的规则获取用户对应的消息)
3.消息的内容头部加入发送者,例如原来消息内容是:hello,为了知道消息的发送者可以改为:张三*-*hello(为了获取消息的发送者)

下面直接上代码:

User.java(只有一个userName有用)

package cn.xm.jwxt.bean.system;

import java.util.List;
import java.util.Set;

public class User {

    private String username;//用户姓名
public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username == null ? null : username.trim();
    }
}

redis-chat.properties

redis.url=127.0.0.1
redis.port=6379
redis.maxIdle=30
redis.minIdle=10
redis.maxTotal=100
redis.maxWait=20000

Jedis工具类:(返回Jedis连接)

package cn.xm.redisChat.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:32 2018/10/9
 */
public class JedisPoolUtils {

    private static JedisPool pool = null;

    static {

        //加载配置文件
        InputStream in = JedisPoolUtils.class.getClassLoader().getResourceAsStream("redis-chat.properties");
        Properties pro = new Properties();
        try {
            pro.load(in);
        } catch (IOException e) {
            e.printStackTrace();
        }

        //获得池子对象
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxIdle(Integer.parseInt(pro.get("redis.maxIdle").toString()));//最大闲置个数
        poolConfig.setMaxWaitMillis(Integer.parseInt(pro.get("redis.maxWait").toString()));//最大闲置个数
        poolConfig.setMinIdle(Integer.parseInt(pro.get("redis.minIdle").toString()));//最小闲置个数
        poolConfig.setMaxTotal(Integer.parseInt(pro.get("redis.maxTotal").toString()));//最大连接数
        pool = new JedisPool(poolConfig, pro.getProperty("redis.url"), Integer.parseInt(pro.get("redis.port").toString()));
    }

    //获得jedis资源的方法
    public static Jedis getJedis() {
        return pool.getResource();
    }
}

消息生产者:(处理消息头部加上消息的发送者,并且根据接受者的userName生成key)

package cn.xm.redisChat.one2one;

import cn.xm.jwxt.bean.system.User;
import cn.xm.redisChat.util.JedisPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

/**
 * @Author: qlq
 * @Description 消息生产者(根据消息的)
 * @Date: 23:02 2018/10/13
 */
public class RedisMessageProducer {
    private static final Logger log = LoggerFactory.getLogger(RedisMessageProducer.class);

    /**
     * 发送消息的方法
     *
     * @param sendUser   发送消息的用户
     * @param sendToUser 接收消息的用户
     * @param messages   可变参数返送多条消息
     * @return
     */
    public static boolean sendMessage(User sendUser, User sendToUser, String... messages) {
        Jedis jedis = JedisPoolUtils.getJedis();
        try {
            String key = sendToUser.getUsername() + ":msg";
            //将消息的内容加上消息的发送人以 *-* 分割,不能用增强for循环
            for (int i = 0, length_1 = messages.length; i < length_1; i++) {
                messages[i] = sendUser.getUsername() + "*-*" + messages[i];
            }
            Long lpush = jedis.lpush(key, messages);//返回值是还有多少消息未消费
            log.debug("user {} send message [{}] to {}", sendUser.getUsername(), messages, sendToUser.getUsername());
            log.debug("user {} has {} messages ", sendToUser.getUsername(), lpush);
        } catch (Exception e) {
            log.error("sendMessage error", e);
        } finally {
            jedis.close();
        }
        return true;
    }
}

消息的消费者:(采用线程池获取消息,根据接收消息的userName从对应的key中获取对应的消息,并解析消息的key和发送者和内容)

package cn.xm.redisChat.one2one;

import cn.xm.jwxt.bean.system.User;
import cn.xm.redisChat.util.JedisPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @Author: qlq
 * @Description 消息的消费者
 * @Date: 23:44 2018/10/13
 */
public class RedisMessageConsumer {
    private static final Logger log = LoggerFactory.getLogger(RedisMessageConsumer.class);

    /**
     * 参数是初始化线程池子的大小
     */
    private static final ScheduledExecutorService batchTaskPool = Executors.newScheduledThreadPool(2);

    /**
     * 消费消息
     *
     * @param consumerUser 接收消息的用户
     */
    public static void consumerMessage(final User consumerUser) {
        final Jedis jedis = JedisPoolUtils.getJedis();

        //新建一个线程,线程池获取消息
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (true){
                    List<String> messages = jedis.brpop(0, consumerUser.getUsername() + ":msg");//0是timeout,返回的是一个集合,第一个是消息的key,第二个是消息的内容
                    String key = messages.get(0);//第一个是key
                    String message = messages.get(1);//第二个是消息
                    String sendUserName = message.substring(0, message.indexOf("*-*"));//获取消息的发送者
                    message = message.substring(message.indexOf("*-*")+3);//获取消息内容
                    log.debug("ThreadName is {},user {} consumer message {} ,sended by {}", Thread.currentThread().getName(),consumerUser.getUsername(), message, sendUserName);
                }
            }
        };
        //线程池中获取消息
        //第一个参数是需要执行的任务,第二个参数是第一次的延迟时间,第三个参数是两次执行的时间间隔,第四个参数是时间的单位
        batchTaskPool.scheduleWithFixedDelay(runnable, 3,5, TimeUnit.SECONDS);
    }
}

测试类:(lisi和wangwu消费消息)

package cn.xm.redisChat.one2one;

import cn.xm.jwxt.bean.system.User;

/**
 * @Author: qlq
 * @Description 消息消息
 * @Date: 0:04 2018/10/14
 */
public class ConsumerMessageApp {

    public static void main(String[] args) {
        User sndToUser = new User();
        sndToUser.setUsername("lisi");

        User sndToUser2 = new User();
        sndToUser2.setUsername("wangwu");

        RedisMessageConsumer.consumerMessage(sndToUser);
        RedisMessageConsumer.consumerMessage(sndToUser2);
    }
}

zhangsan给lisi和wangwu发送消息

package cn.xm.redisChat.one2one;

import cn.xm.jwxt.bean.system.User;

/**
 * @Author: qlq
 * @Description 生产消息测试
 * @Date: 23:59 2018/10/13
 */

public class ProducerMessageApp {
    public static void main(String[] args) {
        User sndUser = new User();
        sndUser.setUsername("zhangsan");

        User sndToUser = new User();
        sndToUser.setUsername("lisi");

        User sndToUser2 = new User();
        sndToUser2.setUsername("wangwu");

        RedisMessageProducer.sendMessage(sndUser, sndToUser, "给李四的消息一", "给李四的消息二");
        RedisMessageProducer.sendMessage(sndUser, sndToUser2, "给王五的消息一", "给王五的消息二");
    }
}

1.先启动消费者

2.启动消费者之后

消费者控制台如下:

生产者控制台如下:

3.再次启动消费者之后

消费者控制台:

生产者控制台:

至此实现了简单的一对一聊天,实际上就是简单的一个用户给另一个用户发送消息。上面采用这种方式实现的即使用户上线也会接受之前未接受的消息。只有BRPOP之后消息才会消失。

实际中可以根据需求进行实际的开发,实际中有消息类型、内容等。

有时间的话可以用kindeditor实现一个简单的一对一web聊天系统,这个功能待完成。==============

原文地址:https://www.cnblogs.com/qlqwjy/p/9784956.html

时间: 2024-12-07 00:21:14

Redis实现聊天功能的相关文章

[Asp.net 开发系列之SignalR篇]专题二:使用SignalR实现酷炫端对端聊天功能

一.引言 在前一篇文章已经详细介绍了SignalR了,并且简单介绍它在Asp.net MVC 和WPF中的应用.在上篇博文介绍的都是群发消息的实现,然而,对于SignalR是为了实时聊天而生的,自然少了不像QQ一样的端对端的聊天了.本篇博文将介绍如何使用SignalR来实现类似QQ聊天的功能. 二.使用SignalR实现端对端聊天的思路 在介绍具体实现之前,我先来介绍了使用SignalR实现端对端聊天的思路.相信大家在前篇文章已经看到过Clients.All.sendMessage(name,

我们一起学习WCF 第九篇聊天功能

说到聊天,那么其实就是传输数据,把自己写的东西传给自己想发送的那么人.我总结一下传输有三种方式 1:就是我们常见的数据库传输 2:就是文件(流)传输 3:就是socket传输 今天我们说的wcf实现聊天其实是基于socket的聊天功能(QQ聊天发展到今天肯定是很牛的了,但是最初肯定也是这样的思想) 今天我先说说基于WCF聊天的原理 1:需要一个回调函数(当用户发送的时候会吧信息回调给客户端本身) 2:需要一个委托(把服务器传来的信息显示给前台) 3:需要一个触发点击事件(目的是为了触发把发送信息

安卓平台基于SIP协议实现注册,聊天功能

============问题描述============ 不涉及音频,视频发送,只要实现注册,和聊天功能就行, 网上下了sipdroid的源码,但是XML中配置的<uses-sdk android:minSdkVersion="3" android:targetSdkVersion="4"/> 这段没明白,资料里说安卓最低支持SIP协议的API level是9,但是这版本不匹配呀,有大牛指导下么 ============解决方案1============

Redis多机功能介绍

Redis多机功能目的:以单台Redis服务器过渡到多台Redis服务器 Redis单机在生产环境中存在的问题 1.内存容量不足 Redis使用内存来存书数据库中的数据,但是对于一台机器来说,硬件的内存容量是有限的,当我们需要存储的数据量超过机器的内存数量时,一台服务器就没办法满足我们的要求: 比如:要存储128G的数据,但是机器最大只支持64G内存: 2.处理能力不足 和内存数量限制类似.由于服务器硬件的限制(机器配置/网络限制等),一台服务器能够处理的命令请求数量也是有限的,当我们需要处理的

Java UDP实现聊天功能代码

我以前经常写的是基于TCP的网络编程,由于TCP建立连接鼻血要经过三次握手连接,服务器端需要阻塞式等待客户端的连接.而UDP则是可以直接向目的地址的目的端口上发送数据包,由于它只负责发送出去就好,不管对方是否正确接受到与否,所以当网络性能不好时它容易出现丢包的问题.(注意:UDP是基于数据报为单位进行传输的,而TCP是一种基于流进行传输的) 但是UDP很好的模拟了我们呢平时聊天的方式,可以很好的实现连续多次发送和接受,也就是简单的QQ聊天的功能. 现在来简要介绍Java中有关UDP编程相关的类:

Redis的AOF功能

引言:  Redis是基于内存的数据库,同时也提供了若干持久化的方案,允许用户把内存中的数据,写入本地文件系统,以备下次重启或者当机之后继续使用.本文将描述如何基于Redis来设置AOF功能 什么是Redis的AOF? AOF是AppendOnly File的缩写,是Redis系统提供了一种记录Redis操作的持久化方案,在AOF生成的文件中,将忠实记录发生在Redis的操作,从而达到在Redis服务器重启或者当机之后,继续恢复之前数据状态的机制. 以下我们来简要看看如何在Redis中使用AOF

软件工程腾讯QQ (1)优点:聊天功能比较强大。同时提供安全登陆通道,保障了用户信息的安全性。应用的人群范围比较广 缺点:要展示的信息量太多,用户个人信息容易泄露。有一些弄虚作假通过其欺骗用户上当。 (2)有使用需要自己下载 (3)版本更新修复bug,定期更新版本

1.软件工程未来的发展方向是什么? 2.编程基础不好,我能学习好软件工程吗? 3.软件工程跟物联网之间的区别与联系是什么? 4.软件工程在现实生活中有哪些应用? 5.在软件工程的分类中,如何选择一个适合自己的? 6.软件工程的学习中最难克服的阶段是什么? 7.怎样学好软件工程? 腾讯QQ (1)优点:聊天功能比较强大.同时提供安全登陆通道,保障了用户信息的安全性.应用的人群范围比较广 缺点:要展示的信息量太多,用户个人信息容易泄露.有一些弄虚作假通过其欺骗用户上当. (2)有使用需要自己下载 (

在线聊天功能 实现中俄双语实时翻译 原创 加精!!

要做一个在线实时聊天系统,而且还要实时双语翻译(中.俄).头疼 先找了一个php在线客服系统 带源码的.看懂源码后在上面加翻译功能,一共耗时4天弄出来的. 代码如下: if(mb_detect_encoding($fdata,"UTF-8, GBK") != "UTF-8"){     //$fdata = mb_detect_encoding($fdata,"UTF-8, GBK")     $fdata = mb_convert_encodi

提高逼格,给自己的网站加入智能聊天功能

提高逼格,给自己的网站加入智能聊天功能 引言 现在突然发现有很多 QQ 群都开启了群机器人的功能,其中有两个角色,他们分别是:Baby Q 和 QQ 小冰.在 Q 群中,你可以对他们进行任意程度的调戏,不过,遗憾的是鱼和熊掌不可得兼,一个群只能进行二选一.据说 Baby Q 来自图灵公司,而小冰却是出生自微软公司. 无论是Baby Q,还是小冰,向我们展现的都是人工智能技术.这些产品都是利用深度神经网络算法模仿人脑的运算方式,让机器可以持续的自我学习. 目录 简介 机器人看板 简单调用 API