Linux高级编程--09.线程互斥与同步

多个线程同时访问共享数据时可能会冲突,比如两个线程都要把某个全局变量增加1,这个操作在某平台需要三条指令完成:

从内存读变量值到寄存器
寄存器的值加1
将寄存器的值写回内存

假设两个线程在多处理器平台上同时执行这三条指令,则可能导致下图所示的结果,最后变量只加了一次而非两次。

如下例子就演示了这一过程:

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

int counter; /* incremented by threads */

void *doit(void *vptr)
{
    int i, val;

    for (i = 0; i < 100; i++) {
        val = counter;
        usleep(1000);
        counter = val + 1;
    }

    return NULL;
}

int main(int argc, char **argv)
{
    pthread_t tidA, tidB;

    pthread_create(&tidA, NULL, &doit, NULL);
    pthread_create(&tidB, NULL, &doit, NULL);

    pthread_join(tidA, NULL);
    pthread_join(tidB, NULL);

    printf("counter = %d \n", counter);
    return 0;
}

在这个例子中,虽然每个线程都给counter加了100,但由于结果的互相覆盖,最终输出值不是200,而是100。

[email protected]:/mnt/share/test> run
counter = 100
[email protected]:/mnt/share/test>

互斥锁mutex

对于多线程的程序,访问冲突的问题是很普遍的,解决的办法是引入互斥锁(Mutex,Mutual Exclusive Lock),获得锁的线程可以完成”读-修改-写”的操作,然后释放锁给其它线程,没有获得锁的线程只能等待而不能访问共享数据,这样”读-修改-写”三步操作组成一个原子操作,要么都执行,要么都不执行,不会执行到中间被打断,也不会在其它处理器上并行做这个操作。

和mutext相关的函数有如下几个:

#include <pthread.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

从名字中基本上就能看出来该如何使用,这里就只是以上面的例子为例,用mutex将其改造一下:

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

int counter; /* incremented by threads */
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

void *doit(void *vptr)
{
    int i, val;

    for (i = 0; i < 100; i++) {
        pthread_mutex_lock(&counter_mutex);

        val = counter;
        usleep(1000);
        counter = val + 1;

        pthread_mutex_unlock(&counter_mutex);
    }

    return NULL;
}

int main(int argc, char **argv)
{
    pthread_mutex_init(&counter_mutex, NULL);

    pthread_t tidA, tidB;

    pthread_create(&tidA, NULL, &doit, NULL);
    pthread_create(&tidB, NULL, &doit, NULL);

    pthread_join(tidA, NULL);
    pthread_join(tidB, NULL);

    pthread_mutex_destroy(&counter_mutex);

    printf("counter = %d \n", counter);
    return 0;
}

这次的运行结果就是正确的了:

[email protected]:/mnt/share/test> run
counter = 200
[email protected]:/mnt/share/test>

条件变量condtion

前面介绍的Mutext主要用于实现互斥,在多线程的场景中往往还有同步的需求。例如,在生产者和消费者的例子中,消费者需要等待生产者产生数据后才能消费。

单用mutex不能解决同步的问题:假如消费者先获取到锁,但此时并没有资源可用,如果把锁占着不放,生产者无法获取锁,此时就成了死锁;如果立即释放,并重新进入,则会成为轮询而消耗cpu资源。

在pthread库中通过可条件变量(Condition Variable)来阻塞等待一个条件,或者唤醒等待这个条件的线程。Condition Variable用pthread_cond_t类型的变量表示,其相关函数如下:

#include <pthread.h>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

从它的wait函数可以看到,condtion是和mutext一起使用的,基本流程如下:

消费者获取资源锁,如果当前无可用资源则调用cond_wait函数释放锁,并等待condtion通知。
生产者产生资源后,获取资源锁,放置资源后嗲用cond_signal函数通知。并释放资源锁。
消费者的cond_wait函数等到condtion通知后,重新获取资源锁,消费资源后再次释放资源锁。

从中可以看到,mutex用于保护资源,wait函数用于等待信号,signal和broadcast函数用于通知信号。其中wait函数中有一次对mutex的释放和重新获取操作,因此生产者和消费者并不会出现死锁。

#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <queue>
#include <iostream>
using namespace std;

queue<int> buffer;
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

void* consumer(void *p)
{
    while(true)
    {
        sleep(rand() % 5);
        pthread_mutex_lock(&lock);
        if (buffer.empty())
            pthread_cond_wait(&has_product, &lock);

        cout << "### Consume " << buffer.front() << endl;
        buffer.pop();

        pthread_mutex_unlock(&lock);

    }
}

void* producer(void *p)
{
    while(true)
    {
        sleep(rand() % 5);
        pthread_mutex_lock(&lock);

        buffer.push(rand() % 1000);
        cout << ">>> Produce " << buffer.back() << endl;

        pthread_cond_signal(&has_product);
        pthread_mutex_unlock(&lock);
    }
}

int main(int argc, char *argv[])
{
    pthread_t pid, cid;

    srand(time(NULL));
    pthread_create(&pid, NULL, producer, NULL);
    pthread_create(&cid, NULL, consumer, NULL);
    pthread_join(pid, NULL);
    pthread_join(cid, NULL);
    return 0;
}

信号量Semaphore

Mutex变量是非0即1的,可看作一种资源的可用数量,初始化时Mutex是1,表示有一个可用资源,加锁时获得该资源,将Mutex减到0,表示不再有可用资源,解锁时释放该资源,将Mutex重新加到1,表示又有了一个可用资源。

信号量(Semaphore)和Mutex类似,表示可用资源的数量,和Mutex不同的是这个数量可以大于1。其相关操作函数是:

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t * sem);
int sem_destroy(sem_t * sem);

这里以一个有限资源池的生产者和消费者的例子演示一下信号量的用法:

#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <semaphore.h>

#define NUM 5
int queue[NUM];
sem_t blank_number, product_number;

void *producer(void *arg)
{
    int p = 0;
    while (true)
    {
        sem_wait(&blank_number);
        queue[p] = rand() % 1000 + 1;
        printf(">>> Produce %d\n", queue[p]);
        sem_post(&product_number);

        p = (p+1) % NUM;
        sleep(rand()%5);
    }
}

void *consumer(void *arg)
{
    int c = 0;
    while (true)
    {
        sem_wait(&product_number);
        printf("### Consume %d\n", queue[c]);
        queue[c] = 0;
        sem_post(&blank_number);

        c = (c+1) % NUM;
        sleep(rand() % 5);
    }
}

int main(int argc, char *argv[])
{
    pthread_t pid, cid;

    sem_init(&blank_number, 0, NUM);
    sem_init(&product_number, 0, 0);

    pthread_create(&pid, NULL, producer, NULL);
    pthread_create(&cid, NULL, consumer, NULL);
    pthread_join(pid, NULL);
    pthread_join(cid, NULL);

    sem_destroy(&blank_number);
    sem_destroy(&product_number);

    return 0;
}

读写锁(Reader-Writer Lock)

读写锁也是一种线程间的互斥操作,但和互斥锁不同的是,它分为读和写两种模式,其中写模式是独占资源的(和互斥锁一样),而读模式则是共享资源的,此时允许多个读者,从而提高并发性。

由于一时找不到合适的小例子来介绍它,并且读写锁的模型也是比较容易理解的,这里就不举例了。

来自为知笔记(Wiz)

时间: 2024-11-09 17:52:56

Linux高级编程--09.线程互斥与同步的相关文章

线程互斥与同步

能解决下面的问题,基本上就能理解线程互斥与同步了. 子线程循环10次,主线程循环100次,接着子线程循环10,主线程循环100次.如此往复循环50次. 1 package cn.lah.thread; 2 3 public class TraditionalThreadCommunication { 4 5 public static void main(String[] args) { 6 7 final Business business = new Business(); 8 new Th

python多线程编程(2): 使用互斥锁同步线程

上一节的例子中,每个线程互相独立,相互之间没有任何关系.现在假设这样一个例子:有一个全局的计数num,每个线程获取这个全局的计数,根据num进行一些处理,然后将num加1.很容易写出这样的代码: # encoding: UTF-8import threadingimport time class MyThread(threading.Thread): def run(self): global num time.sleep(1) num = num+1 msg = self.name+' set

Linux——高级编程之概要

1.为什么要学习Linux下的高级编程 应用课程的学习,不知道Linux内核的强大功能 Linux下的高级编程课程学习:感知到内核的存在,内核的强大功能 文件管理 进程管理 设备管理 内存管理 网络管理 2.怎么样学习Linux下的高级编程 Linux下高级编程的特点:涉及到内核向用户空间提供的接口(函数) 3.为什么内核要提供这些接口呢 主要原因: A:内核要为应用程序服务,应用程序如果没有内核服务,则功能非常单一 B:内核是一个稳定的代码,同时也要为多个用户空间的程序服务,为了防止用户空间的

Linux系统编程——进程同步与互斥:System V 信号量

信号量概述 信号量广泛用于进程或线程间的同步和互斥,信号量本质上是一个非负的整数计数器,它被用来控制对公共资源的访问. 编程时可根据操作信号量值的结果判断是否对公共资源具有访问的权限,当信号量值大于 0 时,则可以访问,否则将阻塞.PV 原语是对信号量的操作,一次 P 操作使信号量减1,一次 V 操作使信号量加1. 在实际应用中两个进程间通信可能会使用多个信号量,因此 System V 的信号量以集合的概念来管理,具体操作和Posix 信号量大同小异,详情请点此链接:http://blog.cs

JAVA 并发编程-传统线程互斥技术(Synchronized)(三)

java线程互斥是为了保证,同一时刻最多只有一个线程执行该段代码.那么它的出现又是为了解决什么问题呢?账户存取款,在同一时间段只能让一个人进行操作. 下面来看一个简单实例(多线程带来的问题): public class TraditionalThreadSynchronized { /** * @param args */ public static void main(String[] args) { new TraditionalThreadSynchronized().init(); }

linux系统编程:线程同步-互斥量(mutex)

线程同步-互斥量(mutex) 线程同步 多个线程同时访问共享数据时可能会冲突,于是需要实现线程同步. 一个线程冲突的示例 #include <stdio.h> #include <unistd.h> #include <pthread.h> #define Loop 1000000 //全局资然 int counter = 0; void *fun(void *argv) { int i; for (i = 0; i < Loop; i++) { counter

linux系统编程:线程同步-读写锁(rwlock)

线程同步-读写锁(rwlock) 读写锁 读写锁是互斥量的细化:显然,只有对全局资然进行写入操作时,才需要同步:在对全局资然进行读取操作时,是不需要锁的. 相关函数 pthread_rwlock_t //读写锁类型 pthread_rwlock_init //初始化 pthread_rwlock_destroy //销毁锁 pthread_rwlock_rdlock //获取读锁 pthread_rwlock_wrlock //获取写锁 pthread_rwlock_tryrdlock pthr

linux系统编程:线程同步-信号量(semaphore)

线程同步-信号量(semaphore) 生产者与消费者问题再思考 在实际生活中,只要有商品,消费者就可以消费,这没问题.但生产者的生产并不是无限的,例如,仓库是有限的,原材料是有限的,生产指标受消费指标限制等等.为了进一步,解决好生产者与消费者问题,引入信号量进机制. 信号量 信号量(semaphore)是互斥量的升级版:互斥量的状态为0或1,而信号量可以为n.也就是说,使用互斥量时,最多允许一个线程进入关键区,而信号量允许多个,具体值是信号量当前的内部值. 相关函数 sem_t //信号量类型

linux系统编程:线程同步-条件变量(cond)

线程同步-条件变量(cond) 生产者与消费者问题 再引入条件变量之前,我们先看下生产者和消费者问题:生产者不断地生产产品,同时消费者不断地在消费产品. 这个问题的同步在于两处:第一,消费者之间需要同步:同一件产品只可由一人消费.第二,当无产品可消费时,消费者需等待生产者生产后,才可继续消费,这又是一个同步问题.详细了解:生产者消费者问题. 条件变量 条件变量是利用线程间共享的全局变量进行同步的一种机制,并且条件变量总是和互斥锁结合在一起. 相关函数 pthread_cond_t //条件变量类