嵌入式 Linux进程间通信(十二)——多线程同步
多线程编程中有三种线程同步机制:互斥锁、信号量、条件量。本文将使用生产者消费者问题编程实践三种线程同步方式。
生产者、消费者问题:生产者线程生产物品,然后将物品放置在一个空缓冲区中供消费者线程消费。消费者线程从缓冲区中获得物品,然后释放缓冲区。当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费者线程释放出一个空缓冲区。当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来。
一、互斥锁
互斥锁用来保证同一时间内只有一个线程在执行某段代码(临界区)。互斥锁可看作某种意义上的全局变量。在同一时刻只能有一个线程掌握某个互斥锁,拥有上锁状态的线程能够对共享资源进行操作。若其他线程希望上锁一个已经被上锁的互斥锁,则该线程就会挂起,直到上锁的线程释放掉互斥锁为止。互斥锁保证让每个线程对共享资源按顺序进行原子操作。
互斥锁主要包括下面的基本函数:
互斥锁初始化:pthread_mutex_init()
互斥锁上锁:pthread_mutex_lock()
互斥锁判断上锁:pthread_mutex_trylock()
互斥锁解锁:pthread_mutex_unlock()
消除互斥锁:pthread_mutex_destroy()
1、初始化互斥锁
静态分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
动态分配:int pthread_mutex_init(pthread_mutex_t *mp, const pthread_mutexattr_t *mattr);
mp是互斥锁地址
mattr是属性,通常默认 null初始化互斥锁之前,必须将其所在的内存清零。如果互斥锁已初始化,则它会处于未锁定状态。互斥锁可以位于进程之间共享的内存中或者某个进程的专用内存中。
2、锁定互斥锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
当pthread_mutex_lock() 返回时,该互斥锁已被锁定。调用线程是该互斥锁的属主。如果该互斥锁已被另一个线程锁定和拥有,则调用线程将阻塞,直到该互斥锁变为可用为止。
如果互斥锁类型为 PTHREAD_MUTEX_NORMAL,则不提供死锁检测。尝试重新锁定互斥锁会导致死锁。如果某个线程尝试解除锁定的互斥锁不是由该线程锁定或未锁定,则将产生不确定的行为。
如果互斥锁类型为 PTHREAD_MUTEX_ERRORCHECK,则会提供错误检查。如果某个线程尝试重新锁定的互斥锁已经由该线程锁定,则将返回错误。如果某个线程尝试解除锁定的互斥锁不是由该线程锁定或者未锁定,则将返回错误。
如果互斥锁类型为 PTHREAD_MUTEX_RECURSIVE,则该互斥锁会保留锁定计数这一概念。线程首次成功获取互斥锁时,锁定计数会设置为 1。线程每重新锁定该互斥锁一次,锁定计数就增加 1。线程每解除锁定该互斥锁一次,锁定计数就减小 1。 锁定计数达到 0 时,该互斥锁即可供其他线程获取。如果某个线程尝试解除锁定的互斥锁不是由该线程锁定或者未锁定,则将返回错误。
如果互斥锁类型是 PTHREAD_MUTEX_DEFAULT,则尝试以递归方式锁定该互斥锁将产生不确定的行为。对于不是由调用线程锁定的互斥锁,如果尝试解除对它的锁定,则会产生不确定的行为。如果尝试解除锁定尚未锁定的互斥锁,则会产生不确定的行为。
返回值:pthread_mutex_lock() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。如果出现以下任一情况,该函数将失败并返回对应的值。
EAGAIN:由于已超出了互斥锁递归锁定的最大次数,因此无法获取该互斥锁。
EDEADLK:当前线程已经拥有互斥锁。
3、解除锁定互斥锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);
函数说明:pthread_mutex_unlock() 可释放 mutex 引用的互斥锁对象。互斥锁的释放方式取决于互斥锁的类型属性。如果调用 pthread_mutex_unlock() 时有多个线程被 mutex 对象阻塞,则互斥锁变为可用时调度策略可确定获取该互斥锁的线程。对于 PTHREAD_MUTEX_RECURSIVE 类型的互斥锁,当计数达到零并且调用线程不再对该互斥锁进行任何锁定时,该互斥锁将变为可用。
返回值:pthread_mutex_unlock() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。如果出现以下情况,该函数将失败并返回对应的值。
EPERM :当前线程不拥有互斥锁。
4、销毁互斥锁
int pthread_mutex_destroy(pthread_mutex_t *mp);
注意,没有释放用来存储互斥锁的空间。
返回值:pthread_mutex_destroy() 在成功完成之后会返回零。其他任何返回值都表示出现了错误。如果出现以下任一情况,该函数将失败并返回对应的值。
EINVAL: mp 指定的值不会引用已初始化的互斥锁对象。
5、程序实例
本实例使用互斥锁实现生产者、消费者的经典例子,生产者、消费者公用一个缓冲区,缓冲区存放一条消息。
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <sys/time.h> static char buff[50]; int msg_flag=0; pthread_mutex_t mutex; void consumeItem(char *buff) { printf("This is a consumer item\n"); } void produceItem(char *buff) { printf("This is a produce item\n"); } void *consumer(void *param) { while (1) { pthread_mutex_lock(&mutex); if (msg_flag > 0) { msg_flag--; consumeItem(buff); } pthread_mutex_unlock(&mutex); sleep(1); } return NULL; } void *producer(void *param) { while (1) { pthread_mutex_lock(&mutex); if (msg_flag ==0 ) { msg_flag++; produceItem(buff); } pthread_mutex_unlock(&mutex); sleep(1); } return NULL; } int main() { pthread_t tid_c, tid_p; void *retval; pthread_mutex_init(&mutex, NULL); pthread_create(&tid_p, NULL, producer, NULL); pthread_create(&tid_c, NULL, consumer, NULL); pthread_join(tid_p, &retval); pthread_join(tid_c, &retval); return 0; }
二、信号量
与进程一样,线程也可以使用信号量来通信。线程使用信号量同步线程的步骤如下:
1、信号量初始化
int sem_init (sem_t *sem , int pshared, unsigned int value);
对sem指定的信号量进行初始化,设置好共享选项(linux只支持为0,即表示它是当前进程的局部信号量),然后给它一个初始值VALUE。
2、等待信号量
int sem_wait(sem_t *sem);
给信号量减1,然后等待直到信号量的值大于0。
3、释放信号量
int sem_post(sem_t *sem);
信号量值加1。并通知其他等待线程。
4、销毁信号量
int sem_destroy(sem_t *sem);
5、程序实例
本实例使用条件变量实现生产者、消费者的经典例子,生产者、消费者共用一个缓冲区,缓冲区存放多条消息。使用信号量使用如下:
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <semaphore.h> static char buff[50]; pthread_mutex_t mutex; pthread_mutex_t cond_mutex; pthread_cond_t cond; sem_t msg_cnt; //缓冲区消息数 sem_t idle_cnt; //缓冲区空闲数 void consumeItem(char *buff) { printf("This is a consumer item\n"); } void produceItem(char *buff) { printf("This is a produce item\n"); } void *consumer(void *param) { while (1) { sem_wait(&msg_cnt); pthread_mutex_lock(&mutex); consumeItem(buff); pthread_mutex_unlock(&mutex); sem_post(&idle_cnt); } return NULL; } void *producer(void *param) { while (1) { sem_wait(&idle_cnt); pthread_mutex_lock(&mutex); produceItem(buff); pthread_mutex_unlock(&mutex); sem_post(&msg_cnt); } return NULL; } int main() { pthread_t tid_c, tid_p; void *retval; pthread_mutex_init(&mutex, NULL); pthread_mutex_init(&cond_mutex, NULL); pthread_cond_t cond; pthread_cond_init(&cond, NULL); sem_init(&msg_cnt, 0, 0); //初始缓冲区没有消息 sem_init(&idle_cnt, 0, 10); //初始缓冲区能放10条消息 pthread_create(&tid_p, NULL, producer, NULL); pthread_create(&tid_c, NULL, consumer, NULL); pthread_join(tid_p, &retval); pthread_join(tid_c, &retval); return 0; }
三、条件变量(cond)
与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。条件变量分 为两部分: 条件和变量。条件本身是由互斥量保护的。线程在改变条件状态前先要锁住互斥量。条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局 变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使"条件成立"(给出条件成立信号)。条件的检测是在 互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。
条件变量用来阻塞线程等待某个事件的发生,并且当等待的事件发生时,阻塞线程会被通知。互斥锁的缺点是只有两种状态:锁定和非锁定。而条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足,常和互斥锁一起使用。使用时,条件变量被用来阻塞一个线程,当条件不满足时,线程往往解开相应的互斥锁并等待条件发生变化。一旦其它的某个线程改变了条件变量,它将通知相应的条件变量唤醒一个或多个正被此条件变量阻塞的线程。这些线程将重新锁定互斥锁并重新测试条件是否满足。一般说来,条件变量被用来进行线承间的同步
1、初始化条件变量
静态初始化:pthread_cond_t cond = PTHREAD_COND_INITIALIER;
动态初始化:ead_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
2、等待条件成立
释放锁,同时阻塞等待条件变量为真才行。timewait()设置等待时间,仍未signal,返回ETIMEOUT(加锁保证只有一个线程wait)
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int thread_cond_timewait(pthread_cond_t *cond,pthread_mutex *mutex,const timespec *abstime);
3、激活条件变量
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond); 解除所有线程的阻塞
4、清除条件变量
无线程等待,否则返回EBUSY
int pthread_cond_destroy(pthread_cond_t *cond);
5、程序实例
本实例使用条件变量实现生产者、消费者的经典例子,生产者、消费者公用一个缓冲区,缓冲区存放多条消息,有多个消费者。如果都使用互斥锁,那么多个消费者线程都要不断的去查看缓冲区是否有消息,有就取走。如果我们用条件变量,多个消费者线程都可以放心的“睡觉”,缓冲区有消息,消费者会收到通知,进而收取消息就行。
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <sys/time.h> static char buff[50]; pthread_mutex_t mutex; pthread_mutex_t cond_mutex; pthread_cond_t cond; void consumeItem(char *buff) { printf("This is a consumer item\n"); } void produceItem(char *buff) { printf("This is a produce item\n"); } void *consumer(void *param) { int t = *(int *)param; while (1) { pthread_cond_wait(&cond, &cond_mutex); pthread_mutex_lock(&mutex); printf("consumer thread %d: ", t); consumeItem(buff); pthread_mutex_unlock(&mutex); } return NULL; } void *producer(void *param) { while (1) { pthread_mutex_lock(&mutex); produceItem(buff); pthread_mutex_unlock(&mutex); pthread_cond_signal(&cond); sleep(1); } return NULL; } int main(int argc, char **argv) { pthread_t tid_c1, tid_p, tid_c2, tid_c3; void *retval; pthread_mutex_init(&mutex, NULL); pthread_mutex_init(&cond_mutex, NULL); pthread_cond_t cond; pthread_cond_init(&cond, NULL); int p[3] = {1, 2, 3}; //线程编号 pthread_create(&tid_p, NULL, producer, NULL); pthread_create(&tid_c1, NULL, consumer, &p[0]); pthread_create(&tid_c2, NULL, consumer, &p[1]); pthread_create(&tid_c3, NULL, consumer, &p[2]); pthread_join(tid_p, &retval); pthread_join(tid_c1, &retval); pthread_join(tid_c2, &retval); pthread_join(tid_c3, &retval); return 0; }