这里,我们首先要定义几种数据结构:任务(TASK)、保存客户端读写管道描述符的链表结点(FD),以及任务队列(QUEUE)。
声明代码如下:
/*
 * head.h - shared declarations for the FIFO-based thread-pool server.
 *
 * Declares the client descriptor list (FD), the unit of work (TASK), the
 * bounded circular task queue (QUEUE) and the functions operating on them.
 */
#ifndef HEAD_H /* was _HEAD_H: underscore + uppercase names are reserved */
#define HEAD_H

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h> /* toupper(), used by excute_task() */
#include <sys/stat.h>
#include <sys/select.h>
#include <sys/types.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/time.h>
#include <signal.h>

#define MSG_LEN 1024  /* capacity in bytes of one task message */
#define TASK_CNT 1024 /* maximum number of tasks the queue can hold */

/* Synchronisation objects; declared here, defined once in a .c file. */
extern pthread_mutex_t lock; /* locked by add_task()/get_task() around queue access */
extern pthread_cond_t cond1; /* add_task() waits on this while the queue is full */
extern pthread_cond_t cond2; /* get_task() waits on this while the queue is empty */

/* One node of a singly linked list of descriptor pairs. */
typedef struct tag_fd
{
    int s_rfd;           /* descriptor used as the lookup key by fd_find()/fd_del() */
    int s_wfd;           /* descriptor returned by fd_find() for a matching s_rfd */
    struct tag_fd *next; /* next node, NULL at the end of the list */
} FD, *pFD;

/* One unit of work: a message and the descriptor the result is written to. */
typedef struct tag_task
{
    char s_msg[MSG_LEN];
    int s_fd;
} TASK, *pTASK;

/*
 * Circular queue of tasks. The array has TASK_CNT + 1 slots so that
 * front == tail can mean "empty" while still holding TASK_CNT tasks.
 */
typedef struct tag_que
{
    TASK arr[TASK_CNT + 1];
    int front; /* index of the next task to remove */
    int tail;  /* index of the next free slot */
} QUEUE, *pQUEUE;

/* Descriptor list operations. */
void fd_insert(pFD *phead, int rfd, int wfd);
void fd_init(pFD *phead);
int fd_find(pFD phead, int rfd);
void fd_del(pFD *phead, int rfd);

/* Task queue operations. */
void add_task(pQUEUE pq, pTASK pt);
void get_task(pQUEUE pq, pTASK pt);
void excute_task(pTASK pt);

#endif /* HEAD_H */
我们需要记录每个客户端对应的读、写管道描述符,以便在客户端连接和退出时进行管理,所以应把这些描述符对插入到一个链表中。
实现代码如下:
1 #include "head.h" 2 void fd_init(pFD *phead) 3 { 4 *phead= NULL; 5 } 6 void fd_insert(pFD *phead,int rfd,int wfd) 7 { 8 pFD pnew = (pFD )calloc(1,sizeof(FD)); 9 pnew->s_rfd=rfd; 10 pnew->s_wfd=wfd; 11 pnew->next = *phead; 12 *phead = pnew; 13 } 14 int fd_find(pFD phead,int rfd) 15 { 16 while(phead) 17 { 18 if(phead->s_rfd==rfd) 19 break; 20 else 21 phead = phead->next; 22 } 23 if(phead == NULL) 24 return -1; 25 else 26 return phead->s_wfd; 27 } 28 29 void fd_del(pFD *phead,int rfd) 30 { 31 pFD pcur,ppre; 32 pcur=*phead; 33 ppre=NULL; 34 while(pcur) 35 { 36 if(pcur->s_rfd == rfd) 37 break; 38 else 39 { 40 ppre=pcur; 41 pcur = pcur ->next; 42 } 43 } 44 if(ppre==NULL) 45 { 46 *phead=pcur->next; 47 free(pcur); 48 pcur=NULL; 49 } 50 else 51 { 52 ppre->next=pcur->next; 53 free(pcur); 54 pcur=NULL; 55 } 56 }
然后,我们还需要实现对任务的控制,例如任务的添加、获得、执行等。
实现代码如下:
1 #include "head.h" 2 static int que_empty(pQUEUE pq) 3 { 4 return pq->front == pq->tail; 5 } 6 static int que_full(pQUEUE pq) 7 { 8 return (pq->tail+1)%(TASK_CNT+1)==pq->front; 9 } 10 static int que_cnt(pQUEUE pq) 11 { 12 return (pq->tail - pq->front +TASK_CNT+1)%(TASK_CNT + 1); 13 } 14 void add_task(pQUEUE pq ,pTASK pt) 15 { 16 pthread_mutex_lock(&lock); 17 while(que_full(pq)) 18 pthread_cond_wait(&cond1,&lock); 19 pq->arr[pq->tail]=*pt; 20 pq->tail = (pq->tail+1)%(TASK_CNT+1); 21 if(que_cnt(pq)==1) 22 pthread_cond_broadcast(&cond2); 23 printf("添加了一个任务!!\n"); 24 pthread_mutex_unlock(&lock); 25 } 26 void get_task(pQUEUE pq ,pTASK pt) 27 { 28 pthread_mutex_lock(&lock); 29 while(que_empty(pq)) 30 pthread_cond_wait(&cond2,&lock); 31 *pt=pq->arr[pq->front]; 32 pq->front = (pq->front+1)%(TASK_CNT+1); 33 if(que_cnt(pq)== TASK_CNT -1) 34 pthread_cond_broadcast(&cond1); 35 printf("获得了一个任务!!\n"); 36 pthread_mutex_unlock(&lock); 37 } 38 39 40 void excute_task(pTASK pt) 41 { 42 char buf[1024]; 43 memset(buf,0,1024); 44 strcpy(buf,pt->s_msg); 45 int index; 46 for(index=0;index < strlen(buf);index++) 47 buf[index]=toupper(buf[index]); 48 buf[index]=‘\0‘; 49 write(pt -> s_fd,buf,strlen(buf)); 50 }
最后,我们只需在服务器端应用select循环查询是否有任务,再执行相应的操作。
服务器实现代码如下:
1 #include "head.h" 2 pthread_mutex_t lock; 3 pthread_cond_t cond1,cond2; 4 void* hand(void* arg) 5 { 6 pthread_detach(pthread_self()); 7 TASK task; 8 pQUEUE pq = (pQUEUE)arg; 9 while(1) 10 { 11 get_task(pq,&task); 12 excute_task(&task); 13 sleep(1); 14 } 15 } 16 int main(int argc,char *argv[]) 17 { 18 if(argc != 3) 19 { 20 perror("参数错误!!\n"); 21 exit(1); 22 } 23 signal(SIGINT,SIG_IGN); 24 signal(SIGPIPE,SIG_IGN); 25 signal(SIGQUIT,SIG_IGN); 26 QUEUE que; 27 int fd; 28 fd_set read_set,revc; 29 pFD list; 30 memset(&que,0,sizeof(QUEUE)); 31 fd_init(&list); 32 int cnt = atoi(argv[2]); 33 pthread_t *arr=(pthread_t *)calloc(cnt,sizeof(pthread_t)); 34 pthread_mutex_init(&lock,NULL); 35 pthread_cond_init(&cond1,NULL); 36 pthread_cond_init(&cond2,NULL); 37 int index=0; 38 while(cnt > 0) 39 { 40 pthread_create(arr+index,NULL,hand,(void*)&que); 41 cnt--; 42 index++; 43 } 44 fd = open(argv[1],O_RDONLY); 45 if(fd == -1) 46 { 47 perror("管道打开失败!!\n"); 48 exit(1); 49 } 50 struct timeval tm; 51 int ret; 52 FD_ZERO(&read_set); 53 FD_ZERO(&revc); 54 FD_SET(fd,&read_set); 55 while(1) 56 { 57 tm.tv_sec=0; 58 tm.tv_usec=1000; 59 revc = read_set; 60 ret=select(1024,&revc,NULL,NULL,&tm); 61 if(ret == 0) 62 continue; 63 else if(ret > 0) 64 { 65 if(FD_ISSET(fd,&revc)) 66 { 67 char buf[32]; 68 memset(buf,0,32); 69 if(read(fd,buf,32)==0) 70 continue; 71 else 72 { 73 char name[32]; 74 int r_fd,w_fd; 75 buf[strlen(buf)-1]=‘\0‘; 76 memset(name,0,32); 77 sprintf(name,"r.%s",buf); 78 w_fd=open(name,O_WRONLY); 79 memset(name,0,32); 80 sprintf(name,"w.%s",buf); 81 r_fd=open(name,O_RDONLY); 82 fd_insert(&list,r_fd,w_fd); 83 FD_SET(r_fd,&read_set); 84 } 85 } 86 } 87 pFD pcur=list; 88 while(pcur) 89 { 90 if(FD_ISSET(pcur->s_rfd,&revc)) 91 { 92 char buf[1024]; 93 memset(buf,0,1024); 94 if(read(pcur->s_rfd,buf,1024)==0) 95 { 96 FD_CLR(pcur->s_rfd,&read_set); 97 int i=pcur->s_rfd; 98 pcur=pcur->next; 99 fd_del(&list,i); 100 } 101 else 102 { 103 TASK tk; 104 memset(&tk,0,sizeof(TASK)); 105 
tk.s_fd=pcur->s_wfd; 106 strcpy(tk.s_msg,buf); 107 add_task(&que,&tk); 108 pcur=pcur->next; 109 } 110 } 111 else 112 pcur=pcur->next; 113 } 114 } 115 pthread_mutex_destory(&lock); 116 pthread_cond_destory(&cond1); 117 pthread_cond_destory(&cond2); 118 return 0; 119 }
客户端实现代码如下:
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>

/*
 * Usage: <program> <server-fifo>
 *
 * Creates the FIFOs "r.<pid>" and "w.<pid>", sends "<pid>\n" to the server
 * FIFO, then for every line read from stdin writes it to "w.<pid>", reads
 * one reply from "r.<pid>" and copies it to stdout. Both FIFOs are removed
 * on exit. Returns 0 when stdin is exhausted, 1 on any failure.
 */
int main(int argc, char *argv[])
{
    char rname[32], wname[32];
    char msg[1024];
    int fd_server = -1;
    int send_fd = -1;
    int recv_fd = -1;
    int status = 1;

    if (argc < 2) {
        fprintf(stderr, "usage: %s <server-fifo>\n", argc > 0 ? argv[0] : "client");
        return 1;
    }

    snprintf(rname, sizeof rname, "r.%d", (int)getpid());
    snprintf(wname, sizeof wname, "w.%d", (int)getpid());

    /* A leftover FIFO of the same name is reused, as before. */
    if (mkfifo(rname, 0666) == -1 && errno != EEXIST) {
        perror(rname);
        return 1;
    }
    if (mkfifo(wname, 0666) == -1 && errno != EEXIST) {
        perror(wname);
        goto cleanup;
    }

    fd_server = open(argv[1], O_WRONLY);
    if (fd_server == -1) {
        perror(argv[1]);
        goto cleanup;
    }

    int len = snprintf(msg, sizeof msg, "%d\n", (int)getpid());
    if (write(fd_server, msg, (size_t)len) != (ssize_t)len) {
        perror("write");
        goto cleanup;
    }

    /* Open order matters: each open blocks until the other side opens. */
    recv_fd = open(rname, O_RDONLY);
    if (recv_fd == -1) {
        perror(rname);
        goto cleanup;
    }
    send_fd = open(wname, O_WRONLY);
    if (send_fd == -1) {
        perror(wname);
        goto cleanup;
    }

    status = 0;
    while (fgets(msg, sizeof msg, stdin) != NULL) {
        if (write(send_fd, msg, strlen(msg)) < 0) {
            perror("write");
            status = 1;
            break;
        }

        /* Use the byte count from read(): the reply is not guaranteed to
         * be NUL-terminated, so strlen() on it would be unsafe. */
        ssize_t got = read(recv_fd, msg, sizeof msg);
        if (got <= 0) {
            if (got < 0) {
                perror("read");
            }
            status = 1;
            break;
        }
        if (write(STDOUT_FILENO, msg, (size_t)got) < 0) {
            perror("write");
            status = 1;
            break;
        }
    }

cleanup:
    if (fd_server != -1) {
        close(fd_server);
    }
    if (send_fd != -1) {
        close(send_fd);
    }
    if (recv_fd != -1) {
        close(recv_fd);
    }
    unlink(rname);
    unlink(wname);
    return status;
}
时间: 2024-11-29 12:12:07