Java利用Redis实现消息队列

应用场景

  • 为什么要用redis?
    二进制存储、java序列化传输、IO连接数高、连接频繁

一、序列化

  这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 
其代码如下:

 1 package Utils;
 2 import java.io.*;
 3 /**
 4  * Created by Kinglf on 2016/10/17.
 5  */
 6 public class ObjectUtil {
 7     /**
 8      * 对象转byte[]
 9      * @param obj
10      * @return
11      * @throws IOException
12      */
13     public static byte[] object2Bytes(Object obj) throws IOException{
14         ByteArrayOutputStream bo=new ByteArrayOutputStream();
15         ObjectOutputStream oo=new ObjectOutputStream(bo);
16         oo.writeObject(obj);
17         byte[] bytes=bo.toByteArray();
18         bo.close();
19         oo.close();
20         return bytes;
21     }
22     /**
23      * byte[]转对象
24      * @param bytes
25      * @return
26      * @throws Exception
27      */
28     public static Object bytes2Object(byte[] bytes) throws Exception{
29         ByteArrayInputStream in=new ByteArrayInputStream(bytes);
30         ObjectInputStream sIn=new ObjectInputStream(in);
31         return sIn.readObject();
32     }
33 }

二、消息类(实现Serializable接口)

package Model;

import java.io.Serializable;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = -389326121047047723L;
    private int id;
    private String content;
    public Message(int id, String content) {
        this.id = id;
        this.content = content;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
}

三、Redis的操作

  利用redis做队列,我们采用的是redis中list的push和pop操作; 
结合队列的特点: 
只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则 Redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而Redis中list药push或 pop的对象仅需要转换成byte[]即可 
  java采用Jedis进行Redis的存储和Redis的连接池设置 
上代码:

package Utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Created by Kinglf on 2016/10/17.
 */
public class JedisUtil {
    private static String JEDIS_IP;
    private static int JEDIS_PORT;
    private static String JEDIS_PASSWORD;
    private static JedisPool jedisPool;
    static {
        //Configuration自行写的配置文件解析类,继承自Properties
        Configuration conf=Configuration.getInstance();
        JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");
        JEDIS_PORT=conf.getInt("jedis.port",6379);
        JEDIS_PASSWORD=conf.getString("jedis.password",null);
        JedisPoolConfig config=new JedisPoolConfig();
        config.setMaxActive(5000);
        config.setMaxIdle(256);
        config.setMaxWait(5000L);
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        config.setTestWhileIdle(true);
        config.setMinEvictableIdleTimeMillis(60000L);
        config.setTimeBetweenEvictionRunsMillis(3000L);
        config.setNumTestsPerEvictionRun(-1);
        jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000);
    }
    /**
     * 获取数据
     * @param key
     * @return
     */
    public static String get(String key){
        String value=null;
        Jedis jedis=null;
        try{
            jedis=jedisPool.getResource();
            value=jedis.get(key);
        }catch (Exception e){
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        }finally {
            close(jedis);
        }
        return value;
    }

    private static void close(Jedis jedis) {
        try{
            jedisPool.returnResource(jedis);
        }catch (Exception e){
            if(jedis.isConnected()){
                jedis.quit();
                jedis.disconnect();
            }
        }
    }
    public static byte[] get(byte[] key){
        byte[] value = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            value = jedis.get(key);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }

        return value;
    }

    public static void set(byte[] key, byte[] value) {

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.set(key, value);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }

    public static void set(byte[] key, byte[] value, int time) {

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.set(key, value);
            jedis.expire(key, time);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }

    public static void hset(byte[] key, byte[] field, byte[] value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hset(key, field, value);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }

    public static void hset(String key, String field, String value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hset(key, field, value);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }

    /**
     * 获取数据
     *
     * @param key
     * @return
     */
    public static String hget(String key, String field) {

        String value = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            value = jedis.hget(key, field);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }

        return value;
    }
    /**
     * 获取数据
     *
     * @param key
     * @return
     */
    public static byte[] hget(byte[] key, byte[] field) {

        byte[] value = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            value = jedis.hget(key, field);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }

        return value;
    }
    public static void hdel(byte[] key, byte[] field) {

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hdel(key, field);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }
    /**
     * 存储REDIS队列 顺序存储
     * @param  key reids键名
     * @param  value 键值
     */
    public static void lpush(byte[] key, byte[] value) {

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.lpush(key, value);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }

    /**
     * 存储REDIS队列 反向存储
     * @param  key reids键名
     * @param  value 键值
     */
    public static void rpush(byte[] key, byte[] value) {

        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            jedis.rpush(key, value);

        } catch (Exception e) {

            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {

            //返还到连接池
            close(jedis);

        }
    }

    /**
     * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端
     * @param  key reids键名
     * @param  destination 键值
     */
    public static void rpoplpush(byte[] key, byte[] destination) {

        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            jedis.rpoplpush(key, destination);

        } catch (Exception e) {

            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {

            //返还到连接池
            close(jedis);

        }
    }

    /**
     * 获取队列数据
     * @param  key 键名
     * @return
     */
    public static List lpopList(byte[] key) {

        List list = null;
        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            list = jedis.lrange(key, 0, -1);

        } catch (Exception e) {

            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {

            //返还到连接池
            close(jedis);

        }
        return list;
    }
    /**
     * 获取队列数据
     * @param  key 键名
     * @return
     */
    public static byte[] rpop(byte[] key) {

        byte[] bytes = null;
        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            bytes = jedis.rpop(key);

        } catch (Exception e) {

            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {

            //返还到连接池
            close(jedis);

        }
        return bytes;
    }
    public static void hmset(Object key, Map hash) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hmset(key.toString(), hash);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {
            //返还到连接池
            close(jedis);

        }
    }
    public static void hmset(Object key, Map hash, int time) {
        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            jedis.hmset(key.toString(), hash);
            jedis.expire(key.toString(), time);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {
            //返还到连接池
            close(jedis);

        }
    }
    public static List hmget(Object key, String... fields) {
        List result = null;
        Jedis jedis = null;
        try {

            jedis = jedisPool.getResource();
            result = jedis.hmget(key.toString(), fields);

        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {
            //返还到连接池
            close(jedis);

        }
        return result;
    }

    public static Set hkeys(String key) {
        Set result = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            result = jedis.hkeys(key);

        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {
            //返还到连接池
            close(jedis);

        }
        return result;
    }
    public static List lrange(byte[] key, int from, int to) {
        List result = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            result = jedis.lrange(key, from, to);

        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {
            //返还到连接池
            close(jedis);

        }
        return result;
    }
    public static Map hgetAll(byte[] key) {
        Map result = null;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            result = jedis.hgetAll(key);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();

        } finally {
            //返还到连接池
            close(jedis);
        }
        return result;
    }

    public static void del(byte[] key) {

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.del(key);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
    }

    public static long llen(byte[] key) {

        long len = 0;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.llen(key);
        } catch (Exception e) {
            //释放redis对象
            jedisPool.returnBrokenResource(jedis);
            e.printStackTrace();
        } finally {
            //返还到连接池
            close(jedis);
        }
        return len;
    }
}

四、Configuration主要用于读取Redis的配置信息

package Utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class Configuration extends Properties {

    private static final long serialVersionUID = -2296275030489943706L;
    private static Configuration instance = null;

    public static synchronized Configuration getInstance() {
        if (instance == null) {
            instance = new Configuration();
        }
        return instance;
    }

    public String getProperty(String key, String defaultValue) {
        String val = getProperty(key);
        return (val == null || val.isEmpty()) ? defaultValue : val;
    }

    public String getString(String name, String defaultValue) {
        return this.getProperty(name, defaultValue);
    }

    public int getInt(String name, int defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
    }

    public long getLong(String name, long defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
    }

    public float getFloat(String name, float defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
    }

    public double getDouble(String name, double defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
    }

    public byte getByte(String name, byte defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
    }

    public Configuration() {
        InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
        try {
            this.loadFromXML(in);
            in.close();
        } catch (IOException ioe) {

        }
    }
}

五、测试

import Model.Message;
import Utils.JedisUtil;
import Utils.ObjectUtil;
import redis.clients.jedis.Jedis;

import java.io.IOException;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class TestRedisQueue {
    public static byte[] redisKey = "key".getBytes();
    static {
        try {
            init();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void init() throws IOException {
        for (int i = 0; i < 1000000; i++) {
            Message message = new Message(i, "这是第" + i + "个内容");
            JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
        }

    }

    public static void main(String[] args) {
        try {
            pop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void pop() throws Exception {
        byte[] bytes = JedisUtil.rpop(redisKey);
        Message msg = (Message) ObjectUtil.bytes2Object(bytes);
        if (msg != null) {
            System.out.println(msg.getId() + "----" + msg.getContent());
        }
    }
}

每执行一次pop()方法,结果如下:<br>1----这是第1个内容<br>2----这是第2个内容<br>3----这是第3个内容<br>4----这是第4个内容

总结

至此,整个Redis消息队列的生产者和消费者代码已经完成

  1. Message 需要传送的实体类(需实现Serializable接口)
  2. Configuration Redis的配置读取类,继承自Properties
  3. ObjectUtil 将对象和byte数组双向转换的工具类
  4. Jedis 通过消息队列的先进先出(FIFO)的特点结合Redis的list中的push和pop操作进行封装的工具类
时间: 2024-10-09 23:06:00

Java利用Redis实现消息队列的相关文章

PHP中利用redis实现消息队列处理高并发请求

将请求存入redis 为了模拟多个用户的请求,使用一个for循环替代 //redis数据入队操作 $redis = new Redis(); $redis->connect('127.0.0.1',6379); for($i=0;$i<50;$i++){ try{ $redis->LPUSH('click',rand(1000,5000)); }catch(Exception $e){ echo $e->getMessage(); } } 在后台进行数据处理 守护进程 //redi

Redis 做消息队列

一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式.利用redis这两种场景的消息队列都能够实现.定义: 生产者消费者模式:生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息:即对于每个消息只能被最多一个消费者拥有. 发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息:即正常情况下每个消费者收到的消息应该都是一样的. 那么如此多的MQ产品,为什么要使用redis作消息队列呢?以下附上一份总结了别人的一些r

NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例

一.消息队列场景简介 “消息”是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中,“消息队列”是在消息的传输过程中保存消息的容器. 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件服务器造成巨大的压力,同时呢,也使响应延迟加剧.这也说明

php+redis实现消息队列

原文地址:http://www.cnblogs.com/lisqiong/p/6039460.htmlphp+redis实现消息队列 ? 个人理解在项目中使用消息队列一般是有如下几个原因: 把瞬间服务器的请求处理换成异步处理,缓解服务器的压力 实现数据顺序排列获取 ?redis实现消息队列步骤如下: 1).redis函数rpush,lpop 2).建议定时任务入队列 3)创建定时任务出队列 文件:demo.php插入数据到redis队列 1 2 3 4 5 6 7 8 9 10 11 12 13

利用System V消息队列实现回射客户/服务器

一.介绍 在学习UNIX网络编程 卷1时,我们当时可以利用Socket套接字来实现回射客户/服务器程序,但是Socket编程是存在一些不足的,例如: 1. 服务器必须启动之时,客户端才能连上服务端,并与服务端进行通信: 2. 利用套接口描述符进行通信,必须知道对端的IP与端口. 二.相关函数介绍 下面,我们利用System V消息队列来实现进程间的通信: 首先,我们先来了解一下下面几个函数: 1. msgget: 该函数用于打开或创建消息队列,其作用相当与文件操作函数open. #include

为什么学习Redis作为消息队列服务器

使用Redis作为消息队列服务场景 " 消息 "是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中," 消息队列 "是在消息的传输过程中保存消息的 容器 . 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件

PHP + Redis 实现消息队列

Redis做消息队列的好处在于它的轻量级,高并发,延迟敏感,应用场景有 即时数据分析.秒杀计数器.缓存等 Redis做消息队列待解决的问题: 1.消息的可靠性: 没有相应的机制保证消息的消费,当消费者消费失败的时候,消息体丢失,需要手动处理.生产者只管向队列中插入数据,不管消费者是否成功消费. 2.消费者挂掉消息不会丢失,但是需要重新触发一下消费者,才能够继续消费消息. 代码如下: lib.php 是工具文件,里面有数据库的连接.Redis的连接: <?php /** * 获取数据库连接 * *

用redis实现消息队列

为什么需要消息队列 系统中引入消息队列机制是对系统一个非常大的改善.例如一个web系统中,用户做了某项操作后需要发送邮件通知到用户邮箱中.你可以使用同步方式让用户等待邮件发送完成后反馈给用户,但是这样可能会因为网络的不确定性造成用户长时间的等待从而影响用户体验. 有些场景下是不可能使用同步方式等待完成的,那些需要后台花费大量时间的操作.例如极端例子,一个在线编译系统任务,后台编译完成需要30分钟.这种场景的设计不可能同步等待后在回馈,必须是先反馈用户随后异步处理完成,再等待处理完成后根据情况再此

jedis实现redis的消息队列、发布对象消息、字节数组与字符串相互转换

redis支持发布/订阅的消息队列机制,jedis提供了java访问redis的客户端,本文将描述如何用jedis实现简单的消息队列,并传输对象. redis支持发布.订阅的功能,基本的命令有publish.subscribe等.在jedis中,有对应的java方法,并且只能发布字符串消息.为了传输对象,需要将对象进行序列化,并封装成字符串进行处理.将对象序列化后,只能成为字节流,如何封装成字符串是一个难点,具体可参考下面的代码. 实现三个类,一个对应publish.一个对应subscribe.