Libevent Source Code Analysis (6): bufferevent

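As mentioned in the previous section, libevent provides six bufferevent types. Two of them, bufferevent_sock and bufferevent_async, will be analyzed in detail later. The full definition of bufferevent is as follows: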

struct bufferevent {
    /** Event base for which this bufferevent was created. */
    struct event_base *ev_base;
    /** Pointer to a table of function pointers to set up how this
        bufferevent behaves. */
    const struct bufferevent_ops *be_ops;

    /** A read event that triggers when a timeout has happened or a socket
        is ready to read data.  Only used by some subtypes of
        bufferevent. */
    struct event ev_read;
    /** A write event that triggers when a timeout has happened or a socket
        is ready to write data.  Only used by some subtypes of
        bufferevent. */
    struct event ev_write;

    /** An input buffer. Only the bufferevent is allowed to add data to
        this buffer, though the user is allowed to drain it. */
    struct evbuffer *input;

    /** An output buffer. Only the bufferevent is allowed to drain data
        from this buffer, though the user is allowed to add it. */
    struct evbuffer *output;

    struct event_watermark wm_read;
    struct event_watermark wm_write;

    bufferevent_data_cb readcb;
    bufferevent_data_cb writecb;
    /* This should be called 'eventcb', but renaming it would break
     * backward compatibility */
    bufferevent_event_cb errorcb;
    void *cbarg;

    struct timeval timeout_read;
    struct timeval timeout_write;

    /** Events that are currently enabled: currently EV_READ and EV_WRITE
        are supported. */
    short enabled;
};

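Here ev_base points to the event_base the bufferevent belongs to; ev_read and ev_write are the read and write events; timeout_read and timeout_write are the read and write timeouts. readcb, writecb and errorcb are the read callback, the write callback and the event callback respectively; the last one keeps the name errorcb for backward compatibility. input and output are the read and write buffers, both of type evbuffer. enabled records the events that are currently enabled, which for now can only be EV_READ and EV_WRITE. wm_read and wm_write hold the watermarks (thresholds) of the read and write buffers. be_ops points to a bufferevent_ops structure, defined as follows: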

struct bufferevent_ops {
    /** The name of the bufferevent's type. */
    const char *type;
    /** At what offset into the implementation type will we find a
        bufferevent structure?

        Example: if the type is implemented as
        struct bufferevent_x {
           int extra_data;
           struct bufferevent bev;
        }
        then mem_offset should be offsetof(struct bufferevent_x, bev)
    */
    off_t mem_offset;

    /** Enables one or more of EV_READ|EV_WRITE on a bufferevent.  Does
        not need to adjust the 'enabled' field.  Returns 0 on success, -1
        on failure.
     */
    int (*enable)(struct bufferevent *, short);

    /** Disables one or more of EV_READ|EV_WRITE on a bufferevent.  Does
        not need to adjust the 'enabled' field.  Returns 0 on success, -1
        on failure.
     */
    int (*disable)(struct bufferevent *, short);

    /** Free any storage and deallocate any extra data or structures used
        in this implementation.
     */
    void (*destruct)(struct bufferevent *);

    /** Called when the timeouts on the bufferevent have changed.*/
    int (*adj_timeouts)(struct bufferevent *);

    /** Called to flush data. */
    int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode);

    /** Called to access miscellaneous fields. */
    int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);

};

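bufferevent_ops is implemented much like eventop: both define a table of function pointers, so each bufferevent implementation can supply its own versions of these operations. For example, the table for bufferevent_sock is: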

const struct bufferevent_ops bufferevent_ops_socket = {
    "socket",
    evutil_offsetof(struct bufferevent_private, bev),
    be_socket_enable,
    be_socket_disable,
    be_socket_destruct,
    be_socket_adj_timeouts,
    be_socket_flush,
    be_socket_ctrl,
};
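The function-pointer table idea can be shown in miniature. The following is only a sketch under stated assumptions: toy_bev, toy_ops and every other toy_* name are hypothetical and do not exist in libevent; they merely mimic how a generic entry point updates the common field and then dispatches through be_ops.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical miniature of the bufferevent_ops idea; none of these
 * names exist in libevent. */
struct toy_bev;

struct toy_ops {
    const char *type;                        /* name of the implementation */
    int (*enable)(struct toy_bev *, short);  /* 0 on success, -1 on failure */
    int (*disable)(struct toy_bev *, short);
};

struct toy_bev {
    const struct toy_ops *ops; /* plays the role of be_ops */
    short enabled;             /* generic bookkeeping, like 'enabled' */
    short backend_events;      /* what the backend actually armed */
};

static int toy_sock_enable(struct toy_bev *b, short ev)
{
    b->backend_events |= ev;
    return 0;
}

static int toy_sock_disable(struct toy_bev *b, short ev)
{
    b->backend_events &= (short)~ev;
    return 0;
}

/* One concrete table, analogous in shape to bufferevent_ops_socket. */
static const struct toy_ops toy_ops_socket = {
    "socket",
    toy_sock_enable,
    toy_sock_disable,
};

/* Generic entry point: update the common field, then dispatch. */
static int toy_enable(struct toy_bev *b, short ev)
{
    b->enabled |= ev;
    return b->ops->enable(b, ev);
}
```

A second implementation would only need its own table; toy_enable would not have to change.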

Next, let's look at how bufferevent_sock is implemented.

struct bufferevent *
bufferevent_new(evutil_socket_t fd,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg)
{
    struct bufferevent *bufev;

    if (!(bufev = bufferevent_socket_new(NULL, fd, 0)))
        return NULL;

    bufferevent_setcb(bufev, readcb, writecb, eventcb, cbarg);

    return bufev;
}

struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
    struct bufferevent_private *bufev_p;
    struct bufferevent *bufev;

#ifdef WIN32
    if (base && event_base_get_iocp(base))
        return bufferevent_async_new(base, fd, options);
#endif

    if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
        return NULL;

    if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
                    options) < 0) {
        mm_free(bufev_p);
        return NULL;
    }
    bufev = &bufev_p->bev;
    evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

    evbuffer_freeze(bufev->input, 0);
    evbuffer_freeze(bufev->output, 1);

    return bufev;
}

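bufferevent_socket_new creates a bufferevent of the socket type. On Win32, if the event_base has IOCP enabled, it delegates to the IOCP-based implementation, bufferevent_async_new. Otherwise it allocates a bufferevent_private structure, defined as follows: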

struct bufferevent_private {
    /** The underlying bufferevent structure. */
    struct bufferevent bev;

    /** Evbuffer callback to enforce watermarks on input. */
    struct evbuffer_cb_entry *read_watermarks_cb;

    /** If set, we should free the lock when we free the bufferevent. */
    unsigned own_lock : 1;
    /** Flag: set if we have deferred callbacks and a read callback is
     * pending. */
    unsigned readcb_pending : 1;
    /** Flag: set if we have deferred callbacks and a write callback is
     * pending. */
    unsigned writecb_pending : 1;
    /** Flag: set if we are currently busy connecting. */
    unsigned connecting : 1;
    /** Flag: set if a connect failed prematurely; this is a hack for
     * getting around the bufferevent abstraction. */
    unsigned connection_refused : 1;
    /** Set to the events pending if we have deferred callbacks and
     * an events callback is pending. */
    short eventcb_pending;

    /** If set, read is suspended until one or more conditions are over.
     * The actual value here is a bitfield of those conditions; see the
     * BEV_SUSPEND_* flags above. */
    bufferevent_suspend_flags read_suspended;

    /** If set, writing is suspended until one or more conditions are over.
     * The actual value here is a bitfield of those conditions; see the
     * BEV_SUSPEND_* flags above. */
    bufferevent_suspend_flags write_suspended;

    /** Set to the current socket errno if we have deferred callbacks and
     * an events callback is pending. */
    int errno_pending;

    /** The DNS error code for bufferevent_socket_connect_hostname */
    int dns_error;

    /** Used to implement deferred callbacks */
    struct deferred_cb deferred;

    /** The options this bufferevent was constructed with */
    enum bufferevent_options options;

    /** Current reference count for this bufferevent. */
    int refcnt;

    /** Lock for this bufferevent.  Shared by the inbuf and the outbuf.
     * If NULL, locking is disabled. */
    void *lock;

    /** Rate-limiting information for this bufferevent */
    struct bufferevent_rate_limit *rate_limiting;
};

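The first member of bufferevent_private is a struct bufferevent. Every bufferevent is in fact embedded in a bufferevent_private; this is hidden from users, who only need to deal with bufferevent. Most of the remaining fields of bufferevent_private record state.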
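Getting from the public structure back to its private container is what EVUTIL_UPCAST and mem_offset are for. A minimal sketch of the technique, assuming a hypothetical pair toy_bev / toy_private and a hypothetical macro TOY_UPCAST built on the standard offsetof:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical public part, standing in for struct bufferevent. */
struct toy_bev {
    int enabled;
};

/* Hypothetical private container, standing in for bufferevent_private. */
struct toy_private {
    struct toy_bev bev; /* first member, like bufferevent_private.bev */
    int refcnt;
};

/* Step back from a pointer to the member to a pointer to its container. */
#define TOY_UPCAST(ptr, type, field) \
    ((type *)(((char *)(ptr)) - offsetof(type, field)))

/* Code that only receives the public pointer can still reach private state. */
static int toy_refcnt_of(struct toy_bev *bev)
{
    struct toy_private *p = TOY_UPCAST(bev, struct toy_private, bev);
    return p->refcnt;
}
```

Because bev is the first member here, the offset is 0 and the two pointers are numerically equal; the macro keeps working if the member is moved.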

Back to bufferevent_socket_new: after allocating the bufferevent_private it calls bufferevent_init_common. This function is defined in bufferevent.c, which holds the functions shared by all bufferevent types.

int bufferevent_init_common(struct bufferevent_private *bufev_private,
    struct event_base *base,
    const struct bufferevent_ops *ops,
    enum bufferevent_options options)
{
    struct bufferevent *bufev = &bufev_private->bev;

    if (!bufev->input) {
        if ((bufev->input = evbuffer_new()) == NULL)
            return -1;
    }

    if (!bufev->output) {
        if ((bufev->output = evbuffer_new()) == NULL) {
            evbuffer_free(bufev->input);
            return -1;
        }
    }

    bufev_private->refcnt = 1;
    bufev->ev_base = base;

    /* Disable timeouts. */
    evutil_timerclear(&bufev->timeout_read);
    evutil_timerclear(&bufev->timeout_write);

    bufev->be_ops = ops;

    /*
     * Set to EV_WRITE so that using bufferevent_write is going to
     * trigger a callback.  Reading needs to be explicitly enabled
     * because otherwise no data will be available.
     */
    bufev->enabled = EV_WRITE;

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
    if (options & BEV_OPT_THREADSAFE) {
        if (bufferevent_enable_locking(bufev, NULL) < 0) {
            /* cleanup */
            evbuffer_free(bufev->input);
            evbuffer_free(bufev->output);
            bufev->input = NULL;
            bufev->output = NULL;
            return -1;
        }
    }
#endif
    if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
        == BEV_OPT_UNLOCK_CALLBACKS) {
        event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
        return -1;
    }
    if (options & BEV_OPT_DEFER_CALLBACKS) {
        if (options & BEV_OPT_UNLOCK_CALLBACKS)
            event_deferred_cb_init(&bufev_private->deferred,
                bufferevent_run_deferred_callbacks_unlocked,
                bufev_private);
        else
            event_deferred_cb_init(&bufev_private->deferred,
                bufferevent_run_deferred_callbacks_locked,
                bufev_private);
    }

    bufev_private->options = options;

    evbuffer_set_parent(bufev->input, bufev);
    evbuffer_set_parent(bufev->output, bufev);

    return 0;
}

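bufferevent_init_common is fairly simple. Points to note: a bufferevent has EV_WRITE enabled by default, while EV_READ must be enabled explicitly; if thread safety is requested (BEV_OPT_THREADSAFE), a lock is created for the bufferevent; and in this version BEV_OPT_UNLOCK_CALLBACKS requires BEV_OPT_DEFER_CALLBACKS.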
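The option check is a small bit-mask idiom worth isolating. The sketch below assumes hypothetical flag values (TOY_OPT_*); the real BEV_OPT_* constants are defined in libevent's headers and may differ.

```c
#include <assert.h>

/* Hypothetical flag values for illustration only. */
enum toy_options {
    TOY_OPT_THREADSAFE       = 1 << 1,
    TOY_OPT_DEFER_CALLBACKS  = 1 << 2,
    TOY_OPT_UNLOCK_CALLBACKS = 1 << 3
};

/* Returns 0 if the combination is acceptable, -1 otherwise. */
static int toy_check_options(int options)
{
    const int mask = TOY_OPT_DEFER_CALLBACKS | TOY_OPT_UNLOCK_CALLBACKS;

    /* Masking keeps only the two bits of interest. The result equals
     * UNLOCK exactly when UNLOCK is set and DEFER is clear, which is
     * the only rejected combination. */
    if ((options & mask) == TOY_OPT_UNLOCK_CALLBACKS)
        return -1;
    return 0;
}
```

Comparing the masked value against a single flag tests "this one set and that one clear" in one expression, without two separate conditions.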

Continuing with bufferevent_socket_new:

    evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

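After bufferevent_init_common returns, bufev->output is flagged with EVBUFFER_FLAG_DRAINS_TO_FD, which tells the evbuffer that its contents will be drained to a file descriptor (this allows optimizations such as sendfile when a file is added to the buffer). Next, the read and write events are assigned their callbacks. Finally, bufferevent_socket_outbuf_cb is registered as a callback on output, so it runs whenever data is added to output. This means the write event does not have to stay registered all the time: bufferevent_writecb can remove it once all data has been written, and bufferevent_socket_outbuf_cb decides whether it needs to be added again.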
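This "register the write event only while there is something to write" scheme can be modeled without any I/O. The sketch below is a toy state machine, not libevent code: toy_out, toy_append and toy_writecb are hypothetical names, and setting write_pending to 1 or 0 stands in for event_add and event_del.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the output side of a socket bufferevent. */
struct toy_out {
    size_t len;          /* bytes waiting in the output buffer */
    int write_enabled;   /* stands in for (enabled & EV_WRITE) */
    int write_suspended; /* stands in for write_suspended */
    int write_pending;   /* stands in for event_pending(&ev_write, ...) */
};

/* Append data, then run the "buffer callback" as the outbuf cb would. */
static void toy_append(struct toy_out *o, size_t n_added)
{
    o->len += n_added;
    if (n_added && o->write_enabled && !o->write_pending &&
        !o->write_suspended)
        o->write_pending = 1; /* "event_add": start watching writability */
}

/* The fd became writable: send at most `room` bytes. */
static size_t toy_writecb(struct toy_out *o, size_t room)
{
    size_t n = o->len < room ? o->len : room;

    o->len -= n;
    if (o->len == 0)
        o->write_pending = 0; /* "event_del": nothing left to send */
    return n;
}
```

The benefit is that an idle connection with an empty output buffer does not wake the event loop every time its socket is writable, which is nearly always.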

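After the bufferevent is initialized, the socket whose read and write events are monitored is set (or replaced) through be_socket_setfd, which bufferevent_setfd reaches via the ctrl operation: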

static void
be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
    BEV_LOCK(bufev);
    EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket);

    event_del(&bufev->ev_read);
    event_del(&bufev->ev_write);

    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

    if (fd >= 0)
        bufferevent_enable(bufev, bufev->enabled);

    BEV_UNLOCK(bufev);
}

int bufferevent_enable(struct bufferevent *bufev, short event)
{
    struct bufferevent_private *bufev_private =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    short impl_events = event;
    int r = 0;

    _bufferevent_incref_and_lock(bufev);
    if (bufev_private->read_suspended)
        impl_events &= ~EV_READ;
    if (bufev_private->write_suspended)
        impl_events &= ~EV_WRITE;

    bufev->enabled |= event;

    if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
        r = -1;

    _bufferevent_decref_and_unlock(bufev);
    return r;
}
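The distinction bufferevent_enable makes between what the user asked for (enabled) and what is actually handed to the backend (impl_events) can be sketched as follows. All toy_* names are hypothetical, and the armed field is an invention of this sketch to make the backend's view observable; the two event bit values are defined locally for the example.

```c
#include <assert.h>

/* Event bits defined locally for this sketch. */
#define TOY_EV_READ  0x02
#define TOY_EV_WRITE 0x04

struct toy_bev {
    short enabled;       /* what the user asked for */
    short armed;         /* what the backend was told to watch */
    int read_suspended;
    int write_suspended;
};

static int toy_enable(struct toy_bev *b, short event)
{
    short impl_events = event;

    /* Suspended directions are remembered but not armed. */
    if (b->read_suspended)
        impl_events &= ~TOY_EV_READ;
    if (b->write_suspended)
        impl_events &= ~TOY_EV_WRITE;

    b->enabled |= event;     /* always recorded */
    b->armed |= impl_events; /* only the non-suspended part */
    return 0;
}
```

Recording the request even while a direction is suspended lets the code that later lifts the suspension re-arm the event from enabled alone.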

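The socket handed to be_socket_setfd is expected to be one that is already connected (or, as in bufferevent_socket_connect below, in the process of connecting). On the client side, bufferevent_sock provides bufferevent_socket_connect to connect the socket. On the server side, bufferevent_sock has no listen-related function of its own, but libevent ships listener.c, which can be used to listen for and accept connections.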

int
bufferevent_socket_connect(struct bufferevent *bev,
    struct sockaddr *sa, int socklen)
{
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bev, struct bufferevent_private, bev);

    evutil_socket_t fd;
    int r = 0;
    int result=-1;
    int ownfd = 0;

    _bufferevent_incref_and_lock(bev);

    if (!bufev_p)
        goto done;

    fd = bufferevent_getfd(bev);
    if (fd < 0) {
        if (!sa)
            goto done;
        fd = socket(sa->sa_family, SOCK_STREAM, 0);
        if (fd < 0)
            goto done;
        if (evutil_make_socket_nonblocking(fd)<0)
            goto done;
        ownfd = 1;
    }
    if (sa) {
#ifdef WIN32
        if (bufferevent_async_can_connect(bev)) {
            bufferevent_setfd(bev, fd);
            r = bufferevent_async_connect(bev, fd, sa, socklen);
            if (r < 0)
                goto freesock;
            bufev_p->connecting = 1;
            result = 0;
            goto done;
        } else
#endif
        r = evutil_socket_connect(&fd, sa, socklen);
        if (r < 0)
            goto freesock;
    }
#ifdef WIN32
    /* ConnectEx() isn't always around, even when IOCP is enabled.
     * Here, we borrow the socket object's write handler to fall back
     * on a non-blocking connect() when ConnectEx() is unavailable. */
    if (BEV_IS_ASYNC(bev)) {
        event_assign(&bev->ev_write, bev->ev_base, fd,
            EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
    }
#endif
    bufferevent_setfd(bev, fd);
    if (r == 0) {
        if (! be_socket_enable(bev, EV_WRITE)) {
            bufev_p->connecting = 1;
            result = 0;
            goto done;
        }
    } else if (r == 1) {
        /* The connect succeeded already. How very BSD of it. */
        result = 0;
        bufev_p->connecting = 1;
        event_active(&bev->ev_write, EV_WRITE, 1);
    } else {
        /* The connect failed already.  How very BSD of it. */
        bufev_p->connection_refused = 1;
        bufev_p->connecting = 1;
        result = 0;
        event_active(&bev->ev_write, EV_WRITE, 1);
    }

    goto done;

freesock:
    _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
    if (ownfd)
        evutil_closesocket(fd);
    /* do something about the error? */
done:
    _bufferevent_decref_and_unlock(bev);
    return result;
}

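On Win32 the connect path can use IOCP, which will be covered in the next section; here we only look at the other path. evutil_socket_connect is a utility function provided by libevent, and bufferevent_socket_connect distinguishes four outcomes of its return value r. r < 0 means the call failed outright: the code jumps to freesock, runs the event callback with BEV_EVENT_ERROR, and returns -1. r == 0 means the connection is in progress: be_socket_enable adds the write event, and bufferevent_writecb runs once the connect completes. r == 1 means the connection has already succeeded: event_active triggers ev_write manually so that bufferevent_writecb handles the connection. Any other value (the final else branch) means the connection was refused immediately: connection_refused is set, and event_active is again used so that bufferevent_writecb reports the failure.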
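Reading the branches of bufferevent_socket_connect above, the handling of r can be condensed into a small classifier. This is only a sketch: toy_action and toy_classify_connect are hypothetical names, and the mapping is taken from the if/else chain in the listing rather than from libevent's documentation.

```c
#include <assert.h>

/* Hypothetical labels for what bufferevent_socket_connect does next. */
enum toy_action {
    TOY_FAIL_NOW,       /* r < 0: goto freesock, report BEV_EVENT_ERROR */
    TOY_WAIT_WRITABLE,  /* r == 0: in progress, enable EV_WRITE and wait */
    TOY_FIRE_CONNECTED, /* r == 1: already connected, activate ev_write */
    TOY_FIRE_REFUSED    /* else: refused at once, activate ev_write */
};

static enum toy_action toy_classify_connect(int r)
{
    if (r < 0)
        return TOY_FAIL_NOW;
    if (r == 0)
        return TOY_WAIT_WRITABLE;
    if (r == 1)
        return TOY_FIRE_CONNECTED;
    return TOY_FIRE_REFUSED;
}
```

Note that the last three cases all end up in bufferevent_writecb, so the user sees the connect result through the event callback in a uniform way; only the first case reports the error synchronously.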

Here is the implementation of bufferevent_writecb:

static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    int res = 0;
    short what = BEV_EVENT_WRITING;
    int connected = 0;
    ev_ssize_t atmost = -1;

    _bufferevent_incref_and_lock(bufev);

    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
         * timeout, since a read has occurred */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }
    if (bufev_p->connecting) {
        int c = evutil_socket_finished_connecting(fd);
        /* we need to fake the error if the connection was refused
         * immediately - usually connection to localhost on BSD */
        if (bufev_p->connection_refused) {
          bufev_p->connection_refused = 0;
          c = -1;
        }

        if (c == 0)
            goto done;

        bufev_p->connecting = 0;
        if (c < 0) {
            event_del(&bufev->ev_write);
            event_del(&bufev->ev_read);
            _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
            goto done;
        } else {
            connected = 1;
#ifdef WIN32
            if (BEV_IS_ASYNC(bufev)) {
                event_del(&bufev->ev_write);
                bufferevent_async_set_connected(bufev);
                _bufferevent_run_eventcb(bufev,
                        BEV_EVENT_CONNECTED);
                goto done;
            }
#endif
            _bufferevent_run_eventcb(bufev,
                    BEV_EVENT_CONNECTED);
            if (!(bufev->enabled & EV_WRITE) ||
                bufev_p->write_suspended) {
                event_del(&bufev->ev_write);
                goto done;
            }
        }
    }

    atmost = _bufferevent_get_write_max(bufev_p);

    if (bufev_p->write_suspended)
        goto done;

    if (evbuffer_get_length(bufev->output)) {
        evbuffer_unfreeze(bufev->output, 1);
        res = evbuffer_write_atmost(bufev->output, fd, atmost);
        evbuffer_freeze(bufev->output, 1);
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case
               XXXX Actually, a 0 on write doesn't indicate
               an EOF. An ECONNRESET might be more typical.
             */
            what |= BEV_EVENT_EOF;
        }
        if (res <= 0)
            goto error;

        _bufferevent_decrement_write_buckets(bufev_p, res);
    }

    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }

    /*
     * Invoke the user callback if our buffer is drained or below the
     * low watermark.
     */
    if ((res || !connected) &&
        evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
        _bufferevent_run_writecb(bufev);
    }

    goto done;

 reschedule:
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }
    goto done;

 error:
    bufferevent_disable(bufev, EV_WRITE);
    _bufferevent_run_eventcb(bufev, what);

 done:
    _bufferevent_decref_and_unlock(bufev);
}

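bufferevent_writecb handles two things: connection completion and sending data. Note that in the connection branch, if EV_WRITE is not enabled or writing is suspended (write_suspended), the write event must be removed, because at that point it was registered only to detect the end of the connect. In the data branch, the write event is likewise removed when output becomes empty, because the callback on output will add it back once there is data again: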

static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

    if (cbinfo->n_added &&
        (bufev->enabled & EV_WRITE) &&
        !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
        !bufev_p->write_suspended) {
        /* Somebody added data to the buffer, and we would like to
         * write, and we were not writing.  So, start writing. */
        if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
            /* Should we log this? */
        }
    }
}
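The condition under which bufferevent_writecb notifies the user, (res || !connected) combined with the low write watermark, is easy to misread, so here it is as a standalone predicate. toy_should_run_writecb is a hypothetical helper written for this sketch; its parameters mirror the local variables of bufferevent_writecb.

```c
#include <assert.h>
#include <stddef.h>

/* res:           bytes written during this invocation (0 if none)
 * connected:     1 if this invocation just completed a connect
 * out_len:       bytes still left in the output buffer
 * low_watermark: corresponds to wm_write.low */
static int toy_should_run_writecb(long res, int connected,
                                  size_t out_len, size_t low_watermark)
{
    /* Skip the callback when the invocation only finished a connect
     * and wrote nothing; otherwise fire once the buffer has drained
     * to the low watermark or below. */
    return (res || !connected) && out_len <= low_watermark;
}
```

With the default low watermark of 0, this means the write callback fires when the output buffer has been completely flushed.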

bufferevent_writecb and bufferevent_readcb also have to invoke the user-supplied read and write callbacks:

void
_bufferevent_run_writecb(struct bufferevent *bufev)
{
    /* Requires that we hold the lock and a reference */
    struct bufferevent_private *p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    if (bufev->writecb == NULL)
        return;
    if (p->options & BEV_OPT_DEFER_CALLBACKS) {
        p->writecb_pending = 1;
        if (!p->deferred.queued)
            SCHEDULE_DEFERRED(p);
    } else {
        bufev->writecb(bufev, bufev->cbarg);
    }
}

#define SCHEDULE_DEFERRED(bevp)                     \
    do {                                \
        bufferevent_incref(&(bevp)->bev);           \
        event_deferred_cb_schedule(             \
            event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
            &(bevp)->deferred);             \
    } while (0)

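If the bufferevent's options include BEV_OPT_DEFER_CALLBACKS, the callback is deferred; deferred callbacks are processed together in the event_base loop. If p->deferred.queued is not set, the bufferevent is added to the deferred callback queue of its ev_base. The function run for the deferred entry was set up in bufferevent_init_common: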

    if (options & BEV_OPT_DEFER_CALLBACKS) {
        if (options & BEV_OPT_UNLOCK_CALLBACKS)
            event_deferred_cb_init(&bufev_private->deferred,
                bufferevent_run_deferred_callbacks_unlocked,
                bufev_private);
        else
            event_deferred_cb_init(&bufev_private->deferred,
                bufferevent_run_deferred_callbacks_locked,
                bufev_private);
    }

The two callbacks differ only in how they use the lock:

static void
bufferevent_run_deferred_callbacks_locked(struct deferred_cb *_, void *arg)
{
    struct bufferevent_private *bufev_private = arg;
    struct bufferevent *bufev = &bufev_private->bev;

    BEV_LOCK(bufev);
    if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
        bufev->errorcb) {
        /* The "connected" happened before any reads or writes, so
           send it first. */
        bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
        bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg);
    }
    if (bufev_private->readcb_pending && bufev->readcb) {
        bufev_private->readcb_pending = 0;
        bufev->readcb(bufev, bufev->cbarg);
    }
    if (bufev_private->writecb_pending && bufev->writecb) {
        bufev_private->writecb_pending = 0;
        bufev->writecb(bufev, bufev->cbarg);
    }
    if (bufev_private->eventcb_pending && bufev->errorcb) {
        short what = bufev_private->eventcb_pending;
        int err = bufev_private->errno_pending;
        bufev_private->eventcb_pending = 0;
        bufev_private->errno_pending = 0;
        EVUTIL_SET_SOCKET_ERROR(err);
        bufev->errorcb(bufev, what, bufev->cbarg);
    }
    _bufferevent_decref_and_unlock(bufev);
}

static void
bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
{
    struct bufferevent_private *bufev_private = arg;
    struct bufferevent *bufev = &bufev_private->bev;

    BEV_LOCK(bufev);
#define UNLOCKED(stmt) \
    do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0)

    if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
        bufev->errorcb) {
        /* The "connected" happened before any reads or writes, so
           send it first. */
        bufferevent_event_cb errorcb = bufev->errorcb;
        void *cbarg = bufev->cbarg;
        bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
        UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
    }
    if (bufev_private->readcb_pending && bufev->readcb) {
        bufferevent_data_cb readcb = bufev->readcb;
        void *cbarg = bufev->cbarg;
        bufev_private->readcb_pending = 0;
        UNLOCKED(readcb(bufev, cbarg));
    }
    if (bufev_private->writecb_pending && bufev->writecb) {
        bufferevent_data_cb writecb = bufev->writecb;
        void *cbarg = bufev->cbarg;
        bufev_private->writecb_pending = 0;
        UNLOCKED(writecb(bufev, cbarg));
    }
    if (bufev_private->eventcb_pending && bufev->errorcb) {
        bufferevent_event_cb errorcb = bufev->errorcb;
        void *cbarg = bufev->cbarg;
        short what = bufev_private->eventcb_pending;
        int err = bufev_private->errno_pending;
        bufev_private->eventcb_pending = 0;
        bufev_private->errno_pending = 0;
        EVUTIL_SET_SOCKET_ERROR(err);
        UNLOCKED(errorcb(bufev,what,cbarg));
    }
    _bufferevent_decref_and_unlock(bufev);
#undef UNLOCKED
}
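Two properties of the deferred path are worth spelling out: several requests made before the loop runs are coalesced into a single queue entry, and pending callbacks run in a fixed order (connected first, then read, then write). The following toy model illustrates both. Everything named toy_* is hypothetical, and the log field exists only so the order can be observed.

```c
#include <assert.h>
#include <string.h>

struct toy_deferred {
    int queued;            /* already sitting in the deferred queue? */
    int schedule_count;    /* how many times we actually enqueued */
    int connected_pending;
    int readcb_pending;
    int writecb_pending;
    char log[16];          /* order in which callbacks ran */
};

/* Like SCHEDULE_DEFERRED guarded by the 'queued' check. */
static void toy_schedule(struct toy_deferred *d)
{
    if (!d->queued) {
        d->queued = 1;
        d->schedule_count++;
    }
}

static void toy_request_connected(struct toy_deferred *d)
{
    d->connected_pending = 1;
    toy_schedule(d);
}

static void toy_request_read(struct toy_deferred *d)
{
    d->readcb_pending = 1;
    toy_schedule(d);
}

static void toy_request_write(struct toy_deferred *d)
{
    d->writecb_pending = 1;
    toy_schedule(d);
}

/* Record one callback invocation, never overflowing the log. */
static void toy_log(struct toy_deferred *d, char c)
{
    size_t n = strlen(d->log);
    if (n + 1 < sizeof(d->log)) {
        d->log[n] = c;
        d->log[n + 1] = '\0';
    }
}

/* What the event loop does when it reaches the queue entry. */
static void toy_run_deferred(struct toy_deferred *d)
{
    d->queued = 0;
    if (d->connected_pending) { d->connected_pending = 0; toy_log(d, 'C'); }
    if (d->readcb_pending)    { d->readcb_pending = 0;    toy_log(d, 'R'); }
    if (d->writecb_pending)   { d->writecb_pending = 0;   toy_log(d, 'W'); }
}
```

Requesting write, write, read and connected in that order therefore produces one queue entry and runs the callbacks as connected, read, write.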

That is how bufferevent_sock is implemented. In the next section we will look at the IOCP implementation.

Time: 2024-10-17 22:25:16
