libuv学习笔记(24)

libuv学习笔记(24)

线程相关数据结构与函数(2)

数据结构

typedef union {//读写锁
  struct {
    unsigned int num_readers_;
    CRITICAL_SECTION num_readers_lock_;
    HANDLE write_semaphore_;
  } state_;
  /* TODO: remove me in v2.x. */
  struct {
    SRWLOCK unused_;
  } unused1_;
  /* TODO: remove me in v2.x. */
  struct {
    uv_mutex_t unused1_;
    uv_mutex_t unused2_;
  } unused2_;
} uv_rwlock_t;
typedef HANDLE uv_sem_t;//信号量
typedef union {//线程池同步管理
  CONDITION_VARIABLE cond_var;//调用系统API实现
  struct {//libuv自己实现
    unsigned int waiters_count;
    CRITICAL_SECTION waiters_count_lock;
    HANDLE signal_event;
    HANDLE broadcast_event;
  } fallback;
} uv_cond_t;
typedef struct {
  unsigned int n;
  unsigned int count;
  uv_mutex_t mutex;
  uv_sem_t turnstile1;
  uv_sem_t turnstile2;
} uv_barrier_t;

读写锁相关函数

初始化

int uv_rwlock_init(uv_rwlock_t* rwlock) {
  //创建信号量,最大资源数和可用资源数都为1
  HANDLE handle = CreateSemaphoreW(NULL, 1, 1, NULL);
  if (handle == NULL)
    return uv_translate_sys_error(GetLastError());
  rwlock->state_.write_semaphore_ = handle;
  //初始化临界区
  InitializeCriticalSection(&rwlock->state_.num_readers_lock_);
  //初始化读请求的数量
  rwlock->state_.num_readers_ = 0;
  return 0;
}

释放读写锁

void uv_rwlock_destroy(uv_rwlock_t* rwlock) {
  DeleteCriticalSection(&rwlock->state_.num_readers_lock_);
  CloseHandle(rwlock->state_.write_semaphore_);
}

读锁定

void uv_rwlock_rdlock(uv_rwlock_t* rwlock) {
  //进入临界区
  EnterCriticalSection(&rwlock->state_.num_readers_lock_);
  //递增读请求数量
  if (++rwlock->state_.num_readers_ == 1) {
    //如果为1,说明没有其他的地方读锁定了,等待资源可用
    DWORD r = WaitForSingleObject(rwlock->state_.write_semaphore_, INFINITE);
    if (r != WAIT_OBJECT_0)
      uv_fatal_error(GetLastError(), "WaitForSingleObject");
  }
  //离开临界区
  LeaveCriticalSection(&rwlock->state_.num_readers_lock_);
}

尝试读锁定

int uv_rwlock_tryrdlock(uv_rwlock_t* rwlock) {
  int err;
  //尝试进入临界区
  if (!TryEnterCriticalSection(&rwlock->state_.num_readers_lock_))
    return UV_EBUSY;
  err = 0;
  if (rwlock->state_.num_readers_ == 0) {
    //获取资源
    DWORD r = WaitForSingleObject(rwlock->state_.write_semaphore_, 0);
    if (r == WAIT_OBJECT_0)
      rwlock->state_.num_readers_++;
    else if (r == WAIT_TIMEOUT)
      err = UV_EBUSY;
    else if (r == WAIT_FAILED)
      uv_fatal_error(GetLastError(), "WaitForSingleObject");
  } else {
    rwlock->state_.num_readers_++;
  }

  LeaveCriticalSection(&rwlock->state_.num_readers_lock_);
  return err;
}

释放读锁定

void uv_rwlock_rdunlock(uv_rwlock_t* rwlock) {
  //进入临界区
  EnterCriticalSection(&rwlock->state_.num_readers_lock_);
  if (--rwlock->state_.num_readers_ == 0) {
    //没有都请求了,释放资源
    if (!ReleaseSemaphore(rwlock->state_.write_semaphore_, 1, NULL))
      uv_fatal_error(GetLastError(), "ReleaseSemaphore");
  }
  //离开临界区
  LeaveCriticalSection(&rwlock->state_.num_readers_lock_);
}

写锁定

void uv_rwlock_wrlock(uv_rwlock_t* rwlock) {
  //等待资源可用
  DWORD r = WaitForSingleObject(rwlock->state_.write_semaphore_, INFINITE);
  if (r != WAIT_OBJECT_0)
    uv_fatal_error(GetLastError(), "WaitForSingleObject");
}

尝试写锁定

int uv_rwlock_trywrlock(uv_rwlock_t* rwlock) {
  DWORD r = WaitForSingleObject(rwlock->state_.write_semaphore_, 0);
  if (r == WAIT_OBJECT_0)
    return 0;
  else if (r == WAIT_TIMEOUT)
    return UV_EBUSY;
  else
    uv_fatal_error(GetLastError(), "WaitForSingleObject");
}

解除写锁定

void uv_rwlock_wrunlock(uv_rwlock_t* rwlock) {
  //释放信号量
  if (!ReleaseSemaphore(rwlock->state_.write_semaphore_, 1, NULL))
    uv_fatal_error(GetLastError(), "ReleaseSemaphore");
}

信号量相关API

初始化

int uv_sem_init(uv_sem_t* sem, unsigned int value) {
  *sem = CreateSemaphore(NULL, value, INT_MAX, NULL);//创建信号量
  if (*sem == NULL)
    return uv_translate_sys_error(GetLastError());
  else
    return 0;
}

释放信号量

void uv_sem_destroy(uv_sem_t* sem) {
  if (!CloseHandle(*sem))//关闭句柄
    abort();
}

发送

void uv_sem_post(uv_sem_t* sem) {
  if (!ReleaseSemaphore(*sem, 1, NULL))//释放一个资源
    abort();
}

等待

void uv_sem_wait(uv_sem_t* sem) {
  if (WaitForSingleObject(*sem, INFINITE) != WAIT_OBJECT_0)
    abort();
}

尝试等待

int uv_sem_trywait(uv_sem_t* sem) {
  DWORD r = WaitForSingleObject(*sem, 0);

  if (r == WAIT_OBJECT_0)
    return 0;

  if (r == WAIT_TIMEOUT)
    return UV_EAGAIN;

  abort();
  return -1; /* Satisfy the compiler. */
}

线程池同步相关API

初始化

int uv_cond_init(uv_cond_t* cond) {
  uv__once_init();//调用uv_init全局初始化
  if (HAVE_CONDVAR_API())//系统支持相关的API
    return uv_cond_condvar_init(cond);
  else
    return uv_cond_fallback_init(cond);
}

系统支持

static int uv_cond_condvar_init(uv_cond_t* cond) {
  //调用InitializeConditionVariable初始化
  pInitializeConditionVariable(&cond->cond_var);
  return 0;
}

系统不支持

static int uv_cond_fallback_init(uv_cond_t* cond) {
  int err;
  cond->fallback.waiters_count = 0;
  //初始化临界区
  InitializeCriticalSection(&cond->fallback.waiters_count_lock);
  //新建一个自动还原状态的事件,初始化为无信号
  cond->fallback.signal_event = CreateEvent(NULL,  /* no security */
                                            FALSE, /* auto-reset event */
                                            FALSE, /* non-signaled initially */
                                            NULL); /* unnamed */
  if (!cond->fallback.signal_event) {
    err = GetLastError();
    goto error2;
  }
  //创建一个需要手动改变状态的事件
  cond->fallback.broadcast_event = CreateEvent(NULL,  /* no security */
                                               TRUE,  /* manual-reset */
                                               FALSE, /* non-signaled */
                                               NULL); /* unnamed */
  if (!cond->fallback.broadcast_event) {
    err = GetLastError();
    goto error;
  }
  return 0;
error:
  CloseHandle(cond->fallback.signal_event);
error2:
  DeleteCriticalSection(&cond->fallback.waiters_count_lock);
  return uv_translate_sys_error(err);
}

释放

void uv_cond_destroy(uv_cond_t* cond) {
  if (HAVE_CONDVAR_API())//根据系统的支持情况释放资源
    uv_cond_condvar_destroy(cond);
  else
    uv_cond_fallback_destroy(cond);
}

设为有信号状态

void uv_cond_signal(uv_cond_t* cond) {
  if (HAVE_CONDVAR_API())
    uv_cond_condvar_signal(cond);
  else
    uv_cond_fallback_signal(cond);
}

使用系统API

static void uv_cond_condvar_signal(uv_cond_t* cond) {
  pWakeConditionVariable(&cond->cond_var);//唤醒
}

libuv自己实现

static void uv_cond_fallback_signal(uv_cond_t* cond) {
  int have_waiters;
  EnterCriticalSection(&cond->fallback.waiters_count_lock);
  have_waiters = cond->fallback.waiters_count > 0;
  LeaveCriticalSection(&cond->fallback.waiters_count_lock);
  if (have_waiters)
    //将signal_event设为有信号,这样等待该event的线程中的一个将会唤醒
    SetEvent(cond->fallback.signal_event);
}

广播通知(所有等待的线程都被唤醒)

void uv_cond_broadcast(uv_cond_t* cond) {
  if (HAVE_CONDVAR_API())
    uv_cond_condvar_broadcast(cond);
  else
    uv_cond_fallback_broadcast(cond);
}

使用系统API实现

static void uv_cond_condvar_broadcast(uv_cond_t* cond) {
  pWakeAllConditionVariable(&cond->cond_var);//唤醒所有线程
}

libuv自己实现

static void uv_cond_fallback_broadcast(uv_cond_t* cond) {
  int have_waiters;
  EnterCriticalSection(&cond->fallback.waiters_count_lock);
  have_waiters = cond->fallback.waiters_count > 0;
  LeaveCriticalSection(&cond->fallback.waiters_count_lock);
  if (have_waiters)
    //所有等待broadcast_event的线程都被唤醒
    SetEvent(cond->fallback.broadcast_event);
}

等待

void uv_cond_wait(uv_cond_t* cond, uv_mutex_t* mutex) {
  if (HAVE_CONDVAR_API())
    uv_cond_condvar_wait(cond, mutex);
  else
    uv_cond_fallback_wait(cond, mutex);
}

通过系统API实现

static void uv_cond_condvar_wait(uv_cond_t* cond, uv_mutex_t* mutex) {
  //等待cond_var,并离开临界区mutex
  if (!pSleepConditionVariableCS(&cond->cond_var, mutex, INFINITE))
    abort();
}

libuv自己实现

static int uv_cond_wait_helper(uv_cond_t* cond, uv_mutex_t* mutex,
    DWORD dwMilliseconds) {
  DWORD result;
  int last_waiter;
  HANDLE handles[2] = {
    cond->fallback.signal_event,
    cond->fallback.broadcast_event
  };
  EnterCriticalSection(&cond->fallback.waiters_count_lock);
  cond->fallback.waiters_count++;//等待的线程计数加一
  LeaveCriticalSection(&cond->fallback.waiters_count_lock);
  //离开临界区
  uv_mutex_unlock(mutex);
  //等待任意一个event为有信号状态
  result = WaitForMultipleObjects(2, handles, FALSE, dwMilliseconds);
  EnterCriticalSection(&cond->fallback.waiters_count_lock);
  cond->fallback.waiters_count--;
  //如果信号是broadcast_event并且没有等待者了,说明这是最后一个等待的线程
  last_waiter = result == WAIT_OBJECT_0 + 1
      && cond->fallback.waiters_count == 0;
  LeaveCriticalSection(&cond->fallback.waiters_count_lock);
  //对于最后一个等待线程,手动将broadcast_event设为无信号
  if (last_waiter) {
    ResetEvent(cond->fallback.broadcast_event);
  }
  uv_mutex_lock(mutex);
  if (result == WAIT_OBJECT_0 || result == WAIT_OBJECT_0 + 1)
    return 0;
  if (result == WAIT_TIMEOUT)
    return UV_ETIMEDOUT;
  abort();
  return -1; /* Satisfy the compiler. */
}

等待指定时间,与上面的函数相比,多了一个超时参数

int uv_cond_timedwait(uv_cond_t* cond, uv_mutex_t* mutex,
    uint64_t timeout) {
  if (HAVE_CONDVAR_API())
    return uv_cond_condvar_timedwait(cond, mutex, timeout);
  else
    return uv_cond_fallback_timedwait(cond, mutex, timeout);
}

barrier相关函数

初始化

int uv_barrier_init(uv_barrier_t* barrier, unsigned int count) {
  int err;
  barrier->n = count;//任务数量
  barrier->count = 0;
  err = uv_mutex_init(&barrier->mutex);
  if (err)
    return err;
  err = uv_sem_init(&barrier->turnstile1, 0);
  if (err)
    goto error2;
  err = uv_sem_init(&barrier->turnstile2, 1);
  if (err)
    goto error;
  return 0;
error:
  uv_sem_destroy(&barrier->turnstile1);
error2:
  uv_mutex_destroy(&barrier->mutex);
  return err;
}

释放

void uv_barrier_destroy(uv_barrier_t* barrier) {
  uv_sem_destroy(&barrier->turnstile2);
  uv_sem_destroy(&barrier->turnstile1);
  uv_mutex_destroy(&barrier->mutex);
}

等待

int uv_barrier_wait(uv_barrier_t* barrier) {
  int serial_thread;
  uv_mutex_lock(&barrier->mutex);//进入临界区
  if (++barrier->count == barrier->n) {//最后一个任务
    uv_sem_wait(&barrier->turnstile2);//等待第二个信号量
    uv_sem_post(&barrier->turnstile1);//释放第一个信号量
  }
  uv_mutex_unlock(&barrier->mutex);
  uv_sem_wait(&barrier->turnstile1);//等待第一个信号量
  uv_sem_post(&barrier->turnstile1);//释放第一个信号量
  uv_mutex_lock(&barrier->mutex);//进入临界区
  serial_thread = (--barrier->count == 0);
  if (serial_thread) {//最后一个任务
    uv_sem_wait(&barrier->turnstile1);/等待第一个
    uv_sem_post(&barrier->turnstile2);//释放第二个
  }
  uv_mutex_unlock(&barrier->mutex);

  uv_sem_wait(&barrier->turnstile2);//等待第二个
  uv_sem_post(&barrier->turnstile2);//释放第二个
  return serial_thread;
}

当某一线程需要等待其他一些线程任务完成之后才能继续运行时,可以使用barrier。

流程如下:

a.所有的相关线程都调用uv_barrier_wait等待同一个uv_barrier_t,此时除了最后一个,都会在等待第一个信号量的地方阻塞。

b.最后一个调用uv_barrier_wait的线程会等待第二个信号量,此时第二个信号量没有资源,然后释放第一个信号量。

c.之前阻塞在等待第一个信号量的线程中的一个获取信号量,继续运行,接着释放第一个信号量,这导致所有阻塞的进程都会因此一个一个的继续运行

d.接着除了最后一个线程,所有的线程都会阻塞在等待第二个信号量的地方。

e.最后一个线程,等待第一个信号量,此时第一个信号量重新变为没有资源,接着释放第二个信号量,激活其他等待的线程

f.其他阻塞在等待第二个信号量的线程,一个一个的唤醒,最终第二个信号量的资源为一,所有的线程继续运行

注意,最后一个线程是指逻辑上的最后一个,并不是确定的。

如果线程数比初始化时设置的任务数少,那么都会阻塞

如果线程池数比任务数多,那么等待任务数量的线程之后就会继续运行。

时间: 2024-08-02 22:48:32

libuv学习笔记(24)的相关文章

C++学习笔记24,方法重写与方法隐藏

该博文仅用于交流学习,请慎用于任何商业用途,本博主保留对该博文的一切权利. 博主博客:http://blog.csdn.net/qq844352155 转载请注明出处: 方法重写.是指在子类中重新编写父类中的虚函数的实现.要求子类中的函数必须跟父类中的原型一致. 包括返回值类型(协变返回类型不算)以及参数的数目,排列顺序. #include <iostream> #include <string> using namespace std; class base{ public: v

[原创]java WEB学习笔记24:MVC案例完整实践(part 5)---删除操作的设计与实现

本博客为原创:综合 尚硅谷(http://www.atguigu.com)的系统教程(深表感谢)和 网络上的现有资源(博客,文档,图书等),资源的出处我会标明 本博客的目的:①总结自己的学习过程,相当于学习笔记 ②将自己的经验分享给大家,相互学习,互相交流,不可商用 内容难免出现问题,欢迎指正,交流,探讨,可以留言,也可以通过以下方式联系. 本人互联网技术爱好者,互联网技术发烧友 微博:伊直都在0221 QQ:951226918 ---------------------------------

libuv学习笔记(一)

前言 学网络I/O的时候难免会碰到这样或那样的异步IO库,比如libevent.libev.libuv,看完UNP之后动手写过几个简单的小玩意,总感觉网络底层的那些函数使用起来好麻烦,一个接一个地man起来也挺费劲,于是学习这些成熟网络I/O库的想法应运而生. 初看这些库的简介感觉都差不多,原理和poll/select/epoll等都大同小异,无非是在不同平台上面封装了一层API,不过真想把他们用起来还是没那么容易的,下面就记录一下我学习libuv的一些过程. 最开始看的是libevent,顺便

ArcGIS API for JavaScript 4.2学习笔记[24] 【IdentifyTask类】的使用(结合IdentifyParameters类)(第七章完结)

好吧,我都要吐了. 接连三个例子都是类似的套路,使用某个查询参数类的实例,结合对应的Task类,对返回值进行取值.显示. 这个例子是Identify识别,使用了TileLayer这种图层,数据来自Server的MapServer. 结果演示 戳不同的地方会有不同的识别结果. 我对TileLayer不是很了解,这一例仅针对有了解的同学,做一个IdentifyTask的解释. IdentifyTask/IdentifyParameter/IdentifyResult三个类 既然是一样的套路,那么先对

List的一阶函数操作代码实战详解之Scala学习笔记-24

package com.leegh.dataset /** * @author Guohui Li */object List_FirstOrder_Ops { def main(args: Array[String]): Unit = { println(List(1, 2, 3, 4) ::: List(4, 5, 6, 7, 8) ::: List(10, 11)) println(List(1, 2, 3, 4) ::: (List(4, 5, 6, 7, 8) ::: List(10,

Android学习笔记(24):进度条组件ProgressBar及其子类

ProgressBar作为进度条组件使用,它还派生了SeekBar(拖动条)和RatingBar(星级评分条). ProgressBar支持的XML属性: Attribute Name Related Method Description style 设置ProgressBar指定风格 android:indeterminate 设置为true时,进度条不显示运行进度 android:indeterminateBehavior indeterminate模式下.当进度条达到最大值时的动画处理行为

C++学习笔记24:makefile文件

makefile make命令:负责c/c++程序编译与链接 make根据指定命令进行建构 建构规则文件:GNUmakefile , makefile,Makefile makefile 文件格式 makefile 语法 --基本语法,变量,条件判断,循环,函数 makefile 文件的基本格式 target...:prerequisites... [tab键]  commands makefile文件的规则 makefile 文件由一系列规则构成 规则目的:建构目标的先决条件是什么以及如何构建

Python学习笔记24:Django搭建简单的博客网站(二)

上一节说道如何使用Django创建并运行一个项目,这节说如何添加一个博客应用. 一 项目跟应用的关系 在添加应用之前,先来看看项目与应用之间有什么不同之处呢? 项目是针对一个特定的 Web 网站相关的配置和其应用的组合.一个项目可以包含多个应用. 应用是一个提供功能的 Web 应用 – 例如:一个博客系统.一个公共记录的数据库或者一个简单的投票系统. 应用是"可插拔的":你可以在多个项目使用一个应用,你还可以分发应用,因为它们没有被捆绑到一个给定的 Django 安装环境中. 这样,我

【MongoDB学习笔记24】MongoDB的explain和hint函数

一.explain函数 explain函数可以提供大量查询相关的信息,如果是慢查询,它最重要的诊断工具.例如: 在有索引的字段上查询: > db.post.find({"loc.city":"ny"}).explain()    {         "cursor" : "BtreeCursor loc.city_1",         "isMultiKey" : false,         &q