线程同步
A. mutex (互斥量)
多个线程同时访问共享数据时可能会冲突,这跟前面讲信号时所说的可重要性是同样的问
题。假如 两个线程都要把某个全局变量增加1,这个操作在某平台需要三条指令完成:
1. 从内存读变量值到寄存器
2. 寄存器的值加1
3. 将寄存器的值写回内存
我们通过一个简单的程序观察这一现象。上图所描述的现象从理论上是存在这种可能的,但实际运行程序时很难观察到,为了使现象更容易观察到,我们把上述三条指令做的事情用更多条指令来做:
我们在“读取变量的值”和“把变量的新值保存回去”这两步操作之间插入一个printf调用,它会执 行write系统调用进内核,为内核调度别的线程执行提供了一个很好的时机。我们在这个循环中重 复上述操作几千次,就会观察到访问冲突的现象。
代码:
#include<stdio.h> #include<stdlib.h> #include<pthread.h> #define NLOOP 5000 static int g_val = 0; void *read_write_mem(void* arg) { int i=0, _count=0 ; for(; i<NLOOP; ++i) { _count = g_val; printf("pthread is run,: %u, g_val is: %d\n", (unsigned long)pthread_self(),g_val); g_val = _count + 1; } return NULL; } int main() { pthread_t id1,id2; pthread_create(&id1, NULL, read_write_mem, "thread1 run..."); pthread_create(&id2, NULL, read_write_mem, "thread2 run..."); pthread_join(id1, NULL); pthread_join(id2, NULL); printf("final g_val is : %d\n",g_val); return 0; }
我们创建两个线程,各自把g_val增加5000次,正常情况下最后g_val应该等于10000,但事实
上每次运行该程序的结果都不一样,有时候数到5000多,有时候数到6000多。
运行结果:
对于多线程的程序,访问冲突的问题是很普遍的,解决的办法是引入互斥锁(Mutex,Mutual Exclusive Lock),获得锁的线程可以完成“读-修改-写”的操作,然后释放锁给其它线程,没有获得锁的线程只能等待而不能访问共享数据,这样“读-修改-写”三步操作组成一个原子操作,要么都执行,要么都不执行,不会执行到中间被打断,也不会在其它处理器上并行做这个操作。
Mutex:pthread_mutex_t类型的变量表示,可以这样初始化和销毁:
SYNOPSIS:
#include<pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_init函数对Mutex做初始化,参数attr设定Mutex的属性,如果attr为NULL则表示缺省属性,本章不详细介绍Mutex属性,感兴趣的读者可以参考[APUE2e]。
用pthread_mutex_init函 数初始化的Mutex可以用pthread_mutex_destroy销毁。如果Mutex变量是静态分配的(全局变量 或static变量),
也可以用宏定义PTHREAD_MUTEX_INITIALIZER来初始化,相当于 用pthread_mutex_init初始化并且attr参数为NULL。Mutex的加锁和解锁操作可以用下列函数:
SYNOPSIS:
#include<pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号。
一个线程可以调用pthread_mutex_lock获得Mutex,如果这时另一个线程已经调用
pthread_mutex_lock获得了该Mutex,则当前线程需要挂起等待,直到另一个线程调用
pthread_mutex_unlock释放Mutex,当前线程被唤醒,才能获得该Mutex并继续执行。
如果一个线程既想获得锁,又不想挂起等待,可以调用pthread_mutex_trylock,如果Mutex已
经被 另一个线程获得,这个函数会失败返回EBUSY,而不会使线程挂起等待。
好,解决上面的问题!
代码:
#include<stdio.h> #include<stdlib.h> #include<pthread.h> #define NLOOP 5000 static int g_val = 0; static pthread_mutex_t mutex_lock = PTHREAD_MUTEX_INITIALIZER; void *read_write_mem(void* arg) { int i=0, _count=0 ; for(; i<NLOOP; ++i) { pthread_mutex_lock(&mutex_lock); _count = g_val; printf("pthread is run,: %u, g_val is: %d\n", (unsigned long)pthread_self(),g_val); g_val = _count + 1; pthread_mutex_unlock(&mutex_lock); } return NULL; } int main() { pthread_t id1,id2; pthread_create(&id1, NULL, read_write_mem, "thread1 run..."); pthread_create(&id2, NULL, read_write_mem, "thread2 run..."); pthread_join(id1, NULL); pthread_join(id2, NULL); printf("final g_val is : %d\n",g_val); return 0; }
运行结果:
Mutex变量 的值为1表示互斥锁空闲,这时某个进程调用lock可以获得锁,而Mutex的值为0表示互斥锁已经被 某个线程获得,其它线程再调用lock只能挂起等待。
B.ondition Variable(条件变量)
#include<pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *restrict attr); -->初始化一个条件变量。如果条件变量是静态分配的,也乐意用宏定义PTHREAD_COND_INITIALIZER初始化,相当于用pthread_cond_init函数初始化并且attr参数为NULL。
pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
P ,V
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
线程间的同步还有这样一种情况:线程A需要等某个条件成立才能继续往下执行,现在这个条件不成立,线程A救你阻塞等待,而线程B在执行过程中使这个条件成立了,就唤醒线程A继续执行。
可见,一个Condition Variable总是和一个Mutex搭配使用的。一个线程可以调用pthread_cond_wait在一个Condition Variable上阻塞等待,这个函数做以下三步操作:
1. 释放Mutex
2. 阻塞等待
3. 当被唤醒时,重新获得Mutex并返回
pthread_cond_timedwait函数还有一个额外的参数可以设定等待超时,如果到达了abstime所指定的 时刻仍然没有别的线程来唤醒当前线程,就返回ETIMEDOUT。一个线程可以调用pthread_cond_signal唤醒在某个Condition Variable上等待的另一个线程,也可以调用pthread_cond_broadcast唤醒在这个Condition Variable上等待的所有线程。
下面的程序演示了一个生产者-消费者的例子,生产者生产一个结构体串在链表的表头上,消费者从表头取出结构体。
代码:
//comm.h
#include<stdio.h> #include<stdlib.h> #include<pthread.h> #include<unistd.h> static pthread_mutex_t lock; static pthread_cond_t need_product; void *product(); void *consumer();
//comm.c
#include"comm.h" static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t need_product = PTHREAD_COND_INITIALIZER; typedef struct _list_node_t { int product; struct _list_node_t *next; }list_node_t; list_node_t *head; void init_list(list_node_t *p) { p->product = 0; p->next = NULL; } void *product() { while(1) { sleep(rand()%4); pthread_mutex_lock(&lock); list_node_t *p = (list_node_t*)malloc(sizeof(list_node_t)); init_list(p); p->product = rand()%10000; p->next = head; head = p; pthread_mutex_unlock(&lock); printf("product done...singal consumer : %d\n",p->product); pthread_cond_signal(&need_product); } return NULL; } void *consumer() { while(1) { //sleep(rand()%4); pthread_mutex_lock(&lock); list_node_t *q = head; while( !head ) { printf("product is empty! need product"); pthread_cond_wait(&need_product,&lock); //wait } q = head; head = head->next; q->next = NULL; pthread_mutex_unlock(&lock); printf("consumer done... %d\n",q->product); free(q); q = NULL; } return NULL; }
//main.c
#include"comm.h" void *product_run(void *arg) { printf("%s\n",(char*)arg); product(); return NULL; } void *consumer_run(void *arg) { printf("%s\n",(char*)arg); consumer(); return NULL; } int main() { pthread_t id1,id2; pthread_create(&id1, NULL, product_run, "product"); pthread_create(&id2, NULL, consumer_run, "consumer"); pthread_join(id1, NULL); pthread_join(id2, NULL); return 1; }
//Makefile
prod_cons_model:main.o comm.o gcc -o prod_cons_model main.o comm.o -lpthread main.o:main.c comm.h gcc -c main.c comm.o:comm.c comm.h gcc -c comm.c .PHONY:clean clean: rm *.o prod_cons_model
运行结果: