【并发】8、借助redis 实现多线程生产消费阻塞队列

顾名思义这个就是再消费的时候,不是之前的那哥用yield进行线程切换的操作,而是用线程等待阻塞的方式去执行,说实话我感觉效率不一定有之前那个好,

因为我对这种阻塞队列使用的时候,之前有发现阻塞队列,塞着塞着线程就会进入假死状态,这个很奇怪,但是有的时候又是好的,这个也不清楚到底是为什么

但是毕竟也是一种实现,我就写出来了看看吧

生产者

package queue.redisQueue;

import queue.fqueue.vo.TempVo;
import redis.clients.jedis.Jedis;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.UUID;

/**
 * @ProjectName: cutter-point
 * @Package: queue.redisQueue
 * @ClassName: RedisQueueProducter2
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/6/12 16:29
 * @Version: 1.0
 */
public class RedisQueueProducter2 implements Runnable {

    private Jedis jedis;
    private String queueKey;

    public RedisQueueProducter2(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    @Override
    public void run() {

        while(true) {

            try {
                Thread.sleep((long) (Math.random() * 1000));

                //不存在则创建,存在则直接插入
                //向redis队列中存放数据
                //生成数据
                TempVo tempVo = new TempVo();
                tempVo.setName(Thread.currentThread().getName() + ",time is:" + UUID.randomUUID());
                //序列化为字节
                ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(arrayOutputStream);
                objectOutputStream.writeObject(tempVo);
                arrayOutputStream.flush();

                try {
                    int i = 0;
                    while(i < 10) {
                        long num = jedis.lpush(queueKey.getBytes(), arrayOutputStream.toByteArray());
                        if(num > 0) {
                            System.out.println("成功!");
                            break;
                        }

                        ++i;
                    }
                } catch (Exception e) {
                    System.out.println("失败!");
//                    long num = jedis.lpush(queueKey.getBytes(), arrayOutputStream.toByteArray());
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
}

消费者

package queue.redisQueue;

import queue.fqueue.vo.EventVo;
import redis.clients.jedis.Jedis;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;

/**
 * @ProjectName: cutter-point
 * @Package: queue.redisQueue
 * @ClassName: RedisQueueConsume2
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/6/12 16:40
 * @Version: 1.0
 */
public class RedisQueueConsume2 implements Runnable {

    private Jedis jedis;
    private String queueKey;

    public RedisQueueConsume2(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    @Override
    public void run() {

        while(true) {
            List<byte[]> bytesList = null;
            try{
                //这种就是阻塞队列模式
                bytesList = jedis.blpop(0, queueKey.getBytes());
            } catch (Exception e) {

            }

            //反序列化对象
            if(bytesList == null || bytesList.size() <= 0) {
                Thread.yield();
                continue;
            }

            //获取第二个对象,就是我们的字节数组
            System.out.println(new String(bytesList.get(0)));
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytesList.get(1));
            try {
                ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
                EventVo eventVo = (EventVo) objectInputStream.readObject();

                eventVo.doOperater();

            } catch (IOException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
    }
}

测试代码

消费队列

接下来我们把生产线程停掉

此时队列还有

我们把它消费完

当只剩最后一个的时候

可以进入下一步,好当队列为空的时候,我们再尝试去取数据的时候

队列会阻塞再这个地方,相当于是挂起线程

原文地址:https://www.cnblogs.com/cutter-point/p/11011084.html

时间: 2024-10-11 05:47:45

【并发】8、借助redis 实现多线程生产消费阻塞队列的相关文章

Java核心知识点学习----多线程中的阻塞队列,ArrayBlockingQueue介绍

1.什么是阻塞队列? 所谓队列,遵循的是先进先出原则(FIFO),阻塞队列,即是数据共享时,A在写数据时,B想读同一数据,那么就将发生阻塞了. 看一下线程的四种状态,首先是新创建一个线程,然后,通过start方法启动线程--->线程变为可运行可执行状态,然后通过数据产生共享,线程产生互斥---->线程状态变为阻塞状态---->阻塞状态想打开的话可以调用notify方法. 这里Java5中提供了封装好的类,可以直接调用然后构造阻塞状态,以保证数据的原子性. 2.如何实现? 主要是实现Blo

Java多线程-新特征-阻塞队列ArrayBlockingQueue

阻塞队列是Java5线程新特征中的内容,Java定义了阻塞队列的接口java.util.concurrent.BlockingQueue,阻塞队列的概念是,一个指定长度的队列,如果队列满了,添加新元素的操作会被阻塞等待,直到有空位为止.同样,当队列为空时候,请求队列元素的操作同样会阻塞等待,直到有可用元素为止. 有了这样的功能,就为多线程的排队等候的模型实现开辟了便捷通道,非常有用. java.util.concurrent.BlockingQueue继承了java.util.Queue接口,可

python多线程生产消费

#!/usr/bin/env python# -*- coding: utf-8 -*- from threading import Threadfrom Queue import Queueimport time class Producer(Thread): def __init__(self,name,queue): self.__name = name self.__queue = queue super(Producer,self).__init__() def run(self):

Java并发(10)- 简单聊聊JDK中的七大阻塞队列

引言 JDK中除了上文提到的各种并发容器,还提供了丰富的阻塞队列.阻塞队列统一实现了BlockingQueue接口,BlockingQueue接口在java.util包Queue接口的基础上提供了put(e)以及take()两个阻塞方法.他的主要使用场景就是多线程下的生产者消费者模式,生产者线程通过put(e)方法将生产元素,消费者线程通过take()消费元素.除了阻塞功能,BlockingQueue接口还定义了定时的offer以及poll,以及一次性移除方法drainTo. //插入元素,队列

redis和memcached有什么区别?redis的线程模型是什么?为什么单线程的redis比多线程的memcached效率要高得多(为什么redis是单线程的但是还可以支撑高并发)?

1.redis和memcached有什么区别? 这个事儿吧,你可以比较出N多个区别来,但是我还是采取redis作者给出的几个比较吧 1)Redis支持服务器端的数据操作:Redis相比Memcached来说,拥有更多的数据结构和并支持更丰富的数据操作,通常在Memcached里,你需要将数据拿到客户端来进行类似的修改再set回去.这大大增加了网络IO的次数和数据体积.在Redis中,这些复杂的操作通常和一般的GET/SET一样高效.所以,如果需要缓存能够支持更复杂的结构和操作,那么Redis会是

多线程编程:阻塞、并发队列的使用总结

最近,一直在跟设计的任务调度模块周旋,目前终于完成了第一阶段的调试.今天,我想借助博客园平台把最近在设计过程中,使用队列和集合的一些基础知识给大家总结一下,方便大家以后直接copy.本文都是一些没有技术含量的东西,只是做个总结,牛哥还请绕路. 老习惯,还是先跟各位纸上谈会儿兵,首先说说队列,他主要分为并发队列和阻塞队列,在多线程业务场景中使用最为普遍,我就主要结合我所做过的业务谈谈我对它们的看法,关于它们的API和官方解释就不提了. 并发队列 并发队列:最常见的业务场景就是多个线程共享同一个队列

python-学习-python并发编程之多进程与多线程

一 multiprocessing模块介绍 python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程.Python提供了multiprocessing.    multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似.  multiprocessing模块的功能众多:支持子进程.通信和共享数据.执行不同形式的同步,

【JAVA】wait和notify用法,附生产/消费模型

关于wait和notify的用法,网上已经有很多详细解释了,我只是简单的总结下. wait用于释放锁A,并让wait所在的线程阻塞.除非被持有锁A的其它线程执行notify来唤醒,它才能重新"活"过来. notify用于唤醒因为等待锁A而阻塞的线程,让它们做好竞争锁A的准备.如果有多个线程因等待锁A而被阻塞,notify只唤醒一个,唤醒所有用notifyAll. 参考下面的线程状态图,对理解wait和notify有很大的帮助. 总结: wait和notify通常和synchronize

java多线程 生产消费者模型

[seriesposts sid=500] 下面的代码讲述了一个故事 一个面包生产铺里目前有30个面包,有三个人来买面包,第一个人要买50个,第二个要买20个,第三个要买30个. 第一个人不够,所以等着,让第二个买了.面包铺继续生产面包.有7个人在生产. package com.javaer.thread; public class CPMode { public static void main(String[] args) { Godown godown = new Godown(30);