理解 Linux 条件变量

理解 Linux 条件变量

1 简介

当多个线程之间因为存在某种依赖关系,导致只有当某个条件存在时,才可以执行某个线程,此时条件变量(pthread_cond_t)可以派上用场。比如:

例1: 当系统不忙(这是一个条件)时,执行扫描文件状态的线程。

例2: 多个线程组成线程池,只有当任务队列中存在任务时,才用其中一个线程去执行这个任务。为避免惊群(thrundering herd),可以采用条件变量同步线程池中的线程。

2 用法

条件变量(pthread_cond_t)必须与锁(pthread_mutex_t)一起使用。

条件变量的API:

1) pthread_cond_init

2) pthread_cond_signal / pthread_cond_broadcast

3) pthread_cond_wait / pthread_cond_timedwait

4) pthread_cond_destroy

线程A:

include <stdio.h>  
include <sys/time.h>  
include <unistd.h>  
include <pthread.h>  
include <errno.h>
...

void A_thread_run(void *arg)
{
    ...

    pthread_mutex_lock (& lock);

    // 条件满足, 发出通知
    pthread_cond_signal (& cond);

    pthread_mutex_unlock (& lock);

    ...
}

线程B:

void B_thread_run(void *arg)
{
    for ( ; ; ) {
        pthread_mutex_lock (&lock);

        /* pthread_cond_wait 原子调用: 等待条件变量, 解除锁, 然后阻塞
         * 当 pthread_cond_wait 返回,则条件变量有信号,同时上锁
         *
         * 等待条件有两种方式:条件等待pthread_cond_wait()和计时等待pthread_cond_timedwait(),
         * 其中计时等待方式如果在给定时刻前条件没有满足,则返回ETIMEOUT
         * 无论哪种等待方式,都必须和一个互斥锁配合,以防止多个线程同时请求pthread_cond_wait()
         * (或pthread_cond_timedwait(),下同)的竞争条件(Race Condition)。
         * mutex互斥锁必须是普通锁(PTHREAD_MUTEX_TIMED_NP)或者适应锁(PTHREAD_MUTEX_ADAPTIVE_NP),
         * 且在调用pthread_cond_wait()前必须由本线程加锁(pthread_mutex_lock()),而在更新条件等待队列以前,
         * mutex保持锁定状态,并在线程挂起进入等待前解锁。
         * 在条件满足从而离开pthread_cond_wait()之前,mutex将被重新加锁,以与进入pthread_cond_wait()前的加锁动作对应。
         * 激发条件有两种形式,pthread_cond_signal()激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个;
         * 而pthread_cond_broadcast()则激活所有等待线程(惊群)。
         */

        pthread_cond_wait (&cond, &lock);

        if (shutdown) {
            break;
        }

        /* Unlock */
        pthread_mutex_unlock (&lock);

        /* do your task here */
    }

    pthread_mutex_unlock (&lock);
    pthread_exit (0);
}

线程B调用pthread_cond_wait,从而阻塞在此句,等待有信号通知。pthread_cond_wait内部存在原子调用:解除锁和等待条件变量有信号。当pthread_cond_wait函数返回,表明得到了信号通知,同时上锁。

线程A用pthread_cond_signal通知调用了pthread_cond_wait的线程B。

3 避免惊群

这是个狼多肉少,僧多粥少,色鬼多美女少的时代。每当一块肉丢到狼群,就引发一群狼去争抢,但最后只有一只狼得到了肉。这就是惊群(thrundering herd)。现实世界的惊群,比如老师在课堂上每次提出一个问题,最后只找一个学生回答,时间久了,学生对这个老师的问题就倦怠了。计算机的惊群会造成服务器资源空耗。

pthread_cond_signal函数的作用是发送一个信号给另外一个正在处于阻塞等待状态的线程,使其脱离阻塞状态,继续执行.如果没有线程处在阻塞等待状态,pthread_cond_signal也会成功返回。

但使用pthread_cond_signal不会有“惊群现象”产生,它最多只给一个线程发信号。假如有多个线程正在阻塞等待着这个条件变量的话,那么是根据各等待线程优先级的高低确定哪个线程接收到信号开始继续执行。如果各线程优先级相同,则根据等待时间的长短来确定哪个线程获得信号。但无论如何一个pthread_cond_signal调用最多发信一次。

4 线程池threadpool

经典的例子就是一个线程池是一个固定数目的线程的组合,其中每个线程(worker)完全可以做相同的工作。线程池包含这样一个任务(task)队列,用户向任务队列中添加任务,线程池自动派发线程去执行任务。

每个线程有特定于线程的参数(thread argument),每个任务也有特定于任务的数据(task argument)。线程函数执行任务函数,同时传递给任务函数线程参数和任务参数。

典型的例子就是每个线程包含了到数据库或其他资源的连接,任务函数可以安全地使用这些连接,因为任务函数是在线程函数中同步执行的。

下面是完整的线程池代码,原来的代码中没有特定于线程的参数,我添加了这部分代码。

threadpool.h

/*
 * 2014-06-18: last modified by cheungmine
 *
 * Copyright (c) 2011, Mathias Brossard <[email protected]>.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_

#ifndef POOL_MAX_THREADS
#   define POOL_MAX_THREADS  256
#endif

#ifndef POOL_MAX_QUEUES
#   define POOL_MAX_QUEUES  1024
#endif

#ifndef POOL_DEFAULT_THREADS
#   define POOL_DEFAULT_THREADS  32
#endif

#ifndef POOL_DEFAULT_QUEUES
#   define POOL_DEFAULT_QUEUES  256
#endif

/**
 * @file threadpool.h
 * @brief Threadpool Header file
 */

typedef struct threadpool_t threadpool_t;

/**
 * @file threadpool.h
 * @brief thread_context_t
 *   thread can take itself argument
 *   added by cheungmine.
 *   2014-06-17
 */
typedef struct thread_context_t
{
    void *pool;
    pthread_t thread;
    void *thread_arg;
    struct threadpool_task_t *task;
} thread_context_t;

/**
 *  @struct threadpool_task
 *  @brief the work struct
 *
 *  @var function Pointer to the function that will perform the task.
 *  @var argument Argument to be passed to the function.
 */
typedef struct threadpool_task_t
{
    void (*function)(thread_context_t *);
    int    flags;     /* user defined */
    void * argument;
} threadpool_task_t;

typedef enum
{
    threadpool_invalid        = -1,
    threadpool_lock_failure   = -2,
    threadpool_queue_full     = -3,
    threadpool_shutdown       = -4,
    threadpool_run_failure    = -5,
    threadpool_out_memory     = -6
} threadpool_error_t;

static const char* threadpool_error_messages[] = {
    "threadpool_success",
    "threadpool_invalid",
    "threadpool_lock_failure",
    "threadpool_queue_full",
    "threadpool_shutdown",
    "threadpool_run_failure",
    "threadpool_out_memory"
};

/**
 * @function threadpool_create
 * @brief Creates a threadpool_t object.
 * @param thread_count Number of worker threads.
 * @param queue_size   Size of the queue.
 * @param thread_args  array of arguments with count of thread_count, NULL if ignored.
 * @param flags        Unused parameter.
 * @return a newly created thread pool or NULL
 */
threadpool_t *threadpool_create (int thread_count, int queue_size, void **thread_args, int flags);

/**
 * @function threadpool_add
 * @brief add a new task in the queue of a thread pool
 * @param pool     Thread pool to which add the task.
 * @param function Pointer to the function that will perform the task.
 * @param argument Argument to be passed to the function.
 * @param flags    Unused parameter.
 * @return 0 if all goes well, negative values in case of error (@see
 * threadpool_error_t for codes).
 */
int threadpool_add (threadpool_t *pool, void (*routine)(thread_context_t *), void *task_arg, int flags);

/**
 * @function threadpool_destroy
 * @brief Stops and destroys a thread pool.
 * @param pool  Thread pool to destroy.
 * @param flags Unused parameter.
 */
int threadpool_destroy (threadpool_t *pool, int flags);

#endif /* _THREADPOOL_H_ */

threadpool.c

/*
 * 2014-06-18: last modified by cheungmine
 *
 * Copyright (c) 2011, Mathias Brossard <[email protected]>.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * @file threadpool.c
 * @brief Threadpool implementation file
 */

#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#include "threadpool.h"

/**
 *  @struct threadpool
 *  @brief The threadpool struct
 *
 *  @var notify       Condition variable to notify worker threads.
 *  @var threads      Array containing worker threads ID.
 *  @var thread_count Number of threads
 *  @var queue        Array containing the task queue.
 *  @var queue_size   Size of the task queue.
 *  @var head         Index of the first element.
 *  @var tail         Index of the next element.
 *  @var shutdown     Flag indicating if the pool is shutting down
 */
struct threadpool_t {
    pthread_mutex_t lock;
    pthread_cond_t notify;

    int head;
    int tail;
    int count;
    int shutdown;
    int started;

    int thread_count;
    int queue_size;

    threadpool_task_t *queues;
    thread_context_t thread_ctxs[0];
};

/**
 * @function void *threadpool_run(void *threadpool)
 * @brief the worker thread
 * @param threadpool the pool which own the thread
 */
static void *threadpool_run (void *threadpool);

int threadpool_free(threadpool_t *pool);

threadpool_t *threadpool_create(int thread_count, int queue_size, void **thread_args, int flags)
{
    int i;
    threadpool_t *pool = NULL;

    /* Check thread_count for negative or otherwise very big input parameters */
    if (thread_count < 0 || thread_count > POOL_MAX_THREADS) {
        goto err;
    }
    if (thread_count == 0) {
        thread_count = POOL_DEFAULT_THREADS;
    }

    /* Check queue_size for negative or otherwise very big input parameters */
    if (queue_size < 0 || queue_size > POOL_MAX_QUEUES) {
        goto err;
    }
    if (queue_size == 0) {
        queue_size = POOL_DEFAULT_QUEUES;
    }

    /* create threadpool */
    if ( (pool = (threadpool_t *) malloc (sizeof(threadpool_t) +
        sizeof(thread_context_t) * thread_count +
        sizeof(threadpool_task_t) * queue_size)) == NULL ) {
        goto err;
    }

    /* Initialize */
    pool->thread_count = thread_count;
    pool->queue_size = queue_size;
    pool->head = pool->tail = pool->count = 0;
    pool->shutdown = pool->started = 0;
    pool->queues = (threadpool_task_t *) (& pool->thread_ctxs[thread_count]);

    /* Initialize mutex and conditional variable first */
    if ((pthread_mutex_init (&(pool->lock), NULL) != 0) ||
       (pthread_cond_init (&(pool->notify), NULL) != 0)) {
        goto err;
    }

    /* Start worker threads */
    for (i = 0; i < thread_count; i++) {
        thread_context_t * pctx = & pool->thread_ctxs[i];

        /* set pool to each thread context */
        pctx->pool = (void*) pool;

        /* assign thread argument if valid */
        if (thread_args) {
            pctx->thread_arg = thread_args[i];
        } else {
            pctx->thread_arg = 0;
        }

        if ( pthread_create (& pctx->thread, NULL, threadpool_run, (void*) pctx) != 0) {
            threadpool_destroy (pool, 0);
            return NULL;
        } else {
            pool->started++;
        }
    }

    return pool;

 err:
    if(pool) {
        threadpool_free(pool);
    }
    return NULL;
}

int threadpool_add (threadpool_t *pool, void (*function)(thread_context_t *), void *task_arg, int flags)
{
    int err = 0;
    int next;

    if ( pool == NULL || function == NULL ) {
        return threadpool_invalid;
    }

    if (pthread_mutex_lock (&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    next = pool->tail + 1;
    next = (next == pool->queue_size) ? 0 : next;

    do {
        /* Are we full ? */
        if (pool->count == pool->queue_size) {
            err = threadpool_queue_full;
            break;
        }

        /* Are we shutting down ? */
        if (pool->shutdown) {
            err = threadpool_shutdown;
            break;
        }

        /* Add task to queue */
        pool->queues[pool->tail].function = function;
        pool->queues[pool->tail].argument = task_arg;
        pool->queues[pool->tail].flags = flags;

        pool->tail = next;
        pool->count += 1;

        /* pthread_cond_broadcast */
        if (pthread_cond_signal (&(pool->notify)) != 0) {
            err = threadpool_lock_failure;
            break;
        }
    } while(0);

    if (pthread_mutex_unlock (&pool->lock) != 0) {
        err = threadpool_lock_failure;
    }

    return err;
}

int threadpool_destroy (threadpool_t *pool, int flags)
{
    int i, err = 0;

    if (pool == NULL) {
        return threadpool_invalid;
    }

    if (pthread_mutex_lock (&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    do {
        /* Already shutting down */
        if (pool->shutdown) {
            err = threadpool_shutdown;
            break;
        }

        pool->shutdown = 1;

        /* Wake up all worker threads */
        if ((pthread_cond_broadcast(&(pool->notify)) != 0) ||
           (pthread_mutex_unlock(&(pool->lock)) != 0)) {
            err = threadpool_lock_failure;
            break;
        }

        /* Join all worker thread */
        for (i = 0; i < pool->thread_count; i++) {
            if (pthread_join (pool->thread_ctxs[i].thread, NULL) != 0) {
                err = threadpool_run_failure;
            }
        }
    } while(0);

    if (pthread_mutex_unlock (&pool->lock) != 0) {
        err = threadpool_lock_failure;
    }

    /* Only if everything went well do we deallocate the pool */
    if (!err) {
        threadpool_free (pool);
    }
    return err;
}

int threadpool_free (threadpool_t *pool)
{
    if (pool == NULL || pool->started > 0) {
        return -1;
    }

    pthread_mutex_lock (&(pool->lock));
    pthread_mutex_destroy (&(pool->lock));
    pthread_cond_destroy (&(pool->notify));

    free(pool);
    return 0;
}

/**
 * each thread run function
 */
static void *threadpool_run (void * param)
{
    threadpool_task_t task;

    thread_context_t * thread_ctx = (thread_context_t *) param;
    threadpool_t * pool = thread_ctx->pool;

    for ( ; ; ) {
        /* Lock must be taken to wait on conditional variable */
        pthread_mutex_lock (&(pool->lock));

        /* Wait on condition variable, check for spurious wakeups.
           When returning from pthread_cond_wait(), we own the lock. */
        while ((pool->count == 0) && (!pool->shutdown)) {
            pthread_cond_wait (&(pool->notify), &(pool->lock));
        }

        if (pool->shutdown) {
            break;
        }

        /* Grab our task */
        task.function = pool->queues[pool->head].function;
        task.argument = pool->queues[pool->head].argument;
        task.flags    = pool->queues[pool->head].flags;

        thread_ctx->task = &task;

        pool->head += 1;
        pool->head = (pool->head == pool->queue_size) ? 0 : pool->head;
        pool->count -= 1;

        /* Unlock */
        pthread_mutex_unlock (&(pool->lock));

        /* Get to work */
        (*(task.function)) (thread_ctx);
    }

    pool->started--;

    pthread_mutex_unlock (&(pool->lock));
    pthread_exit (NULL);
    return (NULL);
}

理解 Linux 条件变量

时间: 2024-11-05 13:33:49

理解 Linux 条件变量的相关文章

Linux 条件变量

一.什么是条件变量 与互斥锁不同,条件变量是用来等待而不是用来上锁的.条件变量用来自动阻塞一个线程,直到某特殊情况发生为止.通常条件变量和互斥锁同时使用. 条件变量使我们可以睡眠等待某种条件出现.条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起:另一个线程使"条件成立"(给出条件成立信号). 条件的检测是在互斥锁的保护下进行的.如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁.如果另一个线

linux条件变量使用和与信号量的区别

近来在项目中用到条件变量和信号量做同步时,这一块一直都有了解,但也一直没有总结,这次总结一下,给大家提供点参考,也给自己留点纪念. 首先,关于信号量和条件变量的概念可以自行查看APUE,我这直接把APUE中的代码拿过来对比: 条件变量的使用: #include <pthread.h> struct msg { struct msg *m_next; /* ... more stuff here ... */ }; struct msg *workq; pthread_cond_t qready

Linux 条件变量函数signal和wait补充

pthread_cond_wait必须放在pthread_mutex_lock和pthread_mutex_unlock之间,因为他要根据共享变量的状态来觉得是否要等待,而为了不永远等待下去所以必须要在lock/unlock中共享变量的状态改变必须遵守lock/unlock的规则 pthread_cond_signal即可以放在pthread_mutex_lock和pthread_mutex_unlock之间,也可以放在pthread_mutex_lock和pthread_mutex_unloc

条件变量、信号量、互斥锁

转载 http://blog.csdn.net/yusiguyuan/article/details/14161225 线程间的同步技术,主要以互斥锁和条件变量为主,条件变量和互斥所的配合使用可以很好的处理对于条件等待的线程间的同步问题.举个例子:当有两个变量x,y需要在多线程间同步并且学要根据他们之间的大小比较来启动不同的线程执行顺序,这便用到了条件变量这一技术.看代码 1 #include <iostream> 2 #include <pthread.h> 3 using na

linux多线程-互斥&amp;条件变量与同步

多线程代码问题描述 我们都知道,进程是操作系统对运行程序资源分配的基本单位,而线程是程序逻辑,调用的基本单位.在多线程的程序中,多个线程共享临界区资源,那么就会有问题: 比如 #include <pthread.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> int g_val = 10; void * test1(void* args) { g_val = 20; printf(&

Linux 线程 条件变量

下面是一个多线程,生产者消费者问题,一个队列放暂存的数据: 1 #include <iostream> 2 #include <queue> 3 #include <stdlib.h> 4 #include <unistd.h> 5 #include <pthread.h> 6 7 using std::cout; 8 using std::endl; 9 using std::queue; 10 11 #define N 100 12 #def

Linux互斥量&amp;条件变量

互斥量 Mutex 互斥量1. #include <pthread.h>  2. int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);  3.   4. int pthread_mutex_lock(pthread_mutex_t *mutex);  5.   6. int pthread_mutex_unlock(pthread_mutex_t *mutex);  7.   

深入理解Solaris内核中互斥锁(mutex)与条件变量(condvar)之协同工作原理

在Solaris上写内核模块总是会用到互斥锁(mutex)与条件变量(condvar), 光阴荏苒日月如梭弹指一挥间,Solaris的大船说沉就要沉了,此刻心情不是太好(Orz).每次被年轻的有才华的同事们(比如Letty同学)问起mutex和cv怎么协同工作的,我总是不能给出一个非常清晰的解释.直到今天,看了cv_wait()的源代码之后,我终于可以给他们一个清楚明白的回答了. Solaris的源码无法被公开粘贴出来,幸好还有OpenSolaris的继承者illumos. 先贴cv_wait(

Linux多线程编程-条件变量

条件变量 如果说线程间的互斥锁是用来同步共享数据的访问的话,那么条件变量是用于线程之间共享数据的值.条件变量提供了一种线程之间的通知机制,当某个共享数据达到某个值时,唤醒等待这个共享数据的线程.条件变量相关函数主要 有5个: #include <pthread.h> int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *cond_attr); int pthread_cond_destroy(pthread_