浅析线程间通信三:Barriers、信号量(semaphores)以及各种同步方法比较

之前的文章讨论了互斥量、条件变量、读写锁和自旋锁用于线程的同步,本文将首先讨论Barriers和信号量的使用,并给出了相应的代码和注意事项,相关代码也可在我的github上下载,然后对线程各种同步方法进行了比较。

Barriers

Barriers是一种不同于前面线程同步机制,它主要用于协调多个线程并行(parallel)共同完成某项任务。一个barrier对象可以使得每个线程阻塞,直到所有协同(合作完成某项任务)的线程执行到某个指定的点,才让这些线程继续执行。前面使用的pthread_join调用也可以看成简单的barrier,那里主线程要求其他线程都退出后,才能继续执行。可以使用接口pthread_barrier_init来初始化一个对象,用phread_barrier_destory来销毁一个barrier对象。他们声明如下:

#include <pthread.h>
int pthread_barrier_destroy(pthread_barrier_t *barrier);
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
const pthread_barrierattr_t *restrict attr, unsigned count); 

当使用pthread_barrier_init接口初始化hygienebarrier对象时,参数count指定需要多少个线程执行到指定的点,才能使得所有线程继续往下执行。每个线程执行的指定的点是指线程自身通过调用pthread_barrier_wait来表明当前线程执行到指定点了,等待其他线程调用pthread_barrier_wait。pthread_barrier_wait声明如下:

#include <pthread.h>
int pthread_barrier_wait(pthread_barrier_t *barrier); 

该函数返回0,表明还需要等待其他线程调用pthread_barrier_wait,返回PTHREAD_BARRIER_SERIAL_THREAD表明所有线程可以继续往下执行了,具体那个线程返回PTHREAD_BARRIER_SERIAL_THREAD则是随机的。下面是用barrier来实现多个线程的排序,代码如下:

#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <limits.h>
#include <sys/time.h>

#define NTHR   8                /* number of threads */
#define NUMNUM 800L         /* number of numbers to sort */
#define TNUM   (NUMNUM/NTHR)    /* number to sort per thread */

long nums[NUMNUM];
long snums[NUMNUM];

pthread_barrier_t b;

#define heapsort qsort

/*
 * Compare two long integers (helper function for heapsort)
 */
int complong(const void *arg1, const void *arg2)
{
    long l1 = *(long *)arg1;
    long l2 = *(long *)arg2;

    if (l1 == l2)
        return 0;
    else if (l1 < l2)
        return -1;
    else
        return 1;
}

/*
 * Worker thread to sort a portion of the set of numbers.
 */
void * thr_fn(void *arg)
{
    long    idx = (long)arg;

    heapsort(&nums[idx], TNUM, sizeof(long), complong);
    pthread_barrier_wait(&b);

    /*
     * Go off and perform more work ...
     */
    return((void *)0);
}

/*
 * Merge the results of the individual sorted ranges.
 */
void merge()
{
    long    idx[NTHR];
    long    i, minidx, sidx, num;

    for (i = 0; i < NTHR; i++)
        idx[i] = i * TNUM;
    for (sidx = 0; sidx < NUMNUM; sidx++) {
        num = LONG_MAX;
        for (i = 0; i < NTHR; i++) {
            if ((idx[i] < (i+1)*TNUM) && (nums[idx[i]] < num)) {
                num = nums[idx[i]];
                minidx = i;
            }
        }
        snums[sidx] = nums[idx[minidx]];
        idx[minidx]++;
    }
}

int main()
{
    unsigned long   i;
    struct timeval  start, end;
    long long       startusec, endusec;
    double          elapsed;
    int             err;
    pthread_t       tid;

    /*
     * Create the initial set of numbers to sort.
     */
    srandom(1);
    for (i = 0; i < NUMNUM; i++)
        nums[i] = random();

    /*
     * Create 8 threads to sort the numbers.
     */
    gettimeofday(&start, NULL);
    pthread_barrier_init(&b, NULL, NTHR+1);
    for (i = 0; i < NTHR; i++) {
        err = pthread_create(&tid, NULL, thr_fn, (void *)(i * TNUM));
        if (err != 0)
        {
            printf("can't create thread");
            return -1;
        }
}
    pthread_barrier_wait(&b);
    merge();
    gettimeofday(&end, NULL);

    /*
     * Print the sorted list.
     */
    startusec = start.tv_sec * 1000000 + start.tv_usec;
    endusec = end.tv_sec * 1000000 + end.tv_usec;
    elapsed = (double)(endusec - startusec) / 1000000.0;
    printf("sort took %.4f seconds\n", elapsed);
    for (i = 0; i < NUMNUM; i++)
    {
        if( (i < (NUMNUM - 1)) && (snums[i] > snums[i + 1]) )
            printf("sort failed!\n");
        printf("%ld,", snums[i]);
    }
    printf("\n");

    exit(0);
}

上面程序,有以下几点值得注意:

I)在初始化barrier对象时,指定的线程的个数为工作线程个数+1,加1是加上主线程。

II)在程序中线程并不检测调用的pthread_barrier_wait()的返回值是0还是PTHREAD_BARRIER_SERIAL_THREAD,这是因为我们指定使用主线程来合并其他线程的执行结果。

信号量(semaphores)

信号量即可以用于进程间同步也可以用于同一进程内不同线程间的同步,下面暂时只讨论用于线程同步。一个信号量对象有一个相关联的整数值,该整数值用于大于等于0,对于一个已初始化的信号量对象,可以执行两类操作:一种是通过调用sem_wait对相应的整数值减1,即通常说的P操作,若当前的信号量的值等于0,则该操作会阻塞线程;另外一种通过sem_post对相应值加1,即通常说的V操作,若有线程阻塞在该信号量上,,则其中有一个线程被唤醒,即被唤醒的线程从sem_wait调用返回。这个两个操作声明如下:

#include <semaphore.h>
int sem_post(sem_t *sem);
int sem_wait(sem_t *sem);

在Linux2.6后,NPTL实现了POSIX所要求的信号量特性。下面是使用信号量的一个简单例子,代码如下:

#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>

#define BUFF_SIZE   5/* total number of slots */
#define NP          3/* total number of producers */
#define NC          3/* total number of consumers */
#define NITERS      4/* number of items produced/consumed */
#define NONITEM     -1/* stand for no item*/

typedef struct {
    int buf[BUFF_SIZE];   /* shared var */
    int in;               /* buf[in%BUFF_SIZE] is the first empty slot */
    int out;              /* buf[out%BUFF_SIZE] is the first full slot */
    sem_t full;           /* keep track of the number of full spots */
    sem_t empty;          /* keep track of the number of empty spots */
    sem_t mutex;          /* enforce mutual exclusion to shared data */
} sbuf_t;

sbuf_t shared;

void *producer(void *arg)
{
    int i, item, index;

    index = (int)arg;

    for (i=0; i < NITERS; i++)
    {   

        /* Produce item */
        item = i;

        /* Prepare to write item to buf */

        /* If there are no empty slots, wait */
        sem_wait(&shared.empty);

        /* If another thread uses the buffer, wait */
        sem_wait(&shared.mutex);
        shared.buf[shared.in] = item;
        shared.in = (shared.in+1)%BUFF_SIZE;
        printf("[P%d] Producing item%d ...\n", index, item);
        fflush(stdout);
        /* Release the buffer */
        sem_post(&shared.mutex);

        /* Increment the number of full slots */
        sem_post(&shared.full);

        /* Interleave producer and consumer execution */
        if (i % 2 == 1)
            sleep(1);
    }
    return NULL;
}

void *consumer(void *arg)
{
    int i, item, index;

    index = (int)arg;

    for (i=0; i < NITERS; i++)
    {

        /* Prepare to read item to buf */
        /* If there are no full slots, wait */
        sem_wait(&shared.full);

        /* If another thread uses the buffer, wait */
        sem_wait(&shared.mutex);/* consume item */
        item = shared.buf[shared.out];
        shared.buf[shared.in] = NONITEM;
        shared.out = (shared.out+1)%BUFF_SIZE;
        printf(" ------> [C%d] Consuming item%d ...\n", index, item);
        fflush(stdout);
        sem_post(&shared.mutex);/* Release the buffer */

        /* Increment the number of empty slots */
        sem_post(&shared.empty);

        /* Interleave producer and consumer execution */
        if (i % 2 == 1)
            sleep(1);
    }
    return NULL;
}

int main()
{
    pthread_t idP[NP], idC[NC];
    int index;

    /*initialize an unnamed semaphore*/
    sem_init(&shared.full, 0, 0);
    sem_init(&shared.empty, 0, BUFF_SIZE);

    /*initialize mutex*/
    sem_init(&shared.mutex, 0, 1);

    /* Create NP producer */
for (index = 0; index < NP; index++)
    {
        pthread_create(&idP[index], NULL, producer, (void*)index);
    }

    /*Create NC consumers */
    for (index = 0; index < NC; index++)
    {
        pthread_create(&idC[index], NULL, consumer, (void*)index);
    }

    /* wait for all producers and the consumer */
    for (index = 0; index < NP; index++)
    {
        pthread_join(idP[index], NULL);
    }
    for (index = 0; index < NC; index++)
    {
        pthread_join(idC[index], NULL);
    }

    exit(0);
}

编译运行程序结果如下:

$gcc -o sem_example -Wall -lpthread  sem_example.c
$./sem_example
[P1] Producing item0 ...
[P0] Producing item0 ...
[P0] Producing item1 ...
[P2] Producing item0 ...
[P2] Producing item1 ...
------> [C1] Consuming item0 ...
------> [C1] Consuming item0 ...
------> [C2] Consuming item1 ...
------> [C2] Consuming item0 ...
------> [C0] Consuming item1 ...
[P1] Producing item1 ...
------> [C0] Consuming item1 ...
[P0] Producing item2 ...
[P0] Producing item3 ...
------> [C1] Consuming item2 ...
------> [C1] Consuming item3 ...
[P1] Producing item2 ...
[P1] Producing item3 ...
[P2] Producing item2 ...
[P2] Producing item3 ...
------> [C0] Consuming item2 ...
------> [C0] Consuming item3 ...
------> [C2] Consuming item2 ...
------> [C2] Consuming item3 ...

上面程序用信号量实现了一个简单的多个生产者和多个消费者的问题。关于上面程序有几点值得说明的:

I)可以使用sem_init来初始化为一个信号量,其声明如下:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

其中参数value值为信号量初始值。参数pshared用来指定该信号量是用于进程之间同步还是线程之间同步。如pshared的值为0,则用于线程之间同步,这时候参数sem信号量对象地址,应该是对同步的线程的可见的,通常此时信号量对象被声明为全局对象,或者动态的在堆上分配。如果pshared的值非0,则说明该信号量用于进程之间同步,这时候参数sem的值应该在同步进程之间的共享内存中,比如通过mmap或shm_open等调用返回这个地址。对一个已经初始化的信号量再次调用sem_init,其结果末定义的。

另外关于信号量还值得注意的是,Linux实现是遵循POSIX中规则的,并且在Linux2.6版本后,glibc中使用的是NPTL实现,完全实现POSIX中的规则。对于信号量用于进程通信时,信号量是随内核持续性的(kernel persistence):如果进程中不调用sem_unlink,则即使进程结束了,信号量占用的资源还在。在linux中,命名信号量在虚拟文件系统中创建,通常是/dev/shm,并且命名形式为sem.somename,这也是为什么信号量名字长度为NAME_MAX-4。

各种同步方法比较

下面对的之前讨论的各种同步方法使用场合和优缺点做一个简单的总结:

I)在对某个数据或某段代码的访问要求一次只能只有一个线程可以访问情况时,这时可以使用到互斥量,所有线程约定在操作之前,都要获得该互斥量的锁才行,而对于互斥量来说,它具有这样的特性:一次只能有一个线程拥有这个锁。

II)条件变量通常使用在下面这种情况:一个线程需要某个条件成立才能往下执行,否则就阻塞,而另外一个线程可以修改这个条件,使得条件成立,当条件成立时,就可以使用相关接口唤醒阻塞在相应条件变量的线程,并且条件变量必须与互斥量结合使用,因为它的实现依赖于互斥量的状态。结合条件变量和互斥量使用,而不仅仅使用互斥量,可以防止等待条件成立的线程忙等待。

III)读写锁的引入是对操作类型的进一步细分,即区分读操作和写操作。对于互斥量来说,要么是锁住状态要么是不加锁锁状态,而且一次只有一个线程可以对其加锁,不区分这个线程将要执行的读操作还是写操作;而读写锁对线程的读数据加锁请求和写数据加锁请求进行了区分,从而在读操作次数远远大于写操作次数情况下,比仅使用互斥量,程序会有更高的并发性。

IV)在获得互斥量的锁线程阻塞时,线程会进入睡眠状态,而在获取自旋锁时,线程会处于忙等待(busy-waiting)状态,即不会让出CPU,消耗CPU资源,反复尝试是否能获得自旋锁,直到得到为止。自旋锁适用于这样的情况:线程持有自旋锁的时间比较短并且线程不想消耗重新调度的花费。

V)barrier主要用于协调多个线程并行(parallel)共同完成某项任务,这些线程通常做类似的工作,只不过是针对不同的数据操作,比如执行排序操作。

VI)信号量有一个与之关联的状态(它的计数器),对于互斥量来说,这个计数器就是1,并且互斥量必须总是由它上锁的线程解锁,而信号量的post却不必由执行它的wait操作的同一线程执行。信号量post操作后,其相应的计数器总是加1被记住,而如果当一个条件变量发送信号时,如果没有线程等待在该条件变量上,那么该信号总是丢失。在Poxix.1基本原理中称,有了互斥量和条件变量还提供信号量的主要目的是提供一种进程间同步方式。

上面虽然列车各种同步方法的使用场合,但是通常用的最多的还是互斥量和条件变量,其他的同步方法在极少情况下会使用到。实质上,比如读写锁和信号量高级原语都可以使用互斥量和条件变量来实现。

参考资料

http://man7.org/linux/man-pages/man7/sem_overview.7.html

http://www.csc.villanova.edu/~mdamian/threads/posixsem.html

http://www.jbox.dk/sanos/source/lib/pthread/

《UNIX环境高级编程》 11.6线程的同步

时间: 2024-10-27 06:54:39

浅析线程间通信三:Barriers、信号量(semaphores)以及各种同步方法比较的相关文章

浅析线程间通信一:互斥量和条件变量

线程同步的目的简单来讲就是保证数据的一致性.在Linux中,常用的线程同步方法有互斥量( mutex ).读写锁和条件变量,合理使用这三种方法可以保证数据的一致性,但值得的注意的是,在设计应用程序时,所有的线程都必须遵守相同的数据访问规则为前提,才能保证这些同步方法有效,如果允许某个线程在没有得到访问权限(比如锁)的情况下访问共享资源,那么其他线程在使用共享资源前都获得了锁,也会出现数据不一致的问题.另外还有自旋锁.barrier和信号量线程同步方法.本文将讨论互斥量和条件变量的使用,并给出了相

浅析线程间通信二:读写锁和自旋锁

上文讨论了互斥量和条件变量用于线程的同步,本文将讨论读写锁和自旋锁的使用,并给出了相应的代码和注意事项,相关代码也可在我的github上下载. 读写锁 对于互斥量要么是锁住状态要么是不加锁锁状态,而且一次只有一个线程可以对其加锁,而读写锁对线程的读数据加锁请求和写数据加锁请求进行了区分,从而在某些情况下,程序有更高的并发性.对于读写锁,一次只有一个线程可以占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁.虽然读写锁的实现各不相同,但当读写锁处于读模式锁住状态时,如果有另外的线程试图以写

线程间通信机制posix匿名信号量

信号量分为两种 一种是简单的信号量,另一种是用于进程间通信的信号量集. 简单的信号量 属于POSIX标准的信号量; 从信号量的命名来看,信号量又可分为命名信号量和匿名(未命名)信号量; 从信号量的值来看,信号量可分为二进制信号量和计数信号量; 1.匿名信号量和命名信号量: 匿名信号量是在内存中分配内存.进行初始化并由系统API进行管理的,它可以在多个线程之间进行资源同步,也可以在多个进程之间进行资源同步,这主要是看在初始化的时候给pshared传递的参数值,为0,则在线程之间同步,非0,则在进程

线程间通信的三种方式(NSThread,GCD,NSOperation)

一.NSThread线程间通信 #import "ViewController.h" @interface ViewController ()<UIScrollViewDelegate> @property (strong, nonatomic) IBOutlet UIScrollView *scrollView; @property (weak, nonatomic)  UIImageView *imageView; @end @implementation ViewCo

【转】VC 线程间通信的三种方式

原文网址:http://my.oschina.net/laopiao/blog/94728 1.使用全局变量(窗体不适用)      实现线程间通信的方法有很多,常用的主要是通过全局变量.自定义消息和事件对象等来实现的.其中又以对全局变量的使用最为简洁.该方法将全局变量作为线程监视的对象,并通过在主线程对此变量值的改变而实现对子线程的控制.      由于这里的全局变量需要在使用它的线程之外对其值进行改变,这就需要通过volatile关键字对此变量进行说明.使用全局变量进行线程通信的方法非常简单

iOS开发NSOperation 三:操作依赖和监听以及线程间通信

一:操作依赖和监听 #import "ViewController.h" @interface ViewController () @end @implementation ViewController /** * 1:NSOperation的使用:1:先创建队列NSOperationQueue:若不创建队列直接封装任务则默认在当前线程中串行执行任务,其队列分为两种主队列和非主队列,主队列和GCD中的主队列一样[NSOperationQueue mainQueue],而alloc in

2016年4月24日_JAVA学习笔记_多线程三_线程间通信

1.毕老师第十四天内容,线程间的通信.大概是使用wait(),notify()等一系列函数来控制各个线程的CPU执行资格和执行权,通过合适的时机在各个线程当中切换来达到线程间通信的目的. 涉及到的方法: wait():让线程处于等待状态,被wait()的线程会被存储到线程池当中,直到被唤醒.只能在同步方法中被调用. notify():随机选择一个在该对象上调用wait方法的线程,解除其阻塞状态.只能在同步方法和同步代码块中被调用. notifyAll():接触所有在该对象上调用wait()方法的

进程间通信与线程间通信

序 今天被问及进程间通信的问题,发现自己了解的并不够,所以,对此好好总结一番~ 操作系统的主要任务是管理计算机的软件.硬件资源.现代操作系统的主要特点是多用户和多任务,也就是程序的并行执行,windows如此linux也是如此.所以操作系统就借助于进程来管理计算机的软.硬件资源,支持多任务的并行执行.要并行执行就需要多进程.多线程.因此多进程和多线程间为了完成一定的任务,就需要进行一定的通信.而线程间通信又和进程间的通信不同.由于进程的数据空间相对独立而线程是共享数据空间的,彼此通信机制也很不同

线程间通信和线程互斥

线程间通信 1> 线程间通信分为两种 主线程进入子线程(前面的方法都可以) 子线程回到主线程 2> 返回主线程 3> 代码 这个案例的思路是:当我触摸屏幕时,会在子线程加载图片,然后在主线程刷新UI界面 视图布局我就不写了,大家自己来吧,线程间通信代码如下: #pragma mark - 添加响应方法触发创建子线程并加载数据 - (void)touchesBegan:(NSSet<UITouch *> *)touches withEvent:(UIEvent *)event