Redis之上的分布式Java队列

通过优锐课的java架构学习分享中,讨论了让我们使用Redisson Java框架讨论六种不同类型的基于Redis的分布式队列。

在Redis中使用队列

Redis是一个功能强大的工具,支持从字符串和列表到映射和流的许多不同类型的数据结构。 开发人员将Redis用于多种目的,包括用于数据库,缓存和消息代理。

像任何消息代理一样,Redis需要以正确的顺序发送消息。 可以根据消息的年龄或某些其他预定义的优先级等级发送消息。

为了存储这些未决消息,Redis开发人员需要队列数据结构。 Redisson是使用Redis和Java进行分布式编程的框架,它提供了许多分布式数据结构(包括队列)的实现。

Redisson通过提供Java API使Redis开发更加容易。 Redisson不需要开发人员学习Redis命令,而是包括所有众所周知的Java接口,例如Queue和BlockingQueue。 Redisson还处理Redis中繁琐的幕后工作,例如连接管理,故障转移处理和数据序列化。

基于Redis的分布式Java队列

Redisson提供了Java中基本队列数据结构的多个基于Redis的实现,每种实现都有不同的功能。 这使你可以选择最适合你目的的队列类型。

下面,我们将使用Redisson Java框架讨论六种不同类型的基于Redis的分布式队列。

队列
Redisson中的RQueue对象实现了java.util.Queue接口。 队列用于需要从最早的最早的元素开始处理(也称为“先进先出”或FIFO)的情况。

与普通Java一样,可以使用peek()方法检查RQueue的第一个元素,或者使用poll()方法检查和删除RQueue的第一个元素:

RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();

阻塞队列

Redisson中的RBlockingQueue对象实现了java.util.BlockingQueue接口。

BlockingQueues是阻塞线程的队列,这些线程试图从空队列中进行轮询,或者试图在已满的队列中插入元素。 该线程将被阻塞,直到另一个线程将一个元素插入到空队列中,或从完整队列中轮询第一个元素为止。

下面的示例代码演示了RBlockingQueue的正确实例化和使用。 特别是,你可以使用参数指定对象将等待线程变得可用的时间来调用poll()方法:

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

在故障转移或重新连接到Redis服务器的过程中,将自动重新预订poll(),pollFromAny(),pollLastAndOfferFirstTo()和take()Java方法。

BoundedBlockingQueue
Redisson中的RBoundedBlockingQueue

对象实现了有界的阻塞队列结构。 有界阻塞队列是容量已受限制(即有限)的阻塞队列。

以下代码演示了如何在Redisson中实例化和使用RBoundedBlockingQueue。 trySetCapacity()方法用于尝试设置阻塞队列的容量。 trySetCapacity()返回布尔值“ true”或“ false”,这取决于是否成功设置了容量或是否已经设置了容量:

RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
queue.trySetCapacity(2);
queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// will be blocked until free space available in queue
queue.put(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

延迟排队

Redisson中的RDelayedQueue对象允许你在Redis中实现延迟队列。 当使用诸如指数补偿的策略将消息传递给消费者时,这可能会很有用。 每次尝试发送邮件失败后,重试之间的时间将成倍增加。

在与元素一起指定的延迟之后,延迟队列中的每个元素将被转移到目标队列。 此目标队列可以是实现RQueue接口的任何队列,例如RBlockingQueue或RBoundedBlockingQueue。

RQueue<String> destinationQueue = redisson.getQueue("anyQueue");
RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);
// move object to destinationQueue in 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// move object to destinationQueue in 1 minute
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在不再需要队列之后,通过使用destroy()方法销毁延迟的队列是一个好主意。 但是,如果要关闭Redisson,则没有必要。
PriorityQueue

Redisson中的RPriorityQueue对象实现了java.util.Queue接口。 优先级队列是不是按元素的使用期限而是按照与每个元素相关联的优先级排序的队列。
如下面的示例代码所示,RPriorityQueue使用比较器对队列中的元素进行排序:

RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // set object comparator
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.poll();

PriorityBlockingQueue
Redisson中的RPriorityBlockingQueue对象结合了RPriorityQueue和RBlockingQueue的功能。 与RPriorityQueue一样,RPriorityBlockingQueue也使用Comparator对队列中的元素进行排序。

RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // set object comparator
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.take();

在故障转移或重新连接到Redis服务器的过程中,将自动重新预订poll(),pollLastAndOfferFirstTo()和take()Java方法。

原文地址:https://blog.51cto.com/14634606/2462636

时间: 2024-11-08 23:07:27

Redis之上的分布式Java队列的相关文章

灵感来袭,基于Redis的分布式延迟队列

延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. Redisson Delayed Queue 如果你项目中使用了redisson,那么恭喜你,使用延迟队列将非常的简单. 基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能.该功能可以用来实现消息传送延迟按几何增长或几何衰减的发

使用Redis的分布式Java锁

通过优锐课的java学习分享中,了解有关分布式锁定以及如何在项目中实现它的更多信息! 什么是分布式锁定? 在多线程程序中,不同的线程可能需要访问相同的资源.但是,允许所有线程同时访问资源可能导致争用情况,错误和其他意外行为. 为了确保没有两个线程可以同时访问同一资源,并确保以可预测的顺序对资源进行操作,程序员使用一种称为锁的机制.每个线程首先获取锁,然后对资源进行操作,最后将锁释放给其他线程. 在Java中,由于多种原因,锁定对象通常比使用同步块更灵活.首先,Lock API可以以不同的方法运行

JavaWeb项目架构之Redis分布式日志队列-SpringBoot实例

架构.分布式.日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Redis做消息队列罢了. ? 为什么需要消息队列? 当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异.比如我们系统中常见的邮件.短信发送,把这些不需要及时响应的功能写入队列,异步处理请求,减少响应时间. 如何实现? 成熟的JMS消息队列中间件产品市面上有很多,但是基于目前项目的架构以及部署情况,我们采用Redis做消息队列. 为什么用Redis?

基于Redis实现分布式消息队列(1)

1.为什么需要消息队列? 当系统中出现"生产"和"消费"的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.1:00到4:00和ERP联通,和电商系统断开. 再举个例子,服务员点菜

基于Redis实现分布式消息队列(汇总目录)

基于Redis实现分布式消息队列(1)– 缘起 http://blog.csdn.net/stationxp/article/details/45595733 基于Redis实现分布式消息队列(2)– 分布式消息队列功能设计 http://blog.csdn.net/stationxp/article/details/45596619 基于Redis实现分布式消息队列(3)– Redis功能分析 http://blog.csdn.net/stationxp/article/details/457

[转载] 基于Redis实现分布式消息队列

转载自http://www.linuxidc.com/Linux/2015-05/117661.htm 1.为什么需要消息队列?当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.

阿里JAVA面试题剖析:一般实现分布式锁都有哪些方式?使用 Redis 如何设计分布式锁?

面试原题 一般实现分布式锁都有哪些方式?使用 redis 如何设计分布式锁?使用 zk 来设计分布式锁可以吗?这两种分布式锁的实现方式哪种效率比较高? 面试官心理分析 其实一般问问题,都是这么问的,先问问你 zk,然后其实是要过度到 zk 关联的一些问题里去,比如分布式锁.因为在分布式系统开发中,分布式锁的使用场景还是很常见的. 面试题剖析 Redis 分布式锁 官方叫做 RedLock 算法,是 Redis 官方支持的分布式锁算法. 这个分布式锁有 3 个重要的考量点: 互斥(只能有一个客户端

RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ基础知识详解,RabbitMQ布曙

消息队列及常见消息队列介绍 2017-10-10 09:35操作系统/客户端/人脸识别 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaMq等,而部分数据库如Re

Spark生态系统解析及基于Redis的开源分布式服务Codis

摘要:在第九期"七牛开发者最佳实践日"上,陈超就Spark整个生态圈进行了讲解,而刘奇则分享豌豆荚在Redis上的摸索和实践. 1月24日,一场基于Spark和Redis组成的分布式系统实践分享由Spark资深布道者陈超和豌豆荚资深系统架构师刘奇联手打造. 陈超:Spark Ecosystem & Internals 陈超(@CrazyJvm),Spark布道者 在分享中,陈超首先简短的介绍了Spark社区在2014年的发展:目前Spark的发布版本是1.2,整个2014年Sp