Linux基础——多线程实现任务

这里,我们首先要实现一种数据结构,将相应的任务,线程的fd,还有队列实现。

声明代码如下:

 1 #ifndef _HEAD_H
 2 #define _HEAD_H
 3 #include <stdio.h>
 4 #include <stdlib.h>
 5 #include <unistd.h>
 6 #include <string.h>
 7 #include <sys/stat.h>
 8 #include <sys/select.h>
 9 #include <sys/types.h>
10 #include <fcntl.h>
11 #include <pthread.h>
12 #include <sys/time.h>
13 #include <signal.h>
14 #define MSG_LEN 1024
15 #define TASK_CNT 1024
16 extern pthread_mutex_t lock;
17 extern pthread_cond_t cond1;
18 extern pthread_cond_t cond2;
19 typedef struct tag_fd
20 {
21     int s_rfd;
22     int s_wfd;
23     struct tag_fd *next;
24 }FD,*pFD;
25 typedef struct tag_task
26 {
27     char s_msg[MSG_LEN];
28     int s_fd;
29 }TASK,*pTASK;
30 typedef struct tag_que
31 {
32     TASK arr[TASK_CNT+1];
33     int front;
34     int tail;
35 }QUEUE,*pQUEUE;
36 void fd_insert(pFD *phead,int rfd,int wfd);
37 void fd_init(pFD *phead);
38 int  fd_find(pFD phead,int rfd);
39 void fd_del(pFD *phead,int rfd);
40 void add_task(pQUEUE pq,pTASK pt);
41 void get_task(pQUEUE pq,pTASK pt);
42 void excute_task(pTASK pt);
43 #endif

我们需要根据线程的占用情况,控制好,所以我们应把线程插入到一个链表中。
实现代码如下:

 1 #include "head.h"
 2 void fd_init(pFD *phead)
 3 {
 4     *phead= NULL;
 5 }
 6 void fd_insert(pFD *phead,int rfd,int wfd)
 7 {
 8     pFD pnew = (pFD )calloc(1,sizeof(FD));
 9     pnew->s_rfd=rfd;
10     pnew->s_wfd=wfd;
11     pnew->next = *phead;
12     *phead = pnew;
13 }
14 int fd_find(pFD phead,int rfd)
15 {
16     while(phead)
17     {
18         if(phead->s_rfd==rfd)
19             break;
20         else
21             phead = phead->next;
22     }
23     if(phead == NULL)
24         return -1;
25     else
26         return phead->s_wfd;
27 }
28
29 void fd_del(pFD *phead,int rfd)
30 {
31     pFD pcur,ppre;
32     pcur=*phead;
33     ppre=NULL;
34     while(pcur)
35     {
36         if(pcur->s_rfd == rfd)
37             break;
38         else
39         {
40             ppre=pcur;
41             pcur = pcur ->next;
42         }
43     }
44     if(ppre==NULL)
45     {
46         *phead=pcur->next;
47         free(pcur);
48         pcur=NULL;
49     }
50     else
51     {
52         ppre->next=pcur->next;
53         free(pcur);
54         pcur=NULL;
55     }
56 }

然后,我们还需要实现对任务的控制,例如任务的添加、获得、执行等。

实现代码如下:

 1 #include "head.h"
 2 static int que_empty(pQUEUE pq)
 3 {
 4     return pq->front == pq->tail;
 5 }
 6 static int que_full(pQUEUE pq)
 7 {
 8     return (pq->tail+1)%(TASK_CNT+1)==pq->front;
 9 }
10 static int que_cnt(pQUEUE pq)
11 {
12     return (pq->tail - pq->front +TASK_CNT+1)%(TASK_CNT + 1);
13 }
14 void add_task(pQUEUE pq ,pTASK pt)
15 {
16     pthread_mutex_lock(&lock);
17     while(que_full(pq))
18         pthread_cond_wait(&cond1,&lock);
19     pq->arr[pq->tail]=*pt;
20     pq->tail = (pq->tail+1)%(TASK_CNT+1);
21     if(que_cnt(pq)==1)
22         pthread_cond_broadcast(&cond2);
23     printf("添加了一个任务!!\n");
24     pthread_mutex_unlock(&lock);
25 }
26 void get_task(pQUEUE pq ,pTASK pt)
27 {
28     pthread_mutex_lock(&lock);
29     while(que_empty(pq))
30         pthread_cond_wait(&cond2,&lock);
31     *pt=pq->arr[pq->front];
32     pq->front = (pq->front+1)%(TASK_CNT+1);
33     if(que_cnt(pq)== TASK_CNT -1)
34         pthread_cond_broadcast(&cond1);
35     printf("获得了一个任务!!\n");
36     pthread_mutex_unlock(&lock);
37 }
38
39
40 void excute_task(pTASK pt)
41 {
42     char buf[1024];
43     memset(buf,0,1024);
44     strcpy(buf,pt->s_msg);
45     int index;
46     for(index=0;index < strlen(buf);index++)
47         buf[index]=toupper(buf[index]);
48     buf[index]=‘\0‘;
49     write(pt -> s_fd,buf,strlen(buf));
50 }

最后,我们只需在服务器端应用select循环查询是否有任务,再执行相应的操作。

服务器实现代码如下:

  1 #include "head.h"
  2 pthread_mutex_t lock;
  3 pthread_cond_t cond1,cond2;
  4 void* hand(void* arg)
  5 {
  6     pthread_detach(pthread_self());
  7     TASK task;
  8     pQUEUE pq = (pQUEUE)arg;
  9     while(1)
 10     {
 11         get_task(pq,&task);
 12         excute_task(&task);
 13         sleep(1);
 14     }
 15 }
 16 int main(int argc,char *argv[])
 17 {
 18     if(argc != 3)
 19     {
 20         perror("参数错误!!\n");
 21         exit(1);
 22     }
 23     signal(SIGINT,SIG_IGN);
 24     signal(SIGPIPE,SIG_IGN);
 25     signal(SIGQUIT,SIG_IGN);
 26     QUEUE que;
 27     int fd;
 28     fd_set read_set,revc;
 29     pFD list;
 30     memset(&que,0,sizeof(QUEUE));
 31     fd_init(&list);
 32     int cnt = atoi(argv[2]);
 33     pthread_t *arr=(pthread_t *)calloc(cnt,sizeof(pthread_t));
 34     pthread_mutex_init(&lock,NULL);
 35     pthread_cond_init(&cond1,NULL);
 36     pthread_cond_init(&cond2,NULL);
 37     int index=0;
 38     while(cnt > 0)
 39     {
 40         pthread_create(arr+index,NULL,hand,(void*)&que);
 41         cnt--;
 42         index++;
 43     }
 44     fd = open(argv[1],O_RDONLY);
 45     if(fd == -1)
 46     {
 47         perror("管道打开失败!!\n");
 48         exit(1);
 49     }
 50     struct timeval tm;
 51     int ret;
 52     FD_ZERO(&read_set);
 53     FD_ZERO(&revc);
 54     FD_SET(fd,&read_set);
 55     while(1)
 56     {
 57         tm.tv_sec=0;
 58         tm.tv_usec=1000;
 59         revc = read_set;
 60         ret=select(1024,&revc,NULL,NULL,&tm);
 61         if(ret == 0)
 62             continue;
 63         else if(ret > 0)
 64         {
 65             if(FD_ISSET(fd,&revc))
 66             {
 67                 char buf[32];
 68                 memset(buf,0,32);
 69                 if(read(fd,buf,32)==0)
 70                     continue;
 71                 else
 72                 {
 73                     char name[32];
 74                     int r_fd,w_fd;
 75                     buf[strlen(buf)-1]=‘\0‘;
 76                     memset(name,0,32);
 77                     sprintf(name,"r.%s",buf);
 78                     w_fd=open(name,O_WRONLY);
 79                     memset(name,0,32);
 80                     sprintf(name,"w.%s",buf);
 81                     r_fd=open(name,O_RDONLY);
 82                     fd_insert(&list,r_fd,w_fd);
 83                     FD_SET(r_fd,&read_set);
 84                 }
 85             }
 86         }
 87         pFD pcur=list;
 88         while(pcur)
 89         {
 90             if(FD_ISSET(pcur->s_rfd,&revc))
 91             {
 92                 char buf[1024];
 93                 memset(buf,0,1024);
 94                 if(read(pcur->s_rfd,buf,1024)==0)
 95                 {
 96                     FD_CLR(pcur->s_rfd,&read_set);
 97                     int i=pcur->s_rfd;
 98                     pcur=pcur->next;
 99                     fd_del(&list,i);
100                 }
101                 else
102                 {
103                     TASK tk;
104                     memset(&tk,0,sizeof(TASK));
105                     tk.s_fd=pcur->s_wfd;
106                     strcpy(tk.s_msg,buf);
107                     add_task(&que,&tk);
108                     pcur=pcur->next;
109                 }
110             }
111             else
112                 pcur=pcur->next;
113         }
114     }
115     pthread_mutex_destory(&lock);
116     pthread_cond_destory(&cond1);
117     pthread_cond_destory(&cond2);
118     return 0;
119 }

客户端实现代码如下:

 1 #include <stdio.h>
 2 #include <stdlib.h>
 3 #include <fcntl.h>
 4 #include <unistd.h>
 5 #include <string.h>
 6 #include <sys/stat.h>
 7 #include <sys/types.h>
 8 int main(int argc,char *argv[])
 9 {
10     int fd_server,send,revc;
11     char rname[32],wname[32];
12     memset(rname,0,32);
13     memset(wname,0,32);
14     sprintf(rname,"r.%d",getpid());
15     sprintf(wname,"w.%d",getpid());
16     mkfifo(rname,0666);
17     mkfifo(wname,0666);
18     fd_server=open(argv[1],O_WRONLY);
19     char msg[1024];
20     memset(msg,0,1024);
21     sprintf(msg,"%d\n",getpid());
22     write(fd_server,msg,strlen(msg));
23     revc=open(rname,O_RDONLY);
24     send=open(wname,O_WRONLY);
25     while(memset(msg,0,1024),fgets(msg,1024,stdin))
26     {
27         write(send,msg,strlen(msg));
28         memset(msg,0,1024);
29         read(revc,msg,1024);
30         write(1,msg,strlen(msg));
31     }
32     close(fd_server);
33     close(send);
34     close(revc);
35     unlink(rname);
36     unlink(wname);
37     return 0;
38 }

时间: 2024-11-29 12:12:07

Linux基础——多线程实现任务的相关文章

[Z] linux基础编程:IO模型:阻塞/非阻塞/IO复用 同步/异步 Select/Epoll/AIO

原文链接:http://blog.csdn.net/colzer/article/details/8169075 IO概念 Linux的内核将所有外部设备都可以看做一个文件来操作.那么我们对与外部设备的操作都可以看做对文件进行操作.我们对一个文件的读写,都通过调用内核提供的系统调用:内核给我们返回一个file descriptor(fd,文件描述符).而对一个socket的读写也会有相应的描述符,称为socketfd(socket描述符).描述符就是一个数字,指向内核中一个结构体(文件路径,数据

linux下多线程编程

最近研究mysql源码,各种锁,各种互斥,好在我去年认真学了<unix环境高级编程>, 虽然已经忘得差不多了,但是学过始终是学过,拿起来也快.写这篇文章的目的就是总结linux 下多线程编程,作为日后的参考资料. 本文将介绍linux系统下多线程编程中,线程同步的各种方法.包括: 互斥量(mutex) 读写锁 条件变量 信号量 文件互斥 在介绍不同的线程同步的方法之前,先简单的介绍一下进程和线程的概念, 它们的优缺点,线程相关的API,读者——写者问题和哲学家就餐问题. 基础知识 1. 进程和

linux基础编程:IO模型:阻塞/非阻塞/IO复用 同步/异步 Select/Epoll/AIO(转载)

IO概念 Linux的内核将所有外部设备都可以看做一个文件来操作.那么我们对与外部设备的操作都可以看做对文件进行操作.我们对一个文件的读写,都通过调用内核提供的系统调用:内核给我们返回一个file descriptor(fd,文件描述符).而对一个socket的读写也会有相应的描述符,称为socketfd(socket描述符).描述符就是一个数字,指向内核中一个结构体(文件路径,数据区,等一些属性).那么我们的应用程序对文件的读写就通过对描述符的读写完成. linux将内存分为内核区,用户区.l

Linux基础--进程管理相关命令介绍(1)

本文主要介绍了Linux中进程管理的相关命令,涉及到的主要命令有pstree,ps. (1)pstree ①功能:用来查看进程树 ②用例: (2)ps ①功能:用来显示系统当前进程状态的命令 ②用例: ③相关用法: ps a:显示所有跟终端相关的进程 ps x:显示所有跟终端无关的进程 相关注释: STAT各代码含义: S:可中断的睡眠态 D:不可中断的睡眠态 R:运行或可运行 T:停止 Z:僵死 s:session leader l:多线程进程 +:前台进程 N:低优先级进程 <:高优先级进程

老男孩26期运维班linux基础知识大比拼即将开始

老男孩26期运维班linux基础知识大比拼 1 比赛说明 比赛时间:2015年11月12日下午15点 比赛地点:老男孩教育教室二 比赛人员:老男孩教育26期全体同学 奖项:团体奖(按组奖励共1-9组): 一等奖奖励300元人民币或每人老男孩老师新书一本和老师合影签名. 二等奖奖励200元人民币 三等奖奖励100元人民币 个人奖:3名,赠老男孩新书一本 惩罚:没有得奖的组,或者组内无人得奖的组,罚100元或派选代表表演一个节目(唱歌或其它) 比赛规则当场宣布: 主评委:老男孩老师,张导 辅助评委:

linux基础part1

linux基础部分一 一.linux简介 1.掌握Linux的定义:Linux是一套免费使用和自由传播的类Unix操作系统,是一个基于POSIX和UNIX的多用户.多任务.支持多线程和多CPU的操作系统. 2.掌握Linux操作系统的主要用途:主要用于服务器,特别是网络服务器. 3. 掌握两种常见的桌面环境:KDE和GNOME. 二. 登录和退出Linux 1.掌握关闭Linux系统的命令:init 0 2.掌握什么是Linux终端:Linux终端也称为虚拟控制台.Linux终端采用字符命令行方

[笔面] Linux基础部分

Linux基础 1. 绝对路径用什么符号表示?当前目录.上层目录用什么表示?主目录用什么表示? 切换目录用什么命令?2. 怎么查看当前进程?怎么执行退出?怎么查看当前路径?3. 怎么清屏?怎么退出当前命令?怎么执行睡眠?怎么查看当前用户id?查看指定帮助用什么命令?4. Ls 命令执行什么功能? 可以带哪些参数,有什么区别?5. 建立软链接(快捷方式),以及硬链接的命令.6. 目录创建用什么命令?创建文件用什么命令?复制文件用什么命令?7. 文件权限修改用什么命令?格式是怎么样的?8. 查看文件

2017-2018-2 20179204《网络攻防实践》第一周学习总结之linux基础

我在实验楼中学习了Linux基础入门课程,这里做一个学习小结. 第一节 linux系统简介 本节主要介绍了linux是什么.发展历史.重要人物.linux与window的区别以及如何学习linux. 1.什么是linux Linux是一个操作系统,就像Windows(xp,7,8)和 Mac OS.Linux 主要是系统调用和内核那两层.直观地看,操作系统还包含一些在其上运行的应用程序,比如文本编辑器.浏览器.电子邮件等. 2.linux与windows的区别 linux免费或收取少许费用: l

Linux基础入门级命令文档

Linux系统上命令的使用格式,及常用命令示例 1.命令提示符 登录系统后,第一眼看到的内容是: [[email protected] ~]# 上图就是 Linux 系统的命令提示符.那么,这个提示符的含义是什么呢? []:这是提示符的分隔符号,没有特殊含义. root:显示的是当前的登录用户,笔者现在使用的是 root 用户登录. @:分隔符号,没有特殊含义. node1:当前系统的简写主机名node1. ~:代表用户当前所在的目录,此例中用户当前所在的目录是家目录. ]#:命令提示符,Lin