之前的文章讨论了互斥量、条件变量、读写锁和自旋锁用于线程的同步,本文将首先讨论Barriers和信号量的使用,并给出了相应的代码和注意事项,相关代码也可在我的github上下载,然后对线程各种同步方法进行了比较。
Barriers
Barriers是一种不同于前面线程同步机制,它主要用于协调多个线程并行(parallel)共同完成某项任务。一个barrier对象可以使得每个线程阻塞,直到所有协同(合作完成某项任务)的线程执行到某个指定的点,才让这些线程继续执行。前面使用的pthread_join调用也可以看成简单的barrier,那里主线程要求其他线程都退出后,才能继续执行。可以使用接口pthread_barrier_init来初始化一个对象,用phread_barrier_destory来销毁一个barrier对象。他们声明如下:
#include <pthread.h> int pthread_barrier_destroy(pthread_barrier_t *barrier); int pthread_barrier_init(pthread_barrier_t *restrict barrier, const pthread_barrierattr_t *restrict attr, unsigned count);
当使用pthread_barrier_init接口初始化hygienebarrier对象时,参数count指定需要多少个线程执行到指定的点,才能使得所有线程继续往下执行。每个线程执行的指定的点是指线程自身通过调用pthread_barrier_wait来表明当前线程执行到指定点了,等待其他线程调用pthread_barrier_wait。pthread_barrier_wait声明如下:
#include <pthread.h> int pthread_barrier_wait(pthread_barrier_t *barrier);
该函数返回0,表明还需要等待其他线程调用pthread_barrier_wait,返回PTHREAD_BARRIER_SERIAL_THREAD表明所有线程可以继续往下执行了,具体那个线程返回PTHREAD_BARRIER_SERIAL_THREAD则是随机的。下面是用barrier来实现多个线程的排序,代码如下:
#include <stdlib.h> #include <stdio.h> #include <pthread.h> #include <limits.h> #include <sys/time.h> #define NTHR 8 /* number of threads */ #define NUMNUM 800L /* number of numbers to sort */ #define TNUM (NUMNUM/NTHR) /* number to sort per thread */ long nums[NUMNUM]; long snums[NUMNUM]; pthread_barrier_t b; #define heapsort qsort /* * Compare two long integers (helper function for heapsort) */ int complong(const void *arg1, const void *arg2) { long l1 = *(long *)arg1; long l2 = *(long *)arg2; if (l1 == l2) return 0; else if (l1 < l2) return -1; else return 1; } /* * Worker thread to sort a portion of the set of numbers. */ void * thr_fn(void *arg) { long idx = (long)arg; heapsort(&nums[idx], TNUM, sizeof(long), complong); pthread_barrier_wait(&b); /* * Go off and perform more work ... */ return((void *)0); } /* * Merge the results of the individual sorted ranges. */ void merge() { long idx[NTHR]; long i, minidx, sidx, num; for (i = 0; i < NTHR; i++) idx[i] = i * TNUM; for (sidx = 0; sidx < NUMNUM; sidx++) { num = LONG_MAX; for (i = 0; i < NTHR; i++) { if ((idx[i] < (i+1)*TNUM) && (nums[idx[i]] < num)) { num = nums[idx[i]]; minidx = i; } } snums[sidx] = nums[idx[minidx]]; idx[minidx]++; } } int main() { unsigned long i; struct timeval start, end; long long startusec, endusec; double elapsed; int err; pthread_t tid; /* * Create the initial set of numbers to sort. */ srandom(1); for (i = 0; i < NUMNUM; i++) nums[i] = random(); /* * Create 8 threads to sort the numbers. */ gettimeofday(&start, NULL); pthread_barrier_init(&b, NULL, NTHR+1); for (i = 0; i < NTHR; i++) { err = pthread_create(&tid, NULL, thr_fn, (void *)(i * TNUM)); if (err != 0) { printf("can't create thread"); return -1; } } pthread_barrier_wait(&b); merge(); gettimeofday(&end, NULL); /* * Print the sorted list. */ startusec = start.tv_sec * 1000000 + start.tv_usec; endusec = end.tv_sec * 1000000 + end.tv_usec; elapsed = (double)(endusec - startusec) / 1000000.0; printf("sort took %.4f seconds\n", elapsed); for (i = 0; i < NUMNUM; i++) { if( (i < (NUMNUM - 1)) && (snums[i] > snums[i + 1]) ) printf("sort failed!\n"); printf("%ld,", snums[i]); } printf("\n"); exit(0); }
上面程序,有以下几点值得注意:
I)在初始化barrier对象时,指定的线程的个数为工作线程个数+1,加1是加上主线程。
II)在程序中线程并不检测调用的pthread_barrier_wait()的返回值是0还是PTHREAD_BARRIER_SERIAL_THREAD,这是因为我们指定使用主线程来合并其他线程的执行结果。
信号量(semaphores)
信号量即可以用于进程间同步也可以用于同一进程内不同线程间的同步,下面暂时只讨论用于线程同步。一个信号量对象有一个相关联的整数值,该整数值用于大于等于0,对于一个已初始化的信号量对象,可以执行两类操作:一种是通过调用sem_wait对相应的整数值减1,即通常说的P操作,若当前的信号量的值等于0,则该操作会阻塞线程;另外一种通过sem_post对相应值加1,即通常说的V操作,若有线程阻塞在该信号量上,,则其中有一个线程被唤醒,即被唤醒的线程从sem_wait调用返回。这个两个操作声明如下:
#include <semaphore.h> int sem_post(sem_t *sem); int sem_wait(sem_t *sem);
在Linux2.6后,NPTL实现了POSIX所要求的信号量特性。下面是使用信号量的一个简单例子,代码如下:
#include <unistd.h> #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <semaphore.h> #define BUFF_SIZE 5/* total number of slots */ #define NP 3/* total number of producers */ #define NC 3/* total number of consumers */ #define NITERS 4/* number of items produced/consumed */ #define NONITEM -1/* stand for no item*/ typedef struct { int buf[BUFF_SIZE]; /* shared var */ int in; /* buf[in%BUFF_SIZE] is the first empty slot */ int out; /* buf[out%BUFF_SIZE] is the first full slot */ sem_t full; /* keep track of the number of full spots */ sem_t empty; /* keep track of the number of empty spots */ sem_t mutex; /* enforce mutual exclusion to shared data */ } sbuf_t; sbuf_t shared; void *producer(void *arg) { int i, item, index; index = (int)arg; for (i=0; i < NITERS; i++) { /* Produce item */ item = i; /* Prepare to write item to buf */ /* If there are no empty slots, wait */ sem_wait(&shared.empty); /* If another thread uses the buffer, wait */ sem_wait(&shared.mutex); shared.buf[shared.in] = item; shared.in = (shared.in+1)%BUFF_SIZE; printf("[P%d] Producing item%d ...\n", index, item); fflush(stdout); /* Release the buffer */ sem_post(&shared.mutex); /* Increment the number of full slots */ sem_post(&shared.full); /* Interleave producer and consumer execution */ if (i % 2 == 1) sleep(1); } return NULL; } void *consumer(void *arg) { int i, item, index; index = (int)arg; for (i=0; i < NITERS; i++) { /* Prepare to read item to buf */ /* If there are no full slots, wait */ sem_wait(&shared.full); /* If another thread uses the buffer, wait */ sem_wait(&shared.mutex);/* consume item */ item = shared.buf[shared.out]; shared.buf[shared.in] = NONITEM; shared.out = (shared.out+1)%BUFF_SIZE; printf(" ------> [C%d] Consuming item%d ...\n", index, item); fflush(stdout); sem_post(&shared.mutex);/* Release the buffer */ /* Increment the number of empty slots */ sem_post(&shared.empty); /* Interleave producer and consumer execution */ if (i % 2 == 1) sleep(1); } return NULL; } int main() { pthread_t idP[NP], idC[NC]; int index; /*initialize an unnamed semaphore*/ sem_init(&shared.full, 0, 0); sem_init(&shared.empty, 0, BUFF_SIZE); /*initialize mutex*/ sem_init(&shared.mutex, 0, 1); /* Create NP producer */ for (index = 0; index < NP; index++) { pthread_create(&idP[index], NULL, producer, (void*)index); } /*Create NC consumers */ for (index = 0; index < NC; index++) { pthread_create(&idC[index], NULL, consumer, (void*)index); } /* wait for all producers and the consumer */ for (index = 0; index < NP; index++) { pthread_join(idP[index], NULL); } for (index = 0; index < NC; index++) { pthread_join(idC[index], NULL); } exit(0); }
编译运行程序结果如下:
$gcc -o sem_example -Wall -lpthread sem_example.c $./sem_example [P1] Producing item0 ... [P0] Producing item0 ... [P0] Producing item1 ... [P2] Producing item0 ... [P2] Producing item1 ... ------> [C1] Consuming item0 ... ------> [C1] Consuming item0 ... ------> [C2] Consuming item1 ... ------> [C2] Consuming item0 ... ------> [C0] Consuming item1 ... [P1] Producing item1 ... ------> [C0] Consuming item1 ... [P0] Producing item2 ... [P0] Producing item3 ... ------> [C1] Consuming item2 ... ------> [C1] Consuming item3 ... [P1] Producing item2 ... [P1] Producing item3 ... [P2] Producing item2 ... [P2] Producing item3 ... ------> [C0] Consuming item2 ... ------> [C0] Consuming item3 ... ------> [C2] Consuming item2 ... ------> [C2] Consuming item3 ...
上面程序用信号量实现了一个简单的多个生产者和多个消费者的问题。关于上面程序有几点值得说明的:
I)可以使用sem_init来初始化为一个信号量,其声明如下:
#include <semaphore.h> int sem_init(sem_t *sem, int pshared, unsigned int value);
其中参数value值为信号量初始值。参数pshared用来指定该信号量是用于进程之间同步还是线程之间同步。如pshared的值为0,则用于线程之间同步,这时候参数sem信号量对象地址,应该是对同步的线程的可见的,通常此时信号量对象被声明为全局对象,或者动态的在堆上分配。如果pshared的值非0,则说明该信号量用于进程之间同步,这时候参数sem的值应该在同步进程之间的共享内存中,比如通过mmap或shm_open等调用返回这个地址。对一个已经初始化的信号量再次调用sem_init,其结果末定义的。
另外关于信号量还值得注意的是,Linux实现是遵循POSIX中规则的,并且在Linux2.6版本后,glibc中使用的是NPTL实现,完全实现POSIX中的规则。对于信号量用于进程通信时,信号量是随内核持续性的(kernel persistence):如果进程中不调用sem_unlink,则即使进程结束了,信号量占用的资源还在。在linux中,命名信号量在虚拟文件系统中创建,通常是/dev/shm,并且命名形式为sem.somename,这也是为什么信号量名字长度为NAME_MAX-4。
各种同步方法比较
下面对的之前讨论的各种同步方法使用场合和优缺点做一个简单的总结:
I)在对某个数据或某段代码的访问要求一次只能只有一个线程可以访问情况时,这时可以使用到互斥量,所有线程约定在操作之前,都要获得该互斥量的锁才行,而对于互斥量来说,它具有这样的特性:一次只能有一个线程拥有这个锁。
II)条件变量通常使用在下面这种情况:一个线程需要某个条件成立才能往下执行,否则就阻塞,而另外一个线程可以修改这个条件,使得条件成立,当条件成立时,就可以使用相关接口唤醒阻塞在相应条件变量的线程,并且条件变量必须与互斥量结合使用,因为它的实现依赖于互斥量的状态。结合条件变量和互斥量使用,而不仅仅使用互斥量,可以防止等待条件成立的线程忙等待。
III)读写锁的引入是对操作类型的进一步细分,即区分读操作和写操作。对于互斥量来说,要么是锁住状态要么是不加锁锁状态,而且一次只有一个线程可以对其加锁,不区分这个线程将要执行的读操作还是写操作;而读写锁对线程的读数据加锁请求和写数据加锁请求进行了区分,从而在读操作次数远远大于写操作次数情况下,比仅使用互斥量,程序会有更高的并发性。
IV)在获得互斥量的锁线程阻塞时,线程会进入睡眠状态,而在获取自旋锁时,线程会处于忙等待(busy-waiting)状态,即不会让出CPU,消耗CPU资源,反复尝试是否能获得自旋锁,直到得到为止。自旋锁适用于这样的情况:线程持有自旋锁的时间比较短并且线程不想消耗重新调度的花费。
V)barrier主要用于协调多个线程并行(parallel)共同完成某项任务,这些线程通常做类似的工作,只不过是针对不同的数据操作,比如执行排序操作。
VI)信号量有一个与之关联的状态(它的计数器),对于互斥量来说,这个计数器就是1,并且互斥量必须总是由它上锁的线程解锁,而信号量的post却不必由执行它的wait操作的同一线程执行。信号量post操作后,其相应的计数器总是加1被记住,而如果当一个条件变量发送信号时,如果没有线程等待在该条件变量上,那么该信号总是丢失。在Poxix.1基本原理中称,有了互斥量和条件变量还提供信号量的主要目的是提供一种进程间同步方式。
上面虽然列车各种同步方法的使用场合,但是通常用的最多的还是互斥量和条件变量,其他的同步方法在极少情况下会使用到。实质上,比如读写锁和信号量高级原语都可以使用互斥量和条件变量来实现。
参考资料
http://man7.org/linux/man-pages/man7/sem_overview.7.html
http://www.csc.villanova.edu/~mdamian/threads/posixsem.html
http://www.jbox.dk/sanos/source/lib/pthread/
《UNIX环境高级编程》 11.6线程的同步