nsq之生产者和消费者

参考资料:

1. 深入NSQ之旅:http://www.oschina.net/translate/day-22-a-journey-into-nsq

2. nsq源码:https://github.com/bitly/nsq/

3. nsq源码解读:http://www.baiyuxiong.com/?p=873

  当看完以上几篇文章之后,对nsq有一定的了解了,很想自己动手实现一个消息队列,本想参考一下其他人基于nsq所实现的消息队列,但是一番搜索之后,竟没找到有直接通过代码来说话的,都是一些理论性的文章转来转去。于是静下心来阅读nsq资料和源码。

  基于该网站(http://www.baiyuxiong.com/?p=873)进行安装和简单演示nsq的用法,但是如何用golang语言如何模拟生产者大量产生消息,产生消息队列,让消费者从消息队列里取出消息进行消费呢?

  生产者的代码:

 1 package main
 2
 3 import (
 4     "flag"
 5     "fmt"
 6     "github.com/bitly/go-nsq"
 7     "log"
 8 )
 9
10 func main() {
11     var nsqAddr = flag.String("nsqd", "127.0.0.1:4150", "nsqd tcp address")
12     flag.Parse()
13     config := nsq.NewConfig()
14
15     producer, err := nsq.NewProducer(*nsqAddr, config)
16     if err != nil {
17         log.Fatal(err)
18         return
19     }
20
21     // 生产者生产100万条消息
22     for i := 0; i < 1000000; i++ {
23         // 最终需要确认消费者读取出来的汉字是否为乱码
24         body := fmt.Sprintf(`{"name":%s, "tid":%d}`, "贺-tom", i)
25         // 发布消息,topic=tom;body={"name":贺-tom, "tid":1~1000000}
26         producer.Publish("tom", []byte(body))
27     }
28 }

  运行生产者producer.go,通过nsqadmin看看生成后的消息队列效果:

  消费者代码如下:

 1 package main
 2
 3 import (
 4     "fmt"
 5     "github.com/bitly/go-nsq"
 6     "log"
 7 )
 8
 9 func main() {
10     consumer, err := nsq.NewConsumer("tom", "consumer", nsq.NewConfig())
11     if err != nil {
12         log.Fatal(err)
13         return
14     }
15
16     consumer.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
17         // 从消息队列里获取消息.
18         fmt.Println("Consumer: ", string(msg.Body))
19         return nil
20     }))
21     err = consumer.ConnectToNSQD("127.0.0.1:4150")
22     if err != nil {
23         log.Fatal(err)
24         return
25     }
26     <-make(chan bool)
27 }

  运行消费者之后:

  试想,如果把100万个消息写入数据库,这会是多么糟糕的事情,要消耗多少资源,运行速度会有多慢。看来消息队列是处理大量数据的不错选择。

时间: 2024-10-10 00:02:29

nsq之生产者和消费者的相关文章

多线程操作实例——生产者与消费者

面对多线程学习生产者与消费者是最基本的实例 对于java后端开发的人员必须要掌握,还有考研考试计算机操作系统的同鞋. 下面是三个实例对于生产者与消费者的的例子,层层递进,逐步解决问题. 问题:生产者——设置信息名字name,和内容content 消费者——负责取出设置的信息. 一.基本实现 由于线程的不确定性可能出现以下问题: (1)消费者取出的信息不匹配,即不是由同一个生产者设置的信息 (2)生产者生产了多个信息,消费者才开始取出信息,或消费者取出的重复的信息. 上面的问题下面会逐一解决,下面

同步函数 生产者和消费者模式 加强版(多人生产和多人消费)

曾经搞了半天, 生产者和消费者模式  加强版(多人生产 多人消费 ).. 以前的代码格式就不再吐槽了(以后努力改进) //输出结果是一个无限循环 import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 多个生产者&多个消费者模式 * 多个生产者不断生产,多个消费者不停的消费

Parallel Programming-实现并行操作的流水线(生产者、消费者)

本文介绍如何使用C#实现并行执行的流水线(生产者消费者): 流水线示意图 实现并行流水线 一.流水线示意图 上图演示了流水线,action1接收input,然后产生结果保存在buffer1中,action2读取buffer1中由action1产生的数据,以此类推指导action4完成产生Output. 以上也是典型的生产者消费者模式. 上面的模式如果使用普通常规的串行执行是很简单的,按部就班按照流程图一步一步执行即可.如果为了提高效率,想使用并行执行,也就是说生产者和消费者同时并行执行,该怎么办

java多线程(同步与死锁问题,生产者与消费者问题)

首先我们来看同步与死锁问题: 所谓死锁,就是A拥有banana,B拥有apple. A对B说:你把apple给我,我就把banana给你. B对A说:你把banana给我,我就把apple给你. 但是A和B都在等待对方的答复,那么这样最终的结果就是A得不到apple,B也得不到banana.这种死循环就是死锁. 于是我们可以模拟上面的描述,写出以下代码: 类A代表A这个人, public class A { public void say(){ System.out.println("A sai

生产者与消费者问题

***********************************************声明****************************************************** 原创作品,出自 "晓风残月xj" 博客,欢迎转载,转载时请务必注明出处(http://blog.csdn.net/xiaofengcanyuexj). 由于各种原因,可能存在诸多不足,欢迎斧正! *******************************************

使用JUC并发工具包的Lock和Condition,实现生产者和消费者问题中的有界缓存

JDK5.0之前,用java实现生产者和消费者的唯一方式就是使用synchronized内置锁和wait/notify条件通知机制.JDK5.0之后提供了显示锁Lock和条件队列Condition,与内置锁和内置条件队列相对应,但是显示的锁和条件队列,功能更强大,更灵活.此外JDK5.0之后还提供了大量很有用的并发工具类,如BlockingQueue等,基于这些数据结构,能够方便.快速.高效的构建自己应用需要的效果.这里我们简单使用下显示锁和条件队列,来模拟有界缓存的实现,功能类似于JDK内置的

Java中的生产者、消费者问题

Java中的生产者.消费者问题描述: 生产者-消费者(producer-consumer)问题, 也称作有界缓冲区(bounded-buffer)问题, 两个进程共享一个公共的固定大小的缓冲区(仓库). 其中一个是生产者, 用于将产品放入仓库: 另外一个是消费者, 用于从仓库中取出产品消费. 问题出现在当仓库已经满了, 而此时生产者还想向其中放入一个新的产品的情形, 其解决方法是让生产者此时进行等待, 等待消费者从仓库中取走了一个或者多个产品后再去唤醒它. 同样地, 当仓库已经空了, 而消费者还

java 22 - 16 多线程之生产者和消费者的问题

生产者和消费者问题的描述图 通过上图,我们可以发现: 生产者和消费者使用的都是同一个资源(肉包子) 所以,当使用线程的时候,这两类的锁也是同一把锁(为了避免出现线程安全问题) 例子:学生信息的录入和获取 * 资源类:Student * 设置学生数据:SetThread(生产者) * 获取学生数据:GetThread(消费者) * 测试类:StudentDemo * 资源类:Student 1 public class Student { 2 3 String name; 4 int age; 5

实验四、生产者和消费者

一.    实验目的 1.掌握临界区的概念及临界区的设计原则: 2.掌握信号量的概念.PV操作的含义以及应用PV操作实现进程的同步与互斥: 3.分析进程争用资源的现象,学习解决进程互斥的方法. 二.    实验内容及要求 分析进程的同步与互斥现象,编程实现经典的进程同步问题——生产者消费者问题的模拟 生产者--消费者问题表述: 有一环形缓冲池,包含n个缓冲区(0~n-1). 有两类进程:一组生产者进程和一组消费者进程,生产者进程向空的缓冲区中放产品,消费者进程从满的缓冲区中取走产品,所有进程必须