libev实现分析

libev是一个事件驱动库,底层是基于select、epoll、kqueue等I/O复用接口。所谓事件驱动库,就是用户定义一个事件以及改事件发生时调用的函数,该库会监听该事件,并在事件发生时调用相应的函数。

libev提供了很多事件监听器(watcher),最主要的有IO、时间以及信号监听器。当某一个文件的读事件或者写事件发生时,周期时间到了时,进程接收到某个信号时,就会调用用户定义的回调函数。

下面以IO事件为例,讲述libev的工作原理:

1、实例

 1 #include<stdio.h>
 2 #include <ev.h>
 3 // every watcher type has its own typedef‘d struct
 4 // with the name ev_TYPE
 5 ev_io stdin_watcher;
 6 ev_timer timeout_watcher;
 7
 8 // all watcher callbacks have a similar signature
 9 // this callback is called when data is readable on stdin
10 static void
11 stdin_cb (EV_P_ ev_io *w, int revents)
12 {
13   puts ("stdin ready OK!");
14   // for one-shot events, one must manually stop the watcher
15   // with its corresponding stop function.
16   ev_io_stop (EV_A_ w);
17
18   // this causes all nested ev_run‘s to stop iterating
19   ev_break (EV_A_ EVBREAK_ALL);
20 }
21
22 // another callback, this time for a time-out
23 static void
24 timeout_cb (EV_P_ ev_timer *w, int revents)
25 {
26    puts ("timeout");
27    // this causes the innermost ev_run to stop iterating
28    ev_break (EV_A_ EVBREAK_ONE);
29 }
30
31 int
32 main (void)
33 {
34     // use the default event loop unless you have special needs
35     struct ev_loop *loop = EV_DEFAULT;
36     // initialise an io watcher, then start it
37     // this one will watch for stdin to become readable
38     ev_io_init (&stdin_watcher, stdin_cb, /*STDIN_FILENO*/ 0, EV_READ);
39     ev_io_start (loop, &stdin_watcher);
40
41     // initialise a timer watcher, then start it
42     // simple non-repeating 5.5 second timeout
43     ev_timer_init (&timeout_watcher, timeout_cb, 5.5, 0.);
44     ev_timer_start (loop, &timeout_watcher);
45
46     // now wait for events to arrive
47     ev_run (loop, 0);
48     //
49     // break was called, so exit
50     return 0;
51 }

可以看出,libev库的使用简单、方便。我们只要定义个事件的监听器对象,初始化,开始,最后调用ev_run。不同事件监听器初始化的内容也不一样,比如,IO事件监听器需要初始化监听的文件描述符,事件以及回调函数。

2、事件监听器

typedef ev_watcher *W;
typedef ev_watcher_list *WL;
typedef ev_watcher_time *WT;

typedef struct ev_watcher
{
  int active;
  int pending;
  int priority;
  void *data;
  void (*cb)(EV_P_ struct type *w, int revents);
} ev_watcher;

typedef struct ev_watcher_list
{
  int active;
  int pending;
  int priority;
  void *data;
  void (*cb)(EV_P_ struct ev_watcher_list *w, int revents);
  struct ev_watcher_list *next;
} ev_watcher_list;

typedef struct ev_io
{
  int active;
  int pending;
  int priority;
  void *data;
  void (*cb)(EV_P_ struct ev_io *w, int revents);
  struct ev_watcher_list *next;

  int fd;     /* ro */
  int events; /* ro */
} ev_io;
ev_watcher是一个基础监听器,包括回调函数cd;监听器列表(ev_watcher_list)是在监听器的基础上添加指向下一个监听器的指针next;IO监听器是在监听器列表的基础上加上了其特有的文件描述符和事件类型。ev_io_init (&stdin_watcher, stdin_cb, /*STDIN_FILENO*/ 0, EV_READ);该函数就是初始化stdin_watcher这个监听器的回调函数,文件描述符,以及事假类型。3、struct ev_loop
struct ev_loop;
# define EV_P  struct ev_loop *loop
# define EV_P_ struct ev_loop *loop,
# define EV_A  loop
# define EV_A_ loop,  //这4个宏用于形参

struct ev_loop{ //部分参数
    ANFD *anfds
    int anfdmax

    int *fdchanges
    int fdchangemax
    int fdchangecnt

    ANPENDING *pendings [NUMPRI]
    int pendingmax [NUMPRI]
    int pendingcnt [NUMPRI]

    int backend
    int backend_fd
    void (*backend_modify)(EV_P_ int fd, int oev, int nev)
    void (*backend_poll)(EV_P_ ev_tstamp timeout)
}

typedef struct
{
  WL head;           //ev_watcher_list *head;
  unsigned char events; /* the events watched for */
  unsigned char reify;  /* flag set when this ANFD needs reification (EV_ANFD_REIFY, EV__IOFDSET) */
  unsigned char emask;  /* the epoll backend stores the actual kernel mask in here */
  unsigned char unused;

  unsigned int egen;    /* generation counter to counter epoll bugs */
  SOCKET handle;
  OVERLAPPED or, ow;
} ANFD;

typedef struct
{
  W w;       //ev_watcher *w;
  int events; /* the pending event set for the given watcher */
} ANPENDING;
ev_run函数主要是一个while循环,在这个while循环中不断检测各个事件是否发生,如果发生就调用其回调函数。而这个过程中,主要用到的对象就是struct ev_loop结构体对象,检测哪些事件,回调哪个函数都存放在该对象中。struct ev_loop结构体中的字段很多,以IO事件为例介绍几个主要的:anfds是一个数组,数组元素是结构体ANFD,ANFD有一个成员是监听器列表。数组下标是文件描述符,而列表成员是监听该文件的事件监听器。所以,anfds有点类似散列表,以文件描述符作为键,以监听器作为值,采用开链法解决散列冲突。该字段的初始化在ev_io_start函数中,主要目的是用户定义的监听器告诉ev_loop。fdchanges是一个int数组,也是在ev_io_start中初始化。存放的是监听了的文件描述符。这样ev_run每次循环的时候,要先从fdchanges中取出已经监听的文件描述符,再以该描述符为下标,从anfds中取出监听器对象。这样就得到文件描述符以及监听的事件。

pendings是一个二维数组,第一维是优先级,第二维是监听器。这个数组是用于执行相应的回调函数,根据优先级,遍历所有监听器,调用监听器的回调函数。

3、ev_io_start
ev_io_start (EV_P_ ev_io *w) EV_THROW
{
  ev_start (EV_A_ (W)w, 1);   //w->active = active;
  array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero); //when fd + 1 > anfdmax : 重新分配数组大小
                                                                  //anfds = (type *)array_realloc(sizeof (ANFD), (anfds), &(anfdmax), (fd + 1));
  wlist_add (&anfds[fd].head, (WL)w);  // w->next = *head;*head = w;

  fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY); //nfds [fd].reify |= flags;
                                                                 //++fdchangecnt;
                                                                 //array_needsize (int, fdchanges, fdchangemax, fdchangecnt, EMPTY2);
                                                                 //fdchanges [fdchangecnt - 1] = fd;
}

array_realloc (int elem, void *base, int *cur, int cnt)
{
  *cur = array_nextsize (elem, *cur, cnt);
  return ev_realloc (base, elem * *cur);
}

//分配当前数组大小的两倍内存,如果大于4096,则为4096的倍数
int array_nextsize (int elem, int cur, int cnt)
{
  int ncur = cur + 1;

  do
    ncur <<= 1;
  while (cnt > ncur);

  /* if size is large, round to MALLOC_ROUND - 4 * longs to accommodate malloc overhead */
  if (elem * ncur > MALLOC_ROUND - sizeof (void *) * 4) //#define MALLOC_ROUND 4096
    {
      ncur *= elem;
      ncur = (ncur + elem + (MALLOC_ROUND - 1) + sizeof (void *) * 4) & ~(MALLOC_ROUND - 1);
      ncur = ncur - sizeof (void *) * 4;
      ncur /= elem;
    }

  return ncur;
}

ev_io_start函数主要就是对ev_loop的anfds和fdchanges字段操作,上面已介绍。array_needsize函数实现当数组大小不够时,要重新分配内存,分配方式与stl::vector有些类似,都是新分配的内存为当前内存的2倍,然后移动原先数据到新内存,释放旧内存。

4、ev_run
ev_run (EV_P_ int flags)
{
    ...
    do{
      fd_reify (EV_A);
      backend_poll (EV_A_ waittime);
      if (expect_false (checkcnt))
        queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK);
      EV_INVOKE_PENDING;
    }
    while(...)
    ...
}
ev_run函数代码比较多,以上是以IO事件为例,进行的精简。下面是以epoll作为IO多路复用的机制进行ev_run说明1、ev_loop对象的初始化:
ev_loop对象初始化:
1、# define EV_DEFAULT  ev_default_loop (0)

2、
static struct ev_loop default_loop_struct;
EV_API_DECL struct ev_loop *ev_default_loop_ptr = 0;

ev_default_loop (unsigned int flags) EV_THROW
{
  if (!ev_default_loop_ptr)
    {
      EV_P = ev_default_loop_ptr = &default_loop_struct;
      loop_init (EV_A_ flags);
    }

  return ev_default_loop_ptr;
}

3、
static void noinline ecb_cold
loop_init (EV_P_ unsigned int flags) EV_THROW
{
 flags = atoi (getenv ("LIBEV_FLAGS"));
#if EV_USE_IOCP
      if (!backend && (flags & EVBACKEND_IOCP  )) backend = iocp_init   (EV_A_ flags);
#endif
#if EV_USE_PORT
      if (!backend && (flags & EVBACKEND_PORT  )) backend = port_init   (EV_A_ flags);
#endif
#if EV_USE_KQUEUE
      if (!backend && (flags & EVBACKEND_KQUEUE)) backend = kqueue_init (EV_A_ flags);
#endif
#if EV_USE_EPOLL
      if (!backend && (flags & EVBACKEND_EPOLL )) backend = epoll_init  (EV_A_ flags);
#endif
#if EV_USE_POLL
      if (!backend && (flags & EVBACKEND_POLL  )) backend = poll_init   (EV_A_ flags);
#endif
#if EV_USE_SELECT
      if (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags);
}
EV_USE_EPOLL 、EV_USE_SELECT等宏是在调用./configure时,搜索sys/epoll.h sys/select.h等文件,如果文件存在,就将宏设置为1.
__cplusplus宏是g++编译器定义的

4、
int inline_size
epoll_init (EV_P_ int flags)
{
  backend_fd = epoll_create (256);

  if (backend_fd < 0)
    return 0;

  fcntl (backend_fd, F_SETFD, FD_CLOEXEC);

  backend_mintime = 1e-3; /* epoll does sometimes return early, this is just to avoid the worst */
  backend_modify  = epoll_modify;
  backend_poll    = epoll_poll;

  epoll_eventmax = 64; /* initial number of events receivable per poll */
  epoll_events = (struct epoll_event *)ev_malloc (sizeof (struct epoll_event) * epoll_eventmax);

  return EVBACKEND_EPOLL;//即 EVBACKEND_EPOLL   = 0x00000004U
}
epoll_init返回值为非0,所以不会调用后面的poll_init、select_init。

   所以,在初始化时,调用了epoll_create初始化backend_fd。

 2、fd_reify

fd_reify (EV_P)
{
    for (i = 0; i < fdchangecnt; ++i)
    {
      int fd = fdchanges [i];
      ANFD *anfd = anfds + fd;

      if (o_reify & EV__IOFDSET)
        backend_modify (EV_A_ fd, o_events, anfd->events); //即poll_modify
    }
  fdchangecnt = 0;
}
epoll_modify (EV_P_ int fd, int oev, int nev)
{
  struct epoll_event ev;

  ev.data.u64 = (uint64_t)(uint32_t)fd
              | ((uint64_t)(uint32_t)++anfds [fd].egen << 32);
  ev.events   = (nev & EV_READ  ? EPOLLIN  : 0)
              | (nev & EV_WRITE ? EPOLLOUT : 0);

  if (expect_true (!epoll_ctl (backend_fd, oev && oldmask != nev ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev)))
    return;
}

  在fd_reify中将andfs中监听的事件添加到backend_fd中。

  3、backend_poll

backend_poll (EV_A_ waittime); // 即epoll_poll
epoll_poll (EV_P_ ev_tstamp timeout)
{
  eventcnt = epoll_wait (backend_fd, epoll_events, epoll_eventmax, timeout * 1e3);
  for (i = 0; i < eventcnt; ++i)
    {
      struct epoll_event *ev = epoll_events + i;

      int fd = (uint32_t)ev->data.u64; /* mask out the lower 32 bits */
      int want = anfds [fd].events;
      int got  = (ev->events & (EPOLLOUT | EPOLLERR | EPOLLHUP) ? EV_WRITE : 0)
               | (ev->events & (EPOLLIN  | EPOLLERR | EPOLLHUP) ? EV_READ  : 0);
      fd_event (EV_A_ fd, got);  //将watcher设置到loop的pending数组中
      }
}
fd_event (EV_P_ int fd, int revents)
{
  ANFD *anfd = anfds + fd;
  if (expect_true (!anfd->reify))
    fd_event_nocheck (EV_A_ fd, revents);
}
fd_event_nocheck (EV_P_ int fd, int revents)
{
  ANFD *anfd = anfds + fd;
  ev_io *w;

  for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next)
    {
      int ev = w->events & revents;

      if (ev)
        ev_feed_event (EV_A_ (W)w, ev);
    }
}
ev_feed_event (EV_P_ void *w, int revents) EV_THROW
{
  W w_ = (W)w;
  int pri = ABSPRI (w_);

  if (expect_false (w_->pending))
    pendings [pri][w_->pending - 1].events |= revents;
  else
    {
      w_->pending = ++pendingcnt [pri];
      array_needsize (ANPENDING, pendings [pri], pendingmax [pri], w_->pending, EMPTY2);
      pendings [pri][w_->pending - 1].w      = w_;
      pendings [pri][w_->pending - 1].events = revents;
    }
}

在backend_poll中会调用epoll_wait,通过fd_event函数,将就绪的文件描述符对应的监听器添加到ev_loop对象的pendings字段中

  4、EV_INVOKE_PENDING

void noinline
ev_invoke_pending (EV_P)
{
  pendingpri = NUMPRI;

  while (pendingpri) /* pendingpri possibly gets modified in the inner loop */
    {
      --pendingpri;

      while (pendingcnt [pendingpri])
        {
          ANPENDING *p = pendings [pendingpri] + --pendingcnt [pendingpri];

          p->w->pending = 0;
          EV_CB_INVOKE (p->w, p->events);
        }
    }
}
# define EV_CB_INVOKE(watcher,revents) (watcher)->cb (EV_A_ (watcher), (revents))

EV_INVOKE_PENDING会一次调用pendings中的监听器的回调函数。

至此,ev_run大体介绍完毕。

小结:

总的来说,对于IO事件驱动,libev是先将监听器存放在一个数组,每次遍历都将监听器监听的文件描述符添加到epoll_wait进行监听,然后将eopll_wait返回的就绪描述符对应的监听器添加到pendings,最后调用pendings中监听器的回调函数。

 
				
时间: 2024-10-26 07:14:35

libev实现分析的相关文章

事件驱动模型Libev(二)

Libev设计思路 理清了Libev的代码结构和主要的数据结构,就可以跟着示例中接口进入到Libev中,跟着代码了解其设计的思路.这里我们管struct ev_loop称作为事件循环驱动器而将各种watcher称为事件监控器. 1.分析例子中的IO事件 这里在前面的例子中我们先把定时器和信号事件的使用注释掉,只看IO事件监控器,从而了解Libev最基本的逻辑.可以结合Gdb设断点一步一步的跟看看代码的逻辑是怎样的. 我们从main开始一步步走.首先执行 struct ev_loop *main_

[gevent源码分析] gevent两架马车-libev和greenlet

本篇将讨论gevent的两架马车-libev和greenlet如何协同工作的. gevent事件驱动底层使用了libev,我们先看看如何单独使用gevent中的事件循环. #coding=utf8 import socket import gevent from gevent.core import loop def f(): s, address = sock.accept() print address s.send("hello world\r\n") loop = loop()

[gevent源码分析] libev cython绑定core.pyx

gevent core就是封装了libev,使用了cython的语法,感兴趣童鞋可以好好研究研究.其实libev是有python的封装 pyev(https://pythonhosted.org/pyev/),不过pyev是使用C来写扩展的,代码巨复杂.在看core.pyx代码之前先学习一下 core.pyx用到的cython知识. 一: cython基础知识 1.cdef, def, cpdef的区别 cdef用于定义C中的函数,变量,如cdef int i;而def知识python中的函数定

解决 libev.so.4()(64bit) is needed by percona-xtrabackup-2.3.4-1.el6.x86_64案例

在mysql主从同步时经常会用到Xtra, XtraBackup可以说是一个相对完美的免费开源数据备份工具,支持在线无锁表同步复制和可并行高效率的安全备份恢复机制相比mysqldump来说优势较大好处多,在RHEL6中安装XtraBackup时会发生缺少依赖包的现象 本案例针对Xtra缺少依赖包的情况进行安装分析解决 1.本实验环境 [root@master ~]# uname  -r 2.6.32-573.el6.x86_64 [root@master ~]# cat /etc/redhat-

Gevent的socket协程安全性分析

一般讨论socket的并发安全性,都是指线程的安全性...而且绝大多数的情况下socket都不是线程安全的.. 当然一些框架可能会对socket进行一层封装,让其成为线程安全的...例如java的netty框架就是如此,将socket封装成channel,然后让channel封闭到一个线程中,那么这个channel的所有的读写都在它所在的线程中串行的进行,那么自然也就是线程安全的了..... 其实很早看Gevent的源码的时候,就已经看过这部分的东西了,当时就已经知道gevent的socket不

大数据系统数据采集产品的架构分析

任何完整的大数据平台,一般包括以下的几个过程: 数据采集 数据存储 数据处理 数据展现(可视化,报表和监控) 其中,数据采集是所有数据系统必不可少的,随着大数据越来越被重视,数据采集的挑战也变的尤为突出.这其中包括: 数据源多种多样 数据量大,变化快 如何保证数据采集的可靠性的性能 如何避免重复数据 如何保证数据的质量 我们今天就来看看当前可用的一些数据采集的产品,重点关注一些它们是如何做到高可靠,高性能和高扩展. Apache Flume Flume 是Apache旗下,开源,高可靠,高扩展,

libev 默认事件循环初始化的解析

libev第一次进入的是默认的事件循环,这里将源码中执行的默认循环流程解析一下,要进入事件循环,如下例子 int main (void) { // use the default event loop unless you have special needs struct ev_loop *loop = EV_DEFAULT; // initialise an io watcher, then start it // this one will watch for stdin to becom

Darwin Streaming Server 核心代码分析

基本概念 首先,我针对的代码是Darwin Streaming Server 6.0.3未经任何改动的版本. Darwin Streaming Server从设计模式上看,采用了Reactor的并发服务器设计模式,如果对Reactor有一定的了解会有助于对Darwin Streaming Server核心代码的理解. Reactor模式是典型的事件触发模式,当有事件发生时则完成相应的Task,Task的完成是通过调用相应的handle来实现的,对于handle的调用是由有限个数的Thread来完

先从一个 libev 的 demo 入手

最近想研究下 libev 这个网络库,所以先从官方文档一个最简单的 demo 开始,代码如下: //io.c // a single header file is required #include <ev.h> #include <stdio.h> // for puts // every watcher type has its own typedef'd struct // with the name ev_TYPE ev_io stdin_watcher; ev_timer