线程同步-条件变量

条件变量的使用:将互斥量的忙等机制改为通知机制

涉及到的函数有以下几个:

int pthread_cond_destroy(pthread_cond_t *cond);
/**********************
 *功能:条件变量的初始化
 *参数:cond:条件变量
 *      attr:条件变量的属性
 * ********************/
int pthread_cond_init(pthread_cond_t *restrict cond , const pthread_condattr_t *restrict attr);
/***静态初始化***/
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

/*****发送广播给所有处于阻塞等待状态的线程********/
int pthread_cond_broadcast(pthread_cond_t *cond);
/******叫醒某一个处于阻塞等待状态的线程*******/
int pthread_cond_signal(pthread_cond_t *cond);

int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);
/********使线程阻塞在一个条件变量上********/
int pthread_cond_wait(pthread_cond_t *restrict cond , pthread_mutex_t *restrict mutex);

eg:

mytbf.c

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>

#include "mytbf.h"

//每个令牌桶内部的信息状态
struct mytbf_st
{
    int cps;//速率
    int burst;//上限
    int token;//令牌个数
    int pos;//令牌桶在这个令牌桶数组中的位置下标
    pthread_mutex_t mut;//并发多个线程同时使用成员,保证变量被独占使用,所以每个令牌桶应该有一把锁
    pthread_cond_t cond;//通知token累加完成
};

//令牌桶数组
static struct mytbf_st *job[MYTBF_MAX];
//由于令牌桶数组可能被其他并发线程同时访问修改(eg:同时调用init),所以需要添加互斥量
static pthread_mutex_t mut_job = PTHREAD_MUTEX_INITIALIZER;
//线程
static pthread_t tid;
//只执行一次
static pthread_once_t init_once = PTHREAD_ONCE_INIT;

//线程:任务是负责每秒向令牌桶数组中加token
static void *thr_alrm(void *p)
{
    int i;

    while(1)
    {
        //令牌桶数组加锁
        pthread_mutex_lock(&mut_job);
        for(i = 0 ; i < MYTBF_MAX; i++)
        {
            if(job[i] != NULL)
            {
                //锁令牌并自加
                pthread_mutex_lock(&job[i]->mut);
                //加令牌
                job[i]->token += job[i]->cps;
                if(job[i]->token > job[i]->burst)
                    job[i]->token = job[i]->burst;
                pthread_cond_broadcast(&job[i]->cond);
                pthread_mutex_unlock(&job[i]->mut);
            }
        }
        //令牌桶解锁
        pthread_mutex_unlock(&mut_job);
        sleep(1);
    }
}

//模块的卸载
static void module_unload(void)
{
    int i;
    //取消线程以及收尸
    pthread_cancel(tid);
    pthread_join(tid,NULL);
    //将令牌桶释放空间并销毁
    pthread_mutex_lock(&mut_job);
    for(i = 0 ; i < MYTBF_MAX ;i++)
    {
        if(job[i] != NULL)
        {
            pthread_mutex_destroy(&job[i]->mut);
            pthread_cond_destroy(&job[i]->cond);
            free(job[i]);
        }

    }
    pthread_mutex_unlock(&mut_job);
    pthread_mutex_destroy(&mut_job);

    return ;
}
//模块加载
static void module_load(void)
{
    int err;
    //创建线程
    err = pthread_create(&tid,NULL,thr_alrm,NULL);
    if(err)
    {
        fprintf(stderr,"pthread_create():%s\n",strerror(err));
        exit(1);
    }
    //挂载钩子函数:当exit时,调用module_unload
    atexit(module_unload);
}

//获取空置的令牌数组位置
static int get_free_pos_unlocked(void)
{
    int i;

    for(i = 0 ; i < MYTBF_MAX; i++)
        if(job[i] == NULL)
            return i;
    return -1;
}

//令牌桶初始化
mytbf_t *mytbf_init(int cps,int burst)
{
    struct mytbf_st *me;
    int pos;
    //只调用一次module_load
    pthread_once(&init_once,module_load);
    //申请空间
    me = malloc(sizeof(*me));
    if(me == NULL)
        return NULL;
    //赋初值
    me->cps = cps;
    me->burst = burst;
    me->token = 0;
    pthread_mutex_init(&me->mut,NULL);
    pthread_cond_init(&me->cond,NULL);

    //令牌桶加锁查看空位并赋值
    pthread_mutex_lock(&mut_job);

    pos = get_free_pos_unlocked();
    if(pos < 0)
    {
        free(me);
        pthread_mutex_unlock(&mut_job);
        return NULL;
    }

    me->pos = pos;
    job[pos] = me;

    pthread_mutex_unlock(&mut_job);

    return me;
}

static int min(int a,int b)
{
    if(a < b)
        return a;
    return b;
}
//获取令牌个数
int mytbf_fetchtoken(mytbf_t *ptr,int size)
{
    struct mytbf_st *me = ptr;
    int n;

    if(size < 0)
        return -EINVAL;
    //锁令牌并处理
    pthread_mutex_lock(&me->mut);
    //没有令牌,解决忙等
    while(me->token <= 0)
    {
        //非忙等
        pthread_cond_wait(&me->cond,&me->mut);
        /*忙等
        pthread_mutex_lock(&me->mut);
        sched_yield();
        pthread_mutex_unlock(&me->mut);
        */
    }

    n = min(me->token,size);

    me->token -= n;

    pthread_mutex_unlock(&me->mut);

    return n;
}

//返还令牌
int mytbf_returntoken(mytbf_t *ptr,int size)
{
    struct mytbf_st *me = ptr;

    if(size < 0)
        return -EINVAL;
    //加锁处理令牌
    pthread_mutex_lock(&me->mut);

    me->token += size;
    if(me->token > me->burst)
        me->token = me->burst;
    //令牌归还后广播
    pthread_cond_broadcast(&me->cond);
    pthread_mutex_unlock(&me->mut);

    return 0;
}
//销毁令牌
int mytbf_destroy(mytbf_t *ptr)
{
    struct mytbf_st *me = ptr;
    //令牌桶加锁销毁
    pthread_mutex_lock(&mut_job);
    job[me->pos] = NULL;
    pthread_mutex_unlock(&mut_job);
    //销毁互斥量和条件变量
    pthread_mutex_destroy(&me->mut);
    pthread_cond_destroy(&me->cond);

    free(ptr);
    return 0;
}

mytbf.h

 + main.c  makefile  + mytbf.c  mytbf.h
#ifndef MYTBF_H__
#define MYTBF_H__

typedef void mytbf_t;

#define MYTBF_MAX       1024

mytbf_t *mytbf_init(int cps,int burst);

int mytbf_fetchtoken(mytbf_t *,int size);

int mytbf_returntoken(mytbf_t *,int size);

int mytbf_destroy(mytbf_t *);

#endif

main.c

/******************
 *功能:按照流控方式(正常每秒10个字节,有令牌桶机制)从文件中读取数据并写stdout
 *      使用多线程方式
 *      使用互斥量和条件变量
 * ***************/
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <string.h>

#include "mytbf.h"
//速率:每秒传输10个字节
#define CPS         10
#define BUFSIZE     1024
//令牌上限:100个
#define BURST       100

int main(int argc,char **argv)
{
    int sfd,dfd = 1;
    char buf[BUFSIZE];
    int pos,len,ret;
    int size;
    mytbf_t *tbf;
    //1.判断输入文件路径是否正确合法
    if(argc < 2)
    {
        fprintf(stderr,"Usage...\n");
        exit(1);
    }
    //2.初始化令牌桶:设置速率和令牌上限
    tbf = mytbf_init(CPS,BURST);
    if(tbf == NULL)
    {
        fprintf(stderr,"mytbf_init() failed.\n");
        exit(1);
    }
    //3.尝试打开文件
    do
    {
        sfd = open(argv[1],O_RDONLY);
        if(sfd < 0)
        {
            if(errno != EINTR)
            {
                perror("open()");
                exit(1);
            }
        }
    }while(sfd < 0);

    //4.读写文件
    while(1)
    {
        //4.1获取令牌个数
        size = mytbf_fetchtoken(tbf,BUFSIZE);
        if(size < 0)
        {
            fprintf(stderr,"mytbf_fetchtoken():%s\n",strerror(-size));
            exit(1);
        }
        //4.2读取相应个数的数据
        while((len = read(sfd,buf,size)) < 0)
        {
            if(errno == EINTR)
                continue;
            perror("read()");
            break;
        }
        //4.3无数据返回
        if(len == 0)
            break;

        //4.4有数据判断文件实际读取的长度是否与令牌个数相同
        //   如果令牌数量 >  实际读取数量 需要返还令牌
        if(size-len > 0)
            mytbf_returntoken(tbf,size-len);

        //4.5输出位置定向
        pos = 0;
        //4.6写到输出文件
        while(len > 0)
        {
            ret = write(dfd,buf+pos,len);
            if(ret < 0)
            {
                if(errno == EINTR)
                    continue;
                perror("write()");
                exit(1);
            }
            pos += ret;
            len -= ret;
        }
    }
    //5.关闭文件,并销毁令牌桶
    close(sfd);

    mytbf_destroy(tbf);

    exit(0);
}

Makefile:

CFLAGS+=-pthread
LDFLAGS+=-pthread

all:mytbf

mytbf:mytbf.o main.o
    gcc $^ -o [email protected] $(CFLAGS) $(LDFLAGS)

clean:
    rm -rf *.o mytbf
时间: 2024-11-02 12:49:39

线程同步-条件变量的相关文章

linux系统编程:线程同步-条件变量(cond)

线程同步-条件变量(cond) 生产者与消费者问题 再引入条件变量之前,我们先看下生产者和消费者问题:生产者不断地生产产品,同时消费者不断地在消费产品. 这个问题的同步在于两处:第一,消费者之间需要同步:同一件产品只可由一人消费.第二,当无产品可消费时,消费者需等待生产者生产后,才可继续消费,这又是一个同步问题.详细了解:生产者消费者问题. 条件变量 条件变量是利用线程间共享的全局变量进行同步的一种机制,并且条件变量总是和互斥锁结合在一起. 相关函数 pthread_cond_t //条件变量类

linux网络编程-----&gt;线程同步--&gt;条件变量

开发使用多线程过程中, 不可避免的会出现多个线程同时操作同一块共享资源, 当操作全部为读时, 不会出现未知结果, 一旦当某个线程操作中有写操作时, 就会出现数据不同步的事件. 而出现数据混乱的原因: 资源共享(独享资源则不会) 调试随机(对数据的访问会出现竞争) 线程间缺少必要的同步机制 以上三点, 前两点不能被改变. 欲提高效率, 传递数据, 资源必须共享. 只要资源共享, 就一定会出现线程间资源竞争, 只要存在竞争关系, 数据就会出现混乱. 所以只能从第三点着手, 使多个线程在访问共享资源的

pThread线程(三) 线程同步--条件变量

条件变量(Condition Variables) 参考资料:http://game-lab.org/posts/posix-thread-cn/#5.1 条件变量是什么? 条件变量为我们提供了另一种线程间同步的方法,然而,互斥量是通过控制线程访问数据来实现同步,条件变量允许线程同步是基于实际数据的值. 如果没有条件变量,程序员需要让线程不断地轮询,以检查是否满足条件.由于线程处在一个不间断的忙碌状态,所以这是相当耗资源的.条件变量就是这么一个不需要轮询就可以解决这个问题的方法. 条件变量总是跟

线程同步——条件变量

1.互斥量的存在问题:     互斥量是线程程序必需的工具,但它们并非万能的.例如,如果线程正在等待共享数据内某个条件出现,那会发生什么呢?它可以重复对互斥对象锁定和解锁,每次都会检查共享数据结构,以查找某个值.但这是在浪费时间和资源,而且这种繁忙查询的效率非常低. 在每次检查之间,可以让调用线程短暂地进入睡眠,比如睡眠三秒钟,但是因此线程代码就无法最快作出响应.真正需要的是这样一种方法:当线程在等待满足某些条件时使线程进入睡眠状态.一旦条件满足,就唤醒因等待满足特定条件而睡眠的线程.如果能够做

c++11线程之条件变量condition_variable(二)

题目:编写一个程序,开启3个线程,这3个线程的ID分别为A.B.C,每个线程将自己的ID在屏幕上打印10遍,要求输出结果必须按ABC的顺序显示:如:ABCABC-.依次递推. 采用C++11实现: [cpp] view plaincopyprint? #include<iostream> #include<thread> #include<mutex> #include<condition_variable> using namespace std; mut

c++11线程之条件变量condition_variable

题目:子线程循环 10 次,接着主线程循环 100 次,接着又回到子线程循环 10 次,接着再回到主线程又循环 100 次,如此循环50次,试写出代码. [cpp] view plaincopyprint? #include<iostream> #include<thread> #include<mutex> #include<condition_variable> using namespace std; mutex m; condition_variab

多线程同步条件变量(转载)

最近看<UNIX环境高级编程>多线程同步,看到他举例说条件变量pthread_cond_t怎么用,愣是没有看懂,只好在网上找了份代码,跑了跑,才弄明白 [cpp] view plaincopy #include <pthread.h> #include <stdio.h> #include <stdlib.h> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;/*初始化互斥锁*/ pthread_cond_

pyhon——线程同步条件(event)

event.wait() wait未被设定时,线程会被卡住,执行不下去,一旦设定,就相当于pass event.set() 来给wait设定 event.clear() 来清除set设定 import threading import time class Boss(threading.Thread): def run(self): print("boss: 今晚加班到12.00") print(event.isSet()) event.set() time.sleep(5) prin

线程的条件变量实例

情景1: Jack开着一辆出租车来到一个网站停车.看见没人就走了.过段时间.Susan来到网站准备乘车.可是没有来,于是就等着.过了一会Mike开着车来到了这个网站,Sunsan就上了Mike的车走了.如图所看到的: 程序实现该情景: #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> pthread_cond_t taxicond = PTHREAD