1 #include<stdio.h> 2 #include<string.h> 3 #include<pthread.h> 4 #include<stdlib.h> 5 #include<unistd.h> 6 #include<queue> 7 using namespace std; 8 #define LEN 128 9 typedef struct task_que 10 { 11 queue<char*> string_queue; 12 int m_flag ; //结束标志 13 int m_capacity; 14 pthread_mutex_t m_lock; 15 pthread_cond_t m_pro , m_con; 16 }QUE , *pQUE; 17 void* thd_func(void* arg); 18 void put(pQUE pq, char* src); 19 void get(pQUE pq, char* dest); 20 int main(int argc, char *argv[]) 21 { 22 QUE aque; 23 aque.m_flag = 0; 24 int nthds = atoi( argv[1] ); //线程的个数 25 aque.m_capacity = atoi( argv[2] );//字符串队列的大小 26 pthread_mutex_init( &aque.m_lock , NULL ); //锁 27 pthread_cond_init( &aque.m_pro ,NULL ); //生产者条件变量 28 pthread_cond_init( &aque.m_con , NULL ); //消费者条件变量 29 //开辟线程空间 30 pthread_t *thd_arr = ( pthread_t*)calloc(nthds, sizeof(pthread_t)); 31 int* ntasks = (int*)calloc(nthds, sizeof(int));//用来记录 线程工作了几次 32 //创建线程 33 int index; 34 for(index = 0; index < nthds; index ++) 35 { 36 pthread_create( thd_arr + index, NULL, thd_func,(void*)&aque ); 37 } 38 39 40 //输入字符串到队列中 41 char buf[LEN] ; 42 while( memset(buf, 0, LEN), fgets(buf, LEN, stdin) != NULL) 43 { 44 put(&aque, buf); 45 } 46 47 48 49 //发出结束字符串 50 strcpy(buf, "over"); 51 put(&aque, buf); 52 53 for(index = 0; index < nthds; index ++) 54 { 55 pthread_join(thd_arr[index], (void**)(ntasks + index )); 56 } 57 for(index = 0; index < nthds; index ++) 58 { 59 printf("%d ", ntasks[index]); 60 } 61 printf("\n"); 62 63 pthread_mutex_destroy(&aque.m_lock); 64 pthread_cond_destroy(&aque.m_pro); 65 pthread_cond_destroy(&aque.m_con); 66 return 0 ; 67 } 68 void put(pQUE pq, char* src) //把字符串写到队列中 69 { 70 pthread_mutex_lock(&pq ->m_lock); //加锁 71 while(pq ->string_queue.size() == pq ->m_capacity) //队列满则阻塞 72 { 73 pthread_cond_wait(&pq -> m_pro, &pq ->m_lock); 74 75 } 76 //插入队列 77 char* tem = ( char*)calloc( LEN , sizeof( char )); 78 strcpy(tem,src); 79 pq->string_queue.push(tem); 80 pthread_mutex_unlock(&pq -> m_lock); //解锁 81 pthread_cond_broadcast(&pq ->m_con); //唤醒所有消费者线程 82 83 84 } 85 86 void get(pQUE pq, char* dest) 87 { 88 pthread_mutex_lock(&pq ->m_lock); //加锁 89 while(pq ->m_flag == 0 && pq ->string_queue.empty() ) //队列空 并且未结束 则阻塞 90 { 91 pthread_cond_wait(&pq ->m_con, &pq ->m_lock); 92 } 93 if(pq ->m_flag == 1) //判断结束标志 94 { 95 pthread_mutex_unlock(&pq ->m_lock); //解锁 96 return ; 97 } 98 //出队 99 strcpy(dest, pq ->string_queue.front()); 100 pq->string_queue.pop(); 101 102 pthread_mutex_unlock(&pq ->m_lock); 103 pthread_cond_signal(&pq ->m_pro); 104 105 } 106 107 void* thd_func(void* arg) 108 { 109 pQUE pq = (pQUE)arg ; 110 char buf[LEN] ; 111 int ncnt = 0 ; 112 while(1) 113 { 114 memset(buf, 0, LEN) ; 115 get(pq, buf); 116 if(pq ->m_flag == 1) //判断结束标志 117 { 118 printf("%u exit!\n", pthread_self()); 119 pthread_exit((void*)ncnt); //退出 120 } 121 ncnt ++ ; 122 printf("%u: %s\n", pthread_self(), buf); //打印字符串 123 if(strcmp("over", buf) == 0) //判断结束字符串 124 { 125 pq ->m_flag = 1 ; //把结束符号置为 1 126 pthread_cond_broadcast(&pq ->m_con); //唤醒所有线程 127 pthread_exit((void*)ncnt); //退出 128 } 129 if(ncnt & 1 == 1) sleep(1); //简单的负载平衡 130 } 131 }
时间: 2024-10-10 22:37:00