Beanstalkd的使用(Golang)

最近需要引入一种新的消息队列,这个队列最好有专业、简单、消息不丢失等特性,但又不会引入过多的复杂性,

特别是在目前单枪匹马的情况下。然后发现Beanstalkd看起来是我所需要的.

Beanstalkd 支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried),

同时支持binlog保证消息在挂掉后不会丢.最后速度还可以。

看了下源码,c语言代码量小而清晰.作者从07年维护到14年,Star也很高,质量应当是有保障的。

队列作者提供了Go客户端,从作者项目列表中可以看到已经写了不少Go相关的东西,看来Go很受后台开发的欢迎。

Beanstalkd 主页在这:  http://kr.github.io/beanstalkd

写了个调用例子如下.

/*
  xcl (2015-8-15)
  多TubeName 多消费者
*/

package main

import (
	"fmt"
	"github.com/kr/beanstalk"
	"runtime"
	"strings"
	"time"
)

var (
	TubeName1 string = "channel1"
	TubeName2 string = "channel2"
)

func Producer(fname, tubeName string) {
	if fname == "" || tubeName == "" {
		return
	}

	c, err := beanstalk.Dial("tcp", "127.0.0.1:11300")
	if err != nil {
		panic(err)
	}
	defer c.Close()

	c.Tube.Name = tubeName
	c.TubeSet.Name[tubeName] = true
	fmt.Println(fname, " [Producer] tubeName:", tubeName, " c.Tube.Name:", c.Tube.Name)

	for i := 0; i < 5; i++ {
		msg := fmt.Sprintf("for %s %d", tubeName, i)
		c.Put([]byte(msg), 30, 0, 120*time.Second)
		fmt.Println(fname, " [Producer] beanstalk put body:", msg)
		//time.Sleep(1 * time.Second)
	}

	c.Close()
	fmt.Println("Producer() end.")
}

func Consumer(fname, tubeName string) {
	if fname == "" || tubeName == "" {
		return
	}

	c, err := beanstalk.Dial("tcp", "127.0.0.1:11300")
	if err != nil {
		panic(err)
	}
	defer c.Close()

	c.Tube.Name = tubeName
	c.TubeSet.Name[tubeName] = true

	fmt.Println(fname, " [Consumer] tubeName:", tubeName, " c.Tube.Name:", c.Tube.Name)

	substr := "timeout"
	for {
		fmt.Println(fname, " [Consumer]///////////////////////// ")
		//从队列中取出
		id, body, err := c.Reserve(1 * time.Second)
		if err != nil {
			if !strings.Contains(err.Error(), substr) {
				fmt.Println(fname, " [Consumer] [", c.Tube.Name, "] err:", err, " id:", id)
			}
			continue
		}
		fmt.Println(fname, " [Consumer] [", c.Tube.Name, "] job:", id, " body:", string(body))

		//从队列中清掉
		err = c.Delete(id)
		if err != nil {
			fmt.Println(fname, " [Consumer] [", c.Tube.Name, "] Delete err:", err, " id:", id)
		} else {
			fmt.Println(fname, " [Consumer] [", c.Tube.Name, "] Successfully deleted. id:", id)
		}
		fmt.Println(fname, " [Consumer]/////////////////////////")
		//time.Sleep(1 * time.Second)
	}
	fmt.Println("Consumer() end. ")
}

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())

	go Producer("PA", TubeName1)
	go Producer("PB", TubeName2)

	go Consumer("CA", TubeName1)
	go Consumer("CB", TubeName2)

	time.Sleep(10 * time.Second)
}

/*
运行结果:

XCLdeiMac:src xcl$ clear
XCLdeiMac:src xcl$ go run testmq.go
CB  [Consumer] tubeName: channel2  c.Tube.Name: channel2
CA  [Consumer] tubeName: channel1  c.Tube.Name: channel1
CB  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
PB  [Producer] tubeName: channel2  c.Tube.Name: channel2
PA  [Producer] tubeName: channel1  c.Tube.Name: channel1
PB  [Producer] beanstalk put body: for channel2 0
PA  [Producer] beanstalk put body: for channel1 0
CA  [Consumer] [ channel1 ] job: 47027  body: for channel1 0
CB  [Consumer] [ channel2 ] job: 47026  body: for channel2 0
PB  [Producer] beanstalk put body: for channel2 1
PA  [Producer] beanstalk put body: for channel1 1
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47026
CB  [Consumer]/////////////////////////
CB  [Consumer]/////////////////////////
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47027
CA  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CA  [Consumer] [ channel1 ] job: 47028  body: for channel1 1
PA  [Producer] beanstalk put body: for channel1 2
CB  [Consumer] [ channel2 ] job: 47029  body: for channel2 1
PB  [Producer] beanstalk put body: for channel2 2
PA  [Producer] beanstalk put body: for channel1 3
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47028
CA  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47029
PB  [Producer] beanstalk put body: for channel2 3
CB  [Consumer]/////////////////////////
CB  [Consumer]/////////////////////////
PB  [Producer] beanstalk put body: for channel2 4
CB  [Consumer] [ channel2 ] job: 47030  body: for channel2 2
CA  [Consumer] [ channel1 ] job: 47031  body: for channel1 2
PA  [Producer] beanstalk put body: for channel1 4
Producer() end.
Producer() end.
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47031
CA  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47030
CB  [Consumer]/////////////////////////
CB  [Consumer]/////////////////////////
CB  [Consumer] [ channel2 ] job: 47033  body: for channel2 3
CA  [Consumer] [ channel1 ] job: 47032  body: for channel1 3
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47033
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47032
CB  [Consumer]/////////////////////////
CB  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CA  [Consumer]/////////////////////////
CA  [Consumer] [ channel1 ] job: 47034  body: for channel1 4
CB  [Consumer] [ channel2 ] job: 47035  body: for channel2 4
CB  [Consumer] [ channel2 ] Successfully deleted. id: 47035
CB  [Consumer]/////////////////////////
CA  [Consumer] [ channel1 ] Successfully deleted. id: 47034
CB  [Consumer]/////////////////////////
XCLdeiMac:src xcl$
*/

可用beanstool来查看队列状态

也可以参考我写下面两段,来查。

ar, er := c.ListTubes()
if er != nil {
	fmt.Println("[Example]  er:", er)
} else {
	for i, v := range ar {
		fmt.Println("[Example] ListTubes  i:", i, " v:", v)
		c.Tube.Name = v
		id, body, err := c.Reserve(5 * time.Second)
		if err != nil {
			fmt.Println("[Example] err:", err, " name:", c.Tube.Name)
			continue
		} else {
			fmt.Println("[Example] job:", id)
			fmt.Println("[Example] body:", string(body))
		}

	}
}

func tubeStatus(c *beanstalk.Conn) {
	fmt.Println("[tubeStatus]/////////////////////////")
	fmt.Println("Tube(", c.Tube.Name, ") Stats:")
	m, er := c.Tube.Stats()
	if er != nil {
		fmt.Println("[tubeStatus] err:", er)
	} else {
		for k, v := range m {
			fmt.Println(k, " : ", v)
		}
	}
	fmt.Println("[tubeStatus]/////////////////////////")
}

从测试看,Beanstalkd 足以满足我现在的需求了.

BLOG: http://blog.csdn.net/xcl168

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-25 23:13:25

Beanstalkd的使用(Golang)的相关文章

Go语言(golang)开源项目大全

转http://www.open-open.com/lib/view/open1396063913278.html内容目录Astronomy构建工具缓存云计算命令行选项解析器命令行工具压缩配置文件解析器控制台用户界面加密数据处理数据结构数据库和存储开发工具分布式/网格计算文档编辑器Encodings and Character SetsGamesGISGo ImplementationsGraphics and AudioGUIs and Widget ToolkitsHardwareLangu

[转]Go语言(golang)开源项目大全

内容目录 Astronomy 构建工具 缓存 云计算 命令行选项解析器 命令行工具 压缩 配置文件解析器 控制台用户界面 加密 数据处理 数据结构 数据库和存储 开发工具 分布式/网格计算 文档 编辑器 Encodings and Character Sets Games GIS Go Implementations Graphics and Audio GUIs and Widget Toolkits Hardware Language and Linguistics 日志 机器学习 Math

golang []byte转string

golang中,字符切片[]byte转换成string最简单的方式是 package main import ( "fmt" _ "unsafe" ) func main() { bytes := []byte("I am byte array !") str := string(bytes) bytes[0] = 'i'//注意这一行,bytes在这里修改了数据,但是str打印出来的依然没变化, fmt.Println(str) } 打印信息:

golang实现Ringbuf

Ring buffer算法优点:高内存使用率,在缓冲buffer内存模型中,不太容易发生内存越界.悬空指针等 bug ,出了问题也容易在内存级别分析调试.做出来的系统容易保持健壮. package main import ( "bytes" "fmt" ) type Ringbuf struct { buf         []byte start, size int } func New(size int) *Ringbuf { return &Ringb

Golang Hash MD4

//Go标准包中只有MD5的实现 //还好,github上有MD4实现. package main import (     "golang.org/x/crypto/md4"     "encoding/hex"     "fmt" ) func get_md4(buf []byte) ([] byte) { ctx := md4.New() ctx.Write(buf) return ctx.Sum(nil) } func main() {

Java程序员的Golang入门指南(上)

Java程序员的Golang入门指南 1.序言 Golang作为一门出身名门望族的编程语言新星,像豆瓣的Redis平台Codis.类Evernote的云笔记leanote等. 1.1 为什么要学习 如果有人说X语言比Y语言好,两方的支持者经常会激烈地争吵.如果你是某种语言老手,你就是那门语言的"传道者",下意识地会保护它.无论承认与否,你都已被困在一个隧道里,你看到的完全是局限的.<肖申克的救赎>对此有很好的注脚: [Red] These walls are funny.

golang学习笔记:golang 语法篇(二)

在语法篇(一)中学习了go中基本的数据类型.变量.常量等组成语言的基本要素,在这一节中将会学习如何将这些元素组织起来,最终写成可以执行的代码. 在这一部分包括: go中的流程控制语句: go中函数的用法: go特殊的错误处理方式: Golang中的流程控制语句 在具体编程的时候免不了需要使用一些特殊的语句实现某些功能,比如使用循环语句来进行迭代,使用选择语句控制程序的执行方式等.这些语句在任何一门程序设计语言 中都会有支持,golang中除了支持常用的循环,条件选择语句以外,还支持跳转语句,下面

Golang关键字—— if/else

Golang中,if/else 关键字用于条件判断,如果满足条件就做某事,否则做另一件事: if age >= 18 { fmt.Println("成年人") } else { fmt.Println("未成年") } 多重判断: if score >= 90 { fmt.Println("优秀") } else if score >= 70 { fmt.Println("良好") } else if sco

golang控制channel的出入口

golang控制channel的出入口 我们常常使用channel来在多个goroutine之间做数据通讯,但是chan作为函数的入参我们应该怎么写呢?也许有人觉得这个问题比较傻,不过这个还真的是我今天才知道的. 首先我们看看下面的代码: func main() { c := make(chan int) go in(c) go out(c) time.Sleep(time.Second) } func in(c chan int) { for i := 0; i < 10; i++ { c <