libevent(十)bufferevent 2

接上文libevent(九)bufferevent

上文主要讲了bufferevent如何监听读事件,那么bufferevent如何监听写事件呢?


对于一个fd,只要它的写缓冲区没有满,就会触发写事件。

一般情况下,如果不向这个fd发送大量的数据,它的写缓冲区是不会满的。

所以,如果一开始就监听写事件,写事件会一直被触发。

libevent的做法是:

当我们确实要向fd写入数据时,才监听该fd的写事件。

监听写事件

在用户回调函数中,可以通过 bufferevent_write 向输出缓冲output中写数据。

int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
    if (evbuffer_add(bufev->output, data, size) == -1)
        return (-1);

    return 0;
}
/* Adds data to an event buffer */

int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
{
    struct evbuffer_chain *chain, *tmp;
    const unsigned char *data = data_in;
    size_t remain, to_alloc;
    int result = -1;

    EVBUFFER_LOCK(buf);

    if (buf->freeze_end) {
        goto done;
    }
    /* Prevent buf->total_len overflow */
    if (datlen > EV_SIZE_MAX - buf->total_len) {
        goto done;
    }

    chain = buf->last;

    /* If there are no chains allocated for this buffer, allocate one
     * big enough to hold all the data. */
    if (chain == NULL) {
        chain = evbuffer_chain_new(datlen);
        if (!chain)
            goto done;
        evbuffer_chain_insert(buf, chain);
    }

    if ((chain->flags & EVBUFFER_IMMUTABLE) == 0) {
        /* Always true for mutable buffers */
        EVUTIL_ASSERT(chain->misalign >= 0 &&
            (ev_uint64_t)chain->misalign <= EVBUFFER_CHAIN_MAX);
        remain = chain->buffer_len - (size_t)chain->misalign - chain->off;
        if (remain >= datlen) {
            /* there‘s enough space to hold all the data in the
             * current last chain */
            memcpy(chain->buffer + chain->misalign + chain->off,
                data, datlen);
            chain->off += datlen;
            buf->total_len += datlen;
            buf->n_add_for_cb += datlen;
            goto out;
        } else if (!CHAIN_PINNED(chain) &&
            evbuffer_chain_should_realign(chain, datlen)) {
            /* we can fit the data into the misalignment */
            evbuffer_chain_align(chain);

            memcpy(chain->buffer + chain->off, data, datlen);
            chain->off += datlen;
            buf->total_len += datlen;
            buf->n_add_for_cb += datlen;
            goto out;
        }
    } else {
        /* we cannot write any data to the last chain */
        remain = 0;
    }

    /* we need to add another chain */
    to_alloc = chain->buffer_len;
    if (to_alloc <= EVBUFFER_CHAIN_MAX_AUTO_SIZE/2)
        to_alloc <<= 1;
    if (datlen > to_alloc)
        to_alloc = datlen;
    tmp = evbuffer_chain_new(to_alloc);
    if (tmp == NULL)
        goto done;

    if (remain) {
        memcpy(chain->buffer + chain->misalign + chain->off,
            data, remain);
        chain->off += remain;
        buf->total_len += remain;
        buf->n_add_for_cb += remain;
    }

    data += remain;
    datlen -= remain;

    memcpy(tmp->buffer, data, datlen);
    tmp->off = datlen;
    evbuffer_chain_insert(buf, tmp);
    buf->n_add_for_cb += datlen;

out:
    evbuffer_invoke_callbacks(buf);
    result = 0;
done:
    EVBUFFER_UNLOCK(buf);
    return result;
}

现在回顾一下bufferevent_socket_new,我们在这个函数中,设置了输出缓冲区的回调函数

evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

    if (cbinfo->n_added &&
        (bufev->enabled & EV_WRITE) &&
        !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
        !bufev_p->write_suspended) {
        /* Somebody added data to the buffer, and we would like to
         * write, and we were not writing.  So, start writing. */
        if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
            /* Should we log this? */
        }
    }
}

可以看出,我们在输出缓冲区的回调函数中,将该fd的写事件添加到了epoll中。

事件流程

上面我们监听了fd的写事件,而此时该fd的写缓冲区没有满,所以写事件被触发,继而调用我们在上文设置的写事件回调函数 bufferevent_writecb。

static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    int res = 0;
    short what = BEV_EVENT_WRITING;
    int connected = 0;
    ev_ssize_t atmost = -1;

    _bufferevent_incref_and_lock(bufev);

    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
         * timeout, since a read has occurred */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }
    if (bufev_p->connecting) {
        int c = evutil_socket_finished_connecting(fd);
        /* we need to fake the error if the connection was refused
         * immediately - usually connection to localhost on BSD */
        if (bufev_p->connection_refused) {
          bufev_p->connection_refused = 0;
          c = -1;
        }

        if (c == 0)
            goto done;

        bufev_p->connecting = 0;
        if (c < 0) {
            event_del(&bufev->ev_write);
            event_del(&bufev->ev_read);
            _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
            goto done;
        } else {
            connected = 1;
#ifdef WIN32
            if (BEV_IS_ASYNC(bufev)) {
                event_del(&bufev->ev_write);
                bufferevent_async_set_connected(bufev);
                _bufferevent_run_eventcb(bufev,
                        BEV_EVENT_CONNECTED);
                goto done;
            }
#endif
            _bufferevent_run_eventcb(bufev,
                    BEV_EVENT_CONNECTED);
            if (!(bufev->enabled & EV_WRITE) ||
                bufev_p->write_suspended) {
                event_del(&bufev->ev_write);
                goto done;
            }
        }
    }

    atmost = _bufferevent_get_write_max(bufev_p);

    if (bufev_p->write_suspended)
        goto done;

    if (evbuffer_get_length(bufev->output)) {
        evbuffer_unfreeze(bufev->output, 1);
        res = evbuffer_write_atmost(bufev->output, fd, atmost);
        evbuffer_freeze(bufev->output, 1);
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case
               XXXX Actually, a 0 on write doesn‘t indicate
               an EOF. An ECONNRESET might be more typical.
             */
            what |= BEV_EVENT_EOF;
        }
        if (res <= 0)
            goto error;

        _bufferevent_decrement_write_buckets(bufev_p, res);
    }

    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }

    /*
     * Invoke the user callback if our buffer is drained or below the
     * low watermark.
     */
    if ((res || !connected) &&
        evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
        _bufferevent_run_writecb(bufev);
    }

    goto done;

 reschedule:
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }
    goto done;

 error:
    bufferevent_disable(bufev, EV_WRITE);
    _bufferevent_run_eventcb(bufev, what);

 done:
    _bufferevent_decref_and_unlock(bufev);
}

主要做了三件事:

  1. 通过evbuffer_write_atmost将输出缓冲output中的数据写入fd中。
  2. 如果output中的数据全部处理完毕,删除写事件
  3. 调用用户定义的写事件回调函数

参考资料:

Libevent源码分析-----bufferevent工作流程探究

p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 14.0px Menlo; color: #93c86a }
span.s1 { }
span.s2 { color: #ffffff }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 14.0px Menlo; color: #93c86a }
span.s1 { }

时间: 2024-10-13 19:43:20

libevent(十)bufferevent 2的相关文章

libevent(九)bufferevent

接上文: libevent(八)bufferevent 在用户的回调函数中,通过bufferevent_read从输入缓冲input中读数据,相应地,通过bufferevent_write向输出缓冲output中写数据. 关于写事件,这里要多说几句. 对于一个fd,只要它的写缓冲区没有满,就会触发写事件.一般情况下,如果不向这个fd发送大量的数据,它的写缓冲区是不会满的. 所以如果一开始就监听写事件,那么写事件会一直被触发. libevent的做法是:当我们确实要写入数据时,才监听写事件. 下面

libevent(十二)信号事件

libevent通过socketpair实现对信号事件的监听. event_base结构体有个成员sig,结构如下: struct evsig_info { /* Event watching ev_signal_pair[1] */ struct event ev_signal; /* Socketpair used to send notifications from the signal handler */ evutil_socket_t ev_signal_pair[2]; /* Tr

libevent(十)evhttp

用libevent构建一个http server非常方便,可参考libevent(七)http server. 主要涉及的一个结构体是 evhttp: struct evhttp { /* Next vhost, if this is a vhost. */ TAILQ_ENTRY(evhttp) next_vhost; /* All listeners for this host */ TAILQ_HEAD(boundq, evhttp_bound_socket) sockets; TAILQ

libevent+bufferevent总结

libevent+bufferevent总结 1 学习参考网址 libevent学习网址:http://blog.csdn.net/feitianxuxue/article/details/9372535 http://www.cnblogs.com/hustcat/archive/2010/08/31/1814022.html http://www.cppblog.com/mysileng/archive/2013/02/04/197719.html bufferevent学习网址:http:

libevent粘包分包解决方案:bufferevent + evbuffer

转自:http://blog.sina.com.cn/s/blog_9f1496990102vshz.html 原文:http://www.lvtao.net/c/631.html Libevent介绍 libevent是一个事件触发的网络库,适用于windows.linux.bsd等多种平台,内部使用select.epoll.kqueue等系统调用管理事件机制.著名分布式缓存软件memcached也是libevent based,而且libevent在使用上可以做到跨平台,而且根据libeve

PHP网络编程之深入Libevent(十五节)

大家周末好,这里有趣有用广告少的公众号高性能API社区,我是老李,本文属于<PHP网络编程>系列中的一个章节. 前两天老孟跟我说: 毫不要脸地说,我写的这些文章都不属于快餐消耗品,你不动手亲自实践是压根搞不定的,哪儿有那么容易就能得到的认知啊!况且我讲的并不全,有很多资料知识是需要你自己搜索补充的.而且老李自认为很少在公众号里瞎TM发没用的文章,几乎篇篇都是干货.水很少.很紧致,老铁们啊,听我一句劝: 春宵一刻值千金,绝知此事要躬行 我看了一下<PHP网络编程>整本书的整体进度,由

libevent网络编程汇总

libevent源码剖析: ========================================================== 1.libevent源码剖析一(序) 2.libevent源码剖析二(Reactor框架) 3.libevent源码剖析三(基础使用) 4.libevent源码剖析四(代码组织) 5.libevent源码剖析五(核心:event) 6.libevent源码剖析六(事件处理:event_base) 7.libevent源码剖析七(事件主循环) 8.lib

libevent学习七

Bufferevents:概念和基础 很多时候,一个程序需要处理一些数据的缓存,不止应用在答复event上.例如:当我们需要去写出数据,通常会这样做: 1. 发现有数据需要写出到一条连接上:把这些数据放到buffer里. 2. 等连接变成可写的状态. 3. 尽可能的写入数据. 4. 记住我们写了多少数据,然后如果数据没有全部写完,就等连接再次变为可写的状态. 这种IO缓冲模式已经足够Libevent的日常使用.一个"bufferevent"是由一个底层传输渠道(如socket),一个读

项目中的Libevent(多线程)

多线程版Libevent //保存线程的结构体 struct LibeventThread { LibEvtServer* that; //用作传参 std::shared_ptr<std::thread> spThread; // 线程 struct event_base * thread_base; // 事件根基 struct event notify_event; evutil_socket_t notfiy_recv_fd; // socketpair 接收端fd(工作线程接收通知)