一种消息和任务队列——beanstalkd

beanstalkd 是一个轻量级消息中间件,其主要特性:

  • 基于管道  (tube) 和任务 (job) 的工作队列 (work-queue):d

      管道(tube),tube类似于消息主题(topic),在一个beanstalkd中可以支持多个tube,每个tube都有自己的producer和consumer;

    任务(job),beanstalkd用job代替了message的概念,与消息不同,job有一系列状态: 

  • 内部实现采用了 libevent, 服务器-客户端之间用类似 memcached 的轻量级通讯协议,具有有很高的性能。
  • 尽管是内存队列,beanstalkd 提供了 binlog 机制,当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。
  • 优先级(priority):job可以有0~2^32个优先级,0代表最高优先级,beanstalkd使用最大最小堆处理job的优先级排序,因此reserve命令的时间复杂度是O(logN);
  • 延时(delay),有两种方式可以执行延时任务:producer发布任务时指定延时;或者当任务处理完毕后, consumer再次将任务放入队列延时执行 (RELEASE with <delay>);
  • 超时重发(time-to-run),Beanstalkd 把job返回给consumer以后:consumer必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把job交给另外的消费者节点执行。如果consumer预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR ;
  • 任务预留(buried),如果job因为某些原因无法执行, consumer可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick <n> 能够一次性把 n 条被保留的任务踢回队列。

job的状态

  • READY,需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;
  • DELAYED,延迟执行的任务, 当消费者处理任务后,可以用将消息再次放回 DELAYED 队列延迟执行;
  • RESERVED,已经被消费者获取, 正在执行的任务,Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;
  • BURIED,保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;
  • DELETED,消息被彻底删除。Beanstalkd 不再维持这些消息。

如下,是一个典型任务的生命周期:

producer执行put命令将job放入队列,consumer执行reserve命令从队列取出job,执行完毕后发送delete命令告诉beanstalkd删除该job。

如果没有执行delete命令,beanstalkd将在一个TTR周期(默认120s)后重新将该job加入队列;

 put            reserve               delete
  -----> [READY] ---------> [RESERVED] --------> *poof*

下面是一个使用beanstalkc(python客户端)操作beanstalkd的例子:

#!/usr/bin/env python
import beanstalkc

beanstalk=beanstalkc.Connection(host="127.0.0.1",port=11300)

# pruducer
beanstalk.put(‘hello‘)
beanstalk.put(‘world‘)

# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body
job1.delete()

job2=beanstalk.reserve(timeout=1)
print "job2: " + job2.body
job2.delete()

job3=beanstalk.reserve(timeout=1)       # Error,队列中已经没有job了
print "job3: " + job3.body
job3.delete()

tube的管理

beanstalkd通过tube维护多个队列,每个tube都是一个独立的queue,可以使用use命令切换tube,如果切换的tube不存在,会自动创建一个:

print beanstalk.tubes()     # default
print beanstalk.using()     # default 

beanstalk.use(‘queue1‘)
print beanstalk.using()     # queue1

beanstalk.use(‘queue2‘)     # queue2
print beanstalk.using()

print beanstalk.tubes()     # default, queue2

如上面的例子,在最后的tubes()命令打印所有tube的时候,并没有看到queue1,这是因为没有任何client 在using或者watching的tube会自动消失。

可以使用watch命令让client同时处理多个tube,而不用担心tube会被销毁:

print beanstalk.tubes()     # default
print beanstalk.using()     # default

beanstalk.use(‘queue1‘)
beanstalk.watch(‘queue1‘)
print beanstalk.using()     # queue1

beanstalk.use(‘queue2‘)     # queue2
print beanstalk.using()

print beanstalk.tubes()     # default, queue1, queue2
print beanstalk.watching()  # default, queue1

watch的tube如果不存在,会被自动创建,可以用ignore命令取消关注tube:

beanstalk.watch(‘queue3‘)
print beanstalk.watching()  # default, queue3
beanstalk.ignore(‘queue3‘)
print beanstalk.watching()  # default

注意,watch和use是两个独立的动作,use一个tube不代表watching它了,反之watch一个tube也不代表using它;

beanstalkc命令

如下,是一个job更完整的状态变迁和生命周期:

put with delay               release with delay
  ----------------> [DELAYED] <------------.
                        |                   |
                        | (time passes)     |
                        |                   |
   put                  v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                       ^  ^                |  |
                       |   \  release      |  |
                       |    `-------------‘   |
                       |                      |
                       | kick                 |
                       |                      |
                       |       bury           |
                    [BURIED] <---------------‘
                       |
                       |  delete
                        `--------> *poof*

一些例子:

put的时候加上delay参数,可以延迟发布job:

# pruducer
beanstalk.put(‘hello‘, delay=10)
beanstalk.put(‘world‘)

# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body  # World
job1.delete()

job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # Error,

put命令也支持优先级参数:

# pruducer
beanstalk.put(‘hello‘, priority=10)
beanstalk.put(‘world‘, priority=9)

# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body  # world
job1.delete()

job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # hello
job2.delete()

release命令可以释放job回队列:

# pruducer
beanstalk.put(‘hello‘)
beanstalk.put(‘world‘)

# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body  # hello
job1.release()

job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # hello

bury命令将job放到一个特殊的FIFO队列中,之后不能被reserve命令获取,但可以用kick命令扔回工作队列中,之后就能被消费了:

# pruducer
beanstalk.put(‘hello‘)
beanstalk.put(‘world‘)

# consumer
job1=beanstalk.reserve(timeout=0)
job1.bury()
print job1.stats()[‘state‘] # buried

job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # world
job2.delete()

beanstalk.kick()

job3=beanstalk.reserve(timeout=0)
print "job3: " + job3.body  # hello
job3.delete()

peek命令允许查看一个job,但不会reserve它;

# pruducer
beanstalk.put(‘hello‘)
beanstalk.put(‘world‘)

#print(beanstalk.stats())  

# consumer
job1=beanstalk.reserve(timeout=0)
job1_id=job1.stats()[‘id‘]
print job1_id
job1_r=beanstalk.peek(job1_id)
print "job1 " + job1_r.body  # hello

job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # hello
job2.delete()

job3=beanstalk.reserve(timeout=0)
print "job3: " + job3.body  # world
job3.delete()
时间: 2024-10-08 21:48:36

一种消息和任务队列——beanstalkd的相关文章

rabbitmq五种消息模型整理

目录 0. 配置项目 1. 基本消息模型 1.1 生产者发送消息 1.2 消费者获取消息(自动ACK) 1.3 消息确认机制(ACK) 1.4 消费者获取消息(手动ACK) 1.5 自动ACK存在的问题 1.6 演示手动ACK 2. work消息模型 2.1 生产者 2.2 消费者1 2.3 消费者2 2.4 能者多劳 3. 订阅模型分类 4. 订阅模型-Fanout 4.1 生产者 4.2 消费者1 4.3 消费者2 4.4 测试 5. 订阅模型-Direct 5.1 生产者 5.2 消费者1

MFC 三种消息

在MFC应用程序中传输的消息有三种类型:窗口消息.命令消息和控件通知. (1)窗口消息:WM_XXX 窗口消息(Window Message)一般与窗口的内部运作有关,如:创建窗口.绘制窗口和销毁窗口等.通常,消息是从系统发送到窗口,或从窗口发送到窗口. (2)命令消息:WM_COMMAND 命令消息一般与处理用户请求相关,当用户单击一个菜单项或工具栏时,命令消息产生,并被发送到能处理该请求的类对象(如:装载文件.编辑文本和保存选项等). (3)控件通知:有多种格式       通常,控件通知在

Openfire:XMPP的几种消息类型

XMPP 有以下几种消息类型: l   Normal l   Chat l   Groupchat l   Headline l   Error 根据官方文档(http://www.igniterealtime.org/builds/openfire/docs/3.3.2/documentation/javadoc/org/xmpp/packet/Message.Type.html)的解释,它们的用途分别如下: l   Normal – 用于类mail的接口 l   Chat – 用于典型的行到

JMS两种消息模型

前段时间学习EJB,接触到了JMS(Java消息服务),JMS支持两种消息模型:Point-to-Point(P2P)和Publish/Subscribe(Pub/Sub),即点对点和发布订阅模型. 个人觉得这两个模型挺容易理解的,因为生活中的例子还挺多的. 1,  P2P模型 有以下概念:消息队列(Queue).发送者(Sender).接收者(Receiver).每个消息都被发送到一个特定的队列,接收者从队列获取消息.队列保留着消息,直到它们被消费或超时. (1) 每个消息只有一个消费者(Co

Python实现RabbitMQ中6种消息模型

RabbitMQ与Redis对比 ? RabbitMQ是一种比较流行的消息中间件,之前我一直使用redis作为消息中间件,但是生产环境比较推荐RabbitMQ来替代Redis,所以我去查询了一些RabbitMQ的资料.相比于Redis,RabbitMQ优点很多,比如: 具有消息消费确认机制 队列,消息,都可以选择是否持久化,粒度更小.更灵活. 可以实现负载均衡 RabbitMQ应用场景 异步处理:比如用户注册时的确认邮件.短信等交由rabbitMQ进行异步处理 应用解耦:比如收发消息双方可以使用

Spring整合JMS(二)——三种消息监听器

一.消息监听器MessageListener 在Spring整合JMS的应用中我们在定义消息监听器的时候一共可以定义三种类型的消息监听器,分别是MessageListener.SessionAwareMessageListener和MessageListenerAdapter.下面就分别来介绍一下这几种类型的区别. 1).MessageListener MessageListener是最原始的消息监听器,它是JMS规范中定义的一个接口.其中定义了一个用于处理接收到的消息的onMessage方法,

关于消息总线使用哪种消息队列中间件的调查

几种MQ产品说明: ZeroMQ :  扩展性好,开发比较灵活,采用C语言实现,实际上他只是一个socket库的重新封装,如果我们做为消息队列使用,需要开发大量的代码 RabbitMQ :结合erlang语言本身的并发优势,性能较好,但是不利于做二次开发和维护 ActiveMQ: 历史悠久的开源项目,已经在很多产品中得到应用,实现了JMS1.1规范,可以和spring-jms轻松融合,实现了多种协议,不够轻巧(源代码比RocketMQ多).,支持持久化到数据库,对队列数较多的情况支持不好,不过我

消息队列_Beanstalkd-0001.Beanstalkd之轻量级分布式内存队列部署?

简单介绍: 说明: Beantalkd是一个高性能,轻量级的分布式消息队列,最初设计目的是想通过后台异步执行耗时任务降低WEB应用页面访问延迟,支持过1000万用户的应用,被豆瓣内部广泛使用. 几大特性: 1. 支持持久化,默认使用内存,但可启动时-b指定持久化目录,将任务写入Binlog,以相同参数启动会自动恢复Binlog中内容 2. 支持优先级0~2^32,任务优先级越小表示优先级越高,默认优先级为1024 3. 支持超时重发,预设过期时间或TTR时间内如果没有发送delete/relea

Android 几种消息推送方案总结

首先看一张国内Top500 Android应用中它们用到的第三方推送以及所占数量: 现在总结下Android平台下几种推送方案的基本情况以及优缺点: 一.使用GCM(Google Cloude Messaging) Android自带的推送GCM可以帮助开发人员给他们的Android应用程序发送数据.它是一个轻量级的消息,告诉Android应用程序有新的数据要从服务器获取,或者它可能是一个消息,其中包含了4KB的payload data(像即时通讯这类应用程序可以直接使用该payload消息).