参考资料:
1. 深入NSQ之旅:http://www.oschina.net/translate/day-22-a-journey-into-nsq
2. nsq源码:https://github.com/bitly/nsq/
3. nsq源码解读:http://www.baiyuxiong.com/?p=873
当看完以上几篇文章之后,对nsq有一定的了解了,很想自己动手实现一个消息队列,本想参考一下其他人基于nsq所实现的消息队列,但是一番搜索之后,竟没找到有直接通过代码来说话的,都是一些理论性的文章转来转去。于是静下心来阅读nsq资料和源码。
基于该网站(http://www.baiyuxiong.com/?p=873)进行安装和简单演示nsq的用法,但是如何用golang语言如何模拟生产者大量产生消息,产生消息队列,让消费者从消息队列里取出消息进行消费呢?
生产者的代码:
1 package main 2 3 import ( 4 "flag" 5 "fmt" 6 "github.com/bitly/go-nsq" 7 "log" 8 ) 9 10 func main() { 11 var nsqAddr = flag.String("nsqd", "127.0.0.1:4150", "nsqd tcp address") 12 flag.Parse() 13 config := nsq.NewConfig() 14 15 producer, err := nsq.NewProducer(*nsqAddr, config) 16 if err != nil { 17 log.Fatal(err) 18 return 19 } 20 21 // 生产者生产100万条消息 22 for i := 0; i < 1000000; i++ { 23 // 最终需要确认消费者读取出来的汉字是否为乱码 24 body := fmt.Sprintf(`{"name":%s, "tid":%d}`, "贺-tom", i) 25 // 发布消息,topic=tom;body={"name":贺-tom, "tid":1~1000000} 26 producer.Publish("tom", []byte(body)) 27 } 28 }
运行生产者producer.go,通过nsqadmin看看生成后的消息队列效果:
消费者代码如下:
1 package main 2 3 import ( 4 "fmt" 5 "github.com/bitly/go-nsq" 6 "log" 7 ) 8 9 func main() { 10 consumer, err := nsq.NewConsumer("tom", "consumer", nsq.NewConfig()) 11 if err != nil { 12 log.Fatal(err) 13 return 14 } 15 16 consumer.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { 17 // 从消息队列里获取消息. 18 fmt.Println("Consumer: ", string(msg.Body)) 19 return nil 20 })) 21 err = consumer.ConnectToNSQD("127.0.0.1:4150") 22 if err != nil { 23 log.Fatal(err) 24 return 25 } 26 <-make(chan bool) 27 }
运行消费者之后:
试想,如果把100万个消息写入数据库,这会是多么糟糕的事情,要消耗多少资源,运行速度会有多慢。看来消息队列是处理大量数据的不错选择。
时间: 2024-10-10 00:02:29