1 producer — n consumers 模型 实现

  1 #include<stdio.h>
  2 #include<string.h>
  3 #include<pthread.h>
  4 #include<stdlib.h>
  5 #include<unistd.h>
  6 #include<queue>
  7 using namespace std;
  8 #define LEN 128
  9 typedef struct task_que
 10 {
 11     queue<char*> string_queue;
 12     int m_flag ;        //结束标志
 13     int m_capacity;
 14     pthread_mutex_t m_lock;
 15     pthread_cond_t m_pro , m_con;
 16 }QUE , *pQUE;
 17 void* thd_func(void* arg);
 18 void put(pQUE pq, char* src);
 19 void get(pQUE pq, char* dest);
 20 int main(int argc, char *argv[])
 21 {
 22     QUE aque;
 23     aque.m_flag = 0;
 24     int nthds = atoi( argv[1] );   //线程的个数
 25     aque.m_capacity = atoi( argv[2] );//字符串队列的大小
 26     pthread_mutex_init( &aque.m_lock , NULL ); //锁
 27     pthread_cond_init( &aque.m_pro ,NULL );   //生产者条件变量
 28     pthread_cond_init( &aque.m_con , NULL );  //消费者条件变量
 29     //开辟线程空间
 30     pthread_t *thd_arr = ( pthread_t*)calloc(nthds, sizeof(pthread_t));
 31     int* ntasks = (int*)calloc(nthds, sizeof(int));//用来记录 线程工作了几次
 32     //创建线程
 33     int index;
 34     for(index = 0; index < nthds; index ++)
 35     {
 36         pthread_create( thd_arr + index, NULL, thd_func,(void*)&aque );
 37     }
 38
 39
 40     //输入字符串到队列中
 41     char buf[LEN] ;
 42     while( memset(buf, 0, LEN), fgets(buf, LEN, stdin) != NULL)
 43     {
 44         put(&aque, buf);
 45     }
 46
 47
 48
 49     //发出结束字符串
 50     strcpy(buf, "over");
 51     put(&aque, buf);
 52
 53     for(index = 0; index < nthds; index ++)
 54     {
 55         pthread_join(thd_arr[index], (void**)(ntasks + index ));
 56     }
 57     for(index = 0; index < nthds; index ++)
 58     {
 59         printf("%d ", ntasks[index]);
 60     }
 61     printf("\n");
 62
 63     pthread_mutex_destroy(&aque.m_lock);
 64     pthread_cond_destroy(&aque.m_pro);
 65     pthread_cond_destroy(&aque.m_con);
 66     return 0 ;
 67 }
 68 void put(pQUE pq,  char* src)      //把字符串写到队列中
 69 {
 70     pthread_mutex_lock(&pq ->m_lock);   //加锁
 71     while(pq ->string_queue.size() == pq ->m_capacity)  //队列满则阻塞
 72     {
 73         pthread_cond_wait(&pq -> m_pro, &pq ->m_lock);
 74
 75     }
 76     //插入队列
 77     char* tem =  ( char*)calloc( LEN , sizeof( char ));
 78     strcpy(tem,src);
 79     pq->string_queue.push(tem);
 80     pthread_mutex_unlock(&pq -> m_lock);  //解锁
 81     pthread_cond_broadcast(&pq ->m_con);  //唤醒所有消费者线程
 82
 83
 84 }
 85
 86 void get(pQUE pq, char* dest)
 87 {
 88     pthread_mutex_lock(&pq ->m_lock); //加锁
 89     while(pq ->m_flag == 0 &&  pq ->string_queue.empty()  ) //队列空 并且未结束 则阻塞
 90     {
 91         pthread_cond_wait(&pq ->m_con, &pq ->m_lock);
 92     }
 93     if(pq ->m_flag == 1)                 //判断结束标志
 94     {
 95         pthread_mutex_unlock(&pq ->m_lock); //解锁
 96         return ;
 97     }
 98     //出队
 99     strcpy(dest, pq ->string_queue.front());
100     pq->string_queue.pop();
101
102     pthread_mutex_unlock(&pq ->m_lock);
103     pthread_cond_signal(&pq ->m_pro);
104
105 }
106
107 void* thd_func(void* arg)
108 {
109     pQUE pq = (pQUE)arg ;
110     char buf[LEN] ;
111     int ncnt = 0 ;
112     while(1)
113     {
114         memset(buf, 0, LEN) ;
115         get(pq, buf);
116         if(pq ->m_flag == 1)            //判断结束标志
117         {
118             printf("%u exit!\n", pthread_self());
119             pthread_exit((void*)ncnt);             //退出
120         }
121         ncnt ++ ;
122         printf("%u: %s\n", pthread_self(), buf);  //打印字符串
123         if(strcmp("over", buf) == 0)              //判断结束字符串
124         {
125             pq ->m_flag = 1 ;                    //把结束符号置为 1
126             pthread_cond_broadcast(&pq ->m_con); //唤醒所有线程
127             pthread_exit((void*)ncnt);           //退出
128         }
129         if(ncnt & 1 == 1) sleep(1);     //简单的负载平衡
130     }
131 }
时间: 2024-10-10 22:37:00

1 producer — n consumers 模型 实现的相关文章

golang for thread channel routine consumer and producer

package main import "time" func testThread(){ i:=3 go func(a int){ println(a) println("this is go func!") }(i) time.Sleep(1*time.Second) println("hello main thread!") } func testChannel(){ //read write channel ch:=make(chan i

Java多线程开发之~~~多条件Condition接口的使用

我们在多线程开发中,可能会出现这种情况.就是一个线程需要另外一个线程满足某某条件才能继续运行,或者需 要其他线程满足好几个条件才能运行,对于这样的多条件的多线程并发,我们如何控制好各个线程之间的关系,使他们 能很好的处理冲突不至于相互出现问题呢,下面我们来介绍一下Java提供的Condition这个接口,这个接口很好的实现了 这种需求. 对于这个问题最经典的例子就是生产者消费者模型,生产者当缓冲区满的时候不生产商品知道缓冲区有空余,消费 者当缓冲区为0 的时候不拿商品,直到生产者向缓冲区放入商品

二、Java之activeMQ的使用

本文基于上一篇:activeMQ服务安装,将展示如何使用activeMQ: 一.简介 上一篇我们了解了如何创建和启动一个activeMQ消息服务.那我们应当要如何去使用这个服务呢?其实也就是我们需要学习怎么去写客户端的代码. 1)activeMQ的客户端需要引入依赖的支持: <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId&g

【转】Python线程同步机制: Locks, RLocks, Semaphores, Conditions, Events和Queues

Python线程同步机制: Locks, RLocks, Semaphores, Conditions, Events和Queues | Comments 翻译自Laurent Luce的博客原文名称:Python threads synchronization: Locks, RLocks, Semaphores, Conditions, Events and Queues原文连接:http://www.laurentluce.com/posts/python-threads-synchron

kafka文档(转)

来自:http://blog.csdn.net/beitiandijun/article/details/40582541 来源:http://kafka.apache.org/documentation.html#configuration 3.     Configuration Kafka在配置文件中使用key-value方式进行属性配置.这些values可以通过文件或者编程方式提供. 3.1      Broker  Configs 基本配置如下: -broker.id -log.dir

Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines)

I wrote a blog post about how LinkedIn uses Apache Kafka as a central publish-subscribe log for integrating data between applications, stream processing, and Hadoop data ingestion. To actually make this work, though, this "universal log" has to

java多线程编码注意事项

Sole purpose of using concurrency is to produce scalable and faster program. But always remember, speed comes after correctness. Your Java program must follow its invariant in all conditions, which it would, if executed in sequential manner. If you a

软件设计杂谈

\ disclaimer: 本文所讲的设计,非UI/UE的设计,单单指软件代码/功能本身在技术上的设计.UI/UE的主题请出门右转找特赞(Tezign). 在如今这个Lean/Agile横扫一切的年代,设计似乎有了被边缘化的倾向,做事的周期如此之快,似乎已容不下人们更多的思考.MVP(Minimal Viable Produce)在很多团队里演化成一个形而上的图腾,于是工程师们找到了一个完美的借口:我先做个MVP,设计的事,以后再说. 如果纯属个人玩票,有个点子,hack out还说得过去:但要

python笔记11-多线程之Condition(条件变量)

前言 当小伙伴a在往火锅里面添加鱼丸,这个就是生产者行为:另外一个小伙伴b在吃掉鱼丸就是消费者行为.当火锅里面鱼丸达到一定数量加满后b才能吃,这就是一种条件判断了. 这就是本篇要讲的Condition(条件变量) Condition Condition(条件变量)通常与一个锁关联.需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例. 可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于