redis队列

1、lpush+rpop

  采用rpop需要不停调用rpop方法查看list中是否有待处理消息。每调用一次都会发起一次连接,造成不必要浪费

  代码:

      producer: 

package com.eval.mind.service.redis;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import redis.clients.jedis.Jedis;
public class Producer_Lpush extends Thread{

    public static final String MESSAGE_KEY = "message:queue";
    private Jedis jedis;
    private String producerName;
    private volatile int count;

    public Producer_Lpush(String name) {
        this.producerName = name;
        init();
    }

    private void init() {
        jedis=new Jedis("192.168.80.4",6379);
    }

    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(producerName + ": 当前未被处理消息条数为:" + size);
        count++;
    }

    public int getCount() {
        return count;
    }

    @Override
    public void run() {
        try {
            while (true) {
                putMessage(UUID.randomUUID().toString());
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException{
        Producer_Lpush producer = new Producer_Lpush("myProducer");
        producer.start();

        for(; ;) {
            System.out.println("main : 已存储消息条数:" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}

    

         consumer:

package com.eval.mind.service.redis;

import redis.clients.jedis.Jedis;

/*
 * rpop从redis队列中pop处一个String类型元素,String mes=jedis.rpop(key)
 * 缺点是消费端需要不停的调用rpop方法查看list是否有待处理消息。每调用依次都会发起依次连接,这会造成不必要浪费;为解决此参考Consumer_Brpop.java方法
 */
public class Consumer_Rpop extends Thread {
    private String customerName;
    private volatile int count;
    private Jedis jedis;

    public Consumer_Rpop(String name) {
            this.customerName = name;
            init();
        }

    private void init() {
        jedis = new Jedis("192.168.80.4", 6379);
    }

    public void processMessage() {
        String message = jedis.rpop(Producer_Lpush.MESSAGE_KEY); //如果redis队列中没有数据,也会一直调用
        if (message != null) {
            count++;
            handle(message);
        }
    }

    public void handle(String message) {
        System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");
    }

    @Override
    public void run() {
        while (true) {
            processMessage();
        }
    }

    public static void main(String[] args) {
        Consumer_Rpop customer = new Consumer_Rpop("yamikaze");
        customer.start();
    }
}

2、lpush+brpop

  brpop:blocking rpop,采用brpop时,如果redis队列中不存在数据则调用List<String> messages=jedis.brpop(int i,key1,key2)时会阻塞,不会往下执行,一直等待redis队列中再次push进数据后继续执行pop操作

      * brpop支持多个列表(队列)

   * brpop指令是支持队列优先级的,比如这个例子中MESSAGE_KEY的优先级大于testKey(依据brpop中顺序决定)。
   * 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回MESSAGE_KEY
   * 0表示不限制等待,会一直阻塞在这儿

  代码:

      producer:  

package com.eval.mind.service.redis;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import redis.clients.jedis.Jedis;
public class Producer_Lpush extends Thread{

    public static final String MESSAGE_KEY = "message:queue";
    private Jedis jedis;
    private String producerName;
    private volatile int count;

    public Producer_Lpush(String name) {
        this.producerName = name;
        init();
    }

    private void init() {
        jedis=new Jedis("192.168.80.4",6379);
    }

    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(producerName + ": 当前未被处理消息条数为:" + size);
        count++;
    }

    public int getCount() {
        return count;
    }

    @Override
    public void run() {
        try {
            while (true) {
                putMessage(UUID.randomUUID().toString());
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException{
        Producer_Lpush producer = new Producer_Lpush("myProducer");
        producer.start();

        for(; ;) {
            System.out.println("main : 已存储消息条数:" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}

        consumer:   

package com.eval.mind.service.redis;

import java.util.List;

import org.springframework.util.CollectionUtils;

import redis.clients.jedis.Jedis;

/*
 * brpop:blocking rpop,这个指令只有在有元素时才返回,没有则会阻塞到超时返回null;
 * brpop支持多个列表(队列),支持队列优先级,比如messagekey优先级大于testkey,如果两个列表中都有元素,会优先返回messagekey列表中元素
 * 返回List<String>类型  List<String> messages=jedis.brpop(key);
 */
public class Consumer_Brpop extends Thread{
    private volatile int count;
    String name;
    private Jedis jedis;
    public Consumer_Brpop(String name) {
        this.name=name;
        init();
    }
    private void init() {
        jedis = new Jedis("192.168.80.4", 6379);
    }

    private void processMessage() {
        List<String> messages=jedis.brpop(0, "message:queue","message:tmp"); //如果redis队列中没有数据,此处会阻塞不会往下执行,直到redis队列又push进数据
        if(!CollectionUtils.isEmpty(messages)) {
            String keyName=messages.get(0);
            String messageValue=messages.get(1);
            count++;
            handle(messageValue);
        }
    }

    public void handle(String message) {
        System.out.println(name + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");
    }

    @Override
    public void run() {
        while (true) {
            processMessage();
        }
    }

    public static void main(String[] args) {
        Consumer_Brpop customer = new Consumer_Brpop("yamikaze");
        customer.start();
    }
}

  

3、publish+subscribe

   redis除了对消息队列提供支持外,还提供一组命令用于支持发布/订阅模式

   发布:

      publish指令可用于发布一条消息:publish channel message

   订阅:

      subscribe指令用于接收一条消息:subscribe channel

      可以看到使用subscribe指令进入订阅模式后,并没有接收到publish发送的消息,这是因为只有在消息发送出去前才会收到,也就是说订阅subscribe启动要在          publish之前执行

   订阅发布模式和消息队列模式区别:消息队列模式是通过key方式实现,取出就删除了,其他进程取不到。订阅发布可以支持多客户端获取同一个频道                                (channel)发布的消息

    

      代码:

    publish:  

package com.eval.mind.service.redis;

import org.apache.commons.lang3.StringUtils;

import redis.clients.jedis.Jedis;

public class Producer_Publish {
    public static final String Channel_key="channel:message";
    private Jedis jedis;

    public Producer_Publish() {
        jedis=new Jedis("192.168.80.4",6379);
    }

    public void publishMessage(String message) {
        if(StringUtils.isNoneBlank(message)) {
            return;
        }
        jedis.publish(Channel_key, message); //publish方法 发布消息
    }

    public static void main(String[] args) {
        Producer_Publish publisher=new Producer_Publish();
        publisher.publishMessage("Hello Publish Redis");
    }
}

    subscribe:   

package com.eval.mind.service.redis;

import org.apache.commons.lang3.StringUtils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class Consumer_Subscribe {

    private Jedis jedis;
    private static final String EXIT_COMMAND = "exit";

    public Consumer_Subscribe() {
        jedis = new Jedis("192.168.80.4", 6379);
    }

    public void subscribe(String channel) { // subscribe方法订阅消息
        if (StringUtils.isBlank(channel)) {
            return;
        }
        // JedisPubSub类是一个没有抽象方法的抽象类,里边方法时一些空实现,可以选择需要的方法覆盖,这里使用的是subscribe指令,所以覆盖了onMessage
        // 如果使用pubsubscribe指令则覆盖onPmessage方法

        JedisPubSub jps = new JedisPubSub() {

            @Override
            public void onMessage(String channel, String message) {
                while (true) {       //相对于redis队列只能消费一次,此处会对channel_key一直订阅,
                    if (Producer_Publish.Channel_key.equals(channel)) {
                        System.out.println("接收到消息: channel : " + message);
                        // 接收到exit消息后退出
                        if (EXIT_COMMAND.equals(message)) {
                            System.exit(0);
                        }
                    }
                }
            }

            /**
             * 订阅时
             */
            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                if (Producer_Publish.Channel_key.equals(channel)) {
                    System.out.println("订阅了频道:" + channel);
                }
            }
        };
        jedis.subscribe(jps, channel);
    }

    public static void main(String[] args) {
        Consumer_Subscribe client = new Consumer_Subscribe();
        client.subscribe(Producer_Publish.Channel_key);
    }
}

  

原文地址:https://www.cnblogs.com/enhance/p/11118714.html

时间: 2024-08-29 17:11:36

redis队列的相关文章

PHP电商订单自动确认收货redis队列

一.场景 之前做的电商平台,用户在收到货之后,大部分都不会主动的点击确认收货,导致给商家结款的时候,商家各种投诉,于是就根据需求,要做一个订单在发货之后的x天自动确认收货.所谓的订单自动确认收货,就是在在特定的时间,执行一条update语句,改变订单的状态. 二.思路 最笨重的做法,通过linux后台定时任务,查询符合条件的订单,然后update.最理想情况下,如果每分钟都有需要update的订单,这种方式也还行.奈何平台太小,以及卖家发货时间大部分也是密集的,不会分散在24小时的每分钟.那么,

转载:【高并发简单解决方案 | 靠谱崔小拽 】redis队列缓存 + mysql 批量入库 + php离线整合

需求背景:有个调用统计日志存储和统计需求,要求存储到mysql中:存储数据高峰能达到日均千万,瓶颈在于直接入库并发太高,可能会把mysql干垮. 问题分析 思考:应用网站架构的衍化过程中,应用最新的框架和工具技术固然是最优选择:但是,如果能在现有的框架的基础上提出简单可依赖的解决方案,未尝不是一种提升自我的尝试. 解决: 问题一:要求日志最好入库:但是,直接入库mysql确实扛不住,批量入库没有问题,done.[批量入库和直接入库性能差异参考文章] 问题二:批量入库就需要有高并发的消息队列,决定

c#之Redis队列在邮件提醒中的应用

场景 有这样一个场景,一个邮件提醒的windows服务,获取所有开启邮件提醒的用户,循环获取这些用户的邮件,发送一条服务号消息.但问题来了,用户比较少的情况下,轮询一遍时间还能忍受,如果用户多了,那用户名称排序靠后的人,收到邮件提醒的消息,延迟时间就非常长了. 准备 c#之Redis实践list,hashtable c#之Redis队列 方案 1.生产者线程一获取所有开启邮件提醒的用户. 2.根据配置来决定使用多少个队列,以及每个队列的容量. 3.线程一,获取未满的队列,将当前用户入队.如果所有

(3)redis队列功能

Redis队列功能介绍 List 常用命令: Blpop删除,并获得该列表中的第一元素,或阻塞,直到有一个可用 Brpop删除,并获得该列表中的最后一个元素,或阻塞,直到有一个可用 Brpoplpush Lindex获取一个元素,通过其索引列表 Linsert在列表中的另一个元素之前或之后插入一个元素 Llen获得队列(List)的长度 Lpop从队列的左边出队一个元素 Lpush从队列的左边入队一个或多个元素 Lpushx当队列存在时,从队到左边入队一个元素 Lrange从列表中获取指定返回的

高并发简单解决方案————redis队列缓存+mysql 批量入库

问题分析 思考:应用网站架构的衍化过程中,应用最新的框架和工具技术固然是最优选择:但是,如果能在现有的框架的基础上提出简单可依赖的解决方案,未尝不是一种提升自我的尝试. 解决: 问题一:要求日志最好入库:但是,直接入库mysql确实扛不住,批量入库没有问题,done.[批量入库和直接入库性能差异] 问题二:批量入库就需要有高并发的消息队列,决定采用redis list 仿真实现,而且方便回滚. 问题三:日志量毕竟大,保存最近30条足矣,决定用php写个离线统计和清理脚本. done,下面是小拽的

redis队列处理文件并发(日志处理)

多线程操作同一个文件时会出现并发问题.解决的一个办法就是给文件加锁(lock),但是这样的话,一个线程操作文件时,其它的都得等待,这样的话性能非常差. 另外一个解决方案,就是先将数据放在队列中,然后开启一个线程,负责从队列中取出数据,再写到文件中. using log4net; using RedisMvcApp.Models; using System; using System.Collections.Generic; using System.IO; using System.Linq;

【高并发简单解决方案】redis队列缓存 + mysql 批量入库 + php离线整合

原文地址 :https://segmentfault.com/a/1190000004136250需求背景:有个调用统计日志存储和统计需求,要求存储到mysql中:存储数据高峰能达到日均千万,瓶颈在于直接入库并发太高,可能会把mysql干垮. 问题分析 思考:应用网站架构的衍化过程中,应用最新的框架和工具技术固然是最优选择:但是,如果能在现有的框架的基础上提出简单可依赖的解决方案,未尝不是一种提升自我的尝试. 解决: 问题一:要求日志最好入库:但是,直接入库mysql确实扛不住,批量入库没有问题

laravel中redis队列的使用

一.配置文件 首先我们需要在配置文件中配置默认队列驱动为Redis,队列配置文件是config/queue.php: return [ 'default' => env('QUEUE_DRIVER', 'sync'), 'connections' => [ 'sync' => [ 'driver' => 'sync', ], 'database' => [ 'driver' => 'database', 'table' => 'jobs', 'queue' =&g

redis 队列缓存 + mysql 批量入库 + php 离线整合

问题分析 思考:应用网站架构的衍化过程中,应用最新的框架和工具技术固然是最优选择:但是,如果能在现有的框架的基础上提出简单可依赖的解决方案,未尝不是一种提升自我的尝试. 解决: 问题一:要求日志最好入库:但是,直接入库mysql确实扛不住,批量入库没有问题,done.[批量入库和直接入库性能差异参考文章] 问题二:批量入库就需要有高并发的消息队列,决定采用redis list 仿真实现,而且方便回滚. 问题三:日志量毕竟大,保存最近30条足矣,决定用php写个离线统计和清理脚本. 一:设计数据库

c#之Redis队列

摘要 这两天一直在考虑redis队列:一个生产者,多个消费者的情况,这里弄了一个demo进行测试. 一个例子 关于如何引用Redisclient 可以参考之前的这篇文章:c#之Redis实践list,hashtable 生产者一个线程,然后开启多个线程用来消费数据. 代码如下: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.T