libevent(九)bufferevent

接上文: libevent(八)bufferevent

在用户的回调函数中,通过bufferevent_read从输入缓冲input中读数据,相应地,通过bufferevent_write向输出缓冲output中写数据。

关于写事件,这里要多说几句。

对于一个fd,只要它的写缓冲区没有满,就会触发写事件。一般情况下,如果不向这个fd发送大量的数据,它的写缓冲区是不会满的。

所以如果一开始就监听写事件,那么写事件会一直被触发。

libevent的做法是:当我们确实要写入数据时,才监听写事件。

下面是bufferevent_write的实现:

int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
    if (evbuffer_add(bufev->output, data, size) == -1)
        return (-1);

    return 0;
}
/* Adds data to an event buffer */

int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
{
    struct evbuffer_chain *chain, *tmp;
    const unsigned char *data = data_in;
    size_t remain, to_alloc;
    int result = -1;

    EVBUFFER_LOCK(buf);

    if (buf->freeze_end) {
        goto done;
    }
    /* Prevent buf->total_len overflow */
    if (datlen > EV_SIZE_MAX - buf->total_len) {
        goto done;
    }

    chain = buf->last;

    /* If there are no chains allocated for this buffer, allocate one
     * big enough to hold all the data. */
    if (chain == NULL) {
        chain = evbuffer_chain_new(datlen);
        if (!chain)
            goto done;
        evbuffer_chain_insert(buf, chain);
    }

    if ((chain->flags & EVBUFFER_IMMUTABLE) == 0) {
        /* Always true for mutable buffers */
        EVUTIL_ASSERT(chain->misalign >= 0 &&
            (ev_uint64_t)chain->misalign <= EVBUFFER_CHAIN_MAX);
        remain = chain->buffer_len - (size_t)chain->misalign - chain->off;
        if (remain >= datlen) {
            /* there‘s enough space to hold all the data in the
             * current last chain */
            memcpy(chain->buffer + chain->misalign + chain->off,
                data, datlen);
            chain->off += datlen;
            buf->total_len += datlen;
            buf->n_add_for_cb += datlen;
            goto out;
        } else if (!CHAIN_PINNED(chain) &&
            evbuffer_chain_should_realign(chain, datlen)) {
            /* we can fit the data into the misalignment */
            evbuffer_chain_align(chain);

            memcpy(chain->buffer + chain->off, data, datlen);
            chain->off += datlen;
            buf->total_len += datlen;
            buf->n_add_for_cb += datlen;
            goto out;
        }
    } else {
        /* we cannot write any data to the last chain */
        remain = 0;
    }

    /* we need to add another chain */
    to_alloc = chain->buffer_len;
    if (to_alloc <= EVBUFFER_CHAIN_MAX_AUTO_SIZE/2)
        to_alloc <<= 1;
    if (datlen > to_alloc)
        to_alloc = datlen;
    tmp = evbuffer_chain_new(to_alloc);
    if (tmp == NULL)
        goto done;

    if (remain) {
        memcpy(chain->buffer + chain->misalign + chain->off,
            data, remain);
        chain->off += remain;
        buf->total_len += remain;
        buf->n_add_for_cb += remain;
    }

    data += remain;
    datlen -= remain;

    memcpy(tmp->buffer, data, datlen);
    tmp->off = datlen;
    evbuffer_chain_insert(buf, tmp);
    buf->n_add_for_cb += datlen;

out:
    evbuffer_invoke_callbacks(buf);
    result = 0;
done:
    EVBUFFER_UNLOCK(buf);
    return result;
}

有一点要注意的是:

evbuffer_add将用户数据写入输出缓冲output之后,还要通过evbuffer_invoke_callbacks调用回调函数。

在上文libevent(八)bufferevent中,我们通过bufferevent_socket_new创建bufferevent的时候,设置了输出缓冲output的回调函数为bufferevent_socket_outbuf_cb,此时就派上用场了。

static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

    if (cbinfo->n_added &&
        (bufev->enabled & EV_WRITE) &&
        !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
        !bufev_p->write_suspended) {
        /* Somebody added data to the buffer, and we would like to
         * write, and we were not writing.  So, start writing. */
        if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
            /* Should we log this? */
        }
    }
}

可以看到,在bufferevent_socket_outbuf_cb中,我们将写事件添加到了event_base中。

总结一下bufferevent_write:

1. 将用户数据写入输出缓冲区output

2. 添加写事件到事件循环

由于此时fd的写缓冲区没有满,所以写事件被触发,开始执行写事件的回调函数bufferevent_writecb

static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    int res = 0;
    short what = BEV_EVENT_WRITING;
    int connected = 0;
    ev_ssize_t atmost = -1;

    _bufferevent_incref_and_lock(bufev);

    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
         * timeout, since a read has occurred */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }
    if (bufev_p->connecting) {
        int c = evutil_socket_finished_connecting(fd);
        /* we need to fake the error if the connection was refused
         * immediately - usually connection to localhost on BSD */
        if (bufev_p->connection_refused) {
          bufev_p->connection_refused = 0;
          c = -1;
        }

        if (c == 0)
            goto done;

        bufev_p->connecting = 0;
        if (c < 0) {
            event_del(&bufev->ev_write);
            event_del(&bufev->ev_read);
            _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
            goto done;
        } else {
            connected = 1;
#ifdef WIN32
            if (BEV_IS_ASYNC(bufev)) {
                event_del(&bufev->ev_write);
                bufferevent_async_set_connected(bufev);
                _bufferevent_run_eventcb(bufev,
                        BEV_EVENT_CONNECTED);
                goto done;
            }
#endif
            _bufferevent_run_eventcb(bufev,
                    BEV_EVENT_CONNECTED);
            if (!(bufev->enabled & EV_WRITE) ||
                bufev_p->write_suspended) {
                event_del(&bufev->ev_write);
                goto done;
            }
        }
    }

    atmost = _bufferevent_get_write_max(bufev_p);

    if (bufev_p->write_suspended)
        goto done;

    if (evbuffer_get_length(bufev->output)) {
        evbuffer_unfreeze(bufev->output, 1);
        res = evbuffer_write_atmost(bufev->output, fd, atmost);
        evbuffer_freeze(bufev->output, 1);
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case
               XXXX Actually, a 0 on write doesn‘t indicate
               an EOF. An ECONNRESET might be more typical.
             */
            what |= BEV_EVENT_EOF;
        }
        if (res <= 0)
            goto error;

        _bufferevent_decrement_write_buckets(bufev_p, res);
    }

    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }

    /*
     * Invoke the user callback if our buffer is drained or below the
     * low watermark.
     */
    if ((res || !connected) &&
        evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
        _bufferevent_run_writecb(bufev);
    }

    goto done;

 reschedule:
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }
    goto done;

 error:
    bufferevent_disable(bufev, EV_WRITE);
    _bufferevent_run_eventcb(bufev, what);

 done:
    _bufferevent_decref_and_unlock(bufev);
}

三个注意点:

1. 调用evbuffer_write_atmost写数据

  evbuffer_write_atmost通过write系统调用将数据写入fd写缓冲区。

2. 如果数据写完,删除写事件

3. 最后会调用用户定义的回调函数

整个写事件流程如下:

1. 用户主动调用

p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 14.0px Menlo; color: #93c86a }
span.s1 { }
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 14.0px Menlo; color: #93c86a }
span.s1 { }

时间: 2024-12-12 20:39:26

libevent(九)bufferevent的相关文章

libevent(十)bufferevent 2

接上文libevent(九)bufferevent 上文主要讲了bufferevent如何监听读事件,那么bufferevent如何监听写事件呢? 对于一个fd,只要它的写缓冲区没有满,就会触发写事件. 一般情况下,如果不向这个fd发送大量的数据,它的写缓冲区是不会满的. 所以,如果一开始就监听写事件,写事件会一直被触发. libevent的做法是: 当我们确实要向fd写入数据时,才监听该fd的写事件. 监听写事件 在用户回调函数中,可以通过 bufferevent_write 向输出缓冲out

libevent+bufferevent总结

libevent+bufferevent总结 1 学习参考网址 libevent学习网址:http://blog.csdn.net/feitianxuxue/article/details/9372535 http://www.cnblogs.com/hustcat/archive/2010/08/31/1814022.html http://www.cppblog.com/mysileng/archive/2013/02/04/197719.html bufferevent学习网址:http:

libevent粘包分包解决方案:bufferevent + evbuffer

转自:http://blog.sina.com.cn/s/blog_9f1496990102vshz.html 原文:http://www.lvtao.net/c/631.html Libevent介绍 libevent是一个事件触发的网络库,适用于windows.linux.bsd等多种平台,内部使用select.epoll.kqueue等系统调用管理事件机制.著名分布式缓存软件memcached也是libevent based,而且libevent在使用上可以做到跨平台,而且根据libeve

libevent网络编程汇总

libevent源码剖析: ========================================================== 1.libevent源码剖析一(序) 2.libevent源码剖析二(Reactor框架) 3.libevent源码剖析三(基础使用) 4.libevent源码剖析四(代码组织) 5.libevent源码剖析五(核心:event) 6.libevent源码剖析六(事件处理:event_base) 7.libevent源码剖析七(事件主循环) 8.lib

libevent学习七

Bufferevents:概念和基础 很多时候,一个程序需要处理一些数据的缓存,不止应用在答复event上.例如:当我们需要去写出数据,通常会这样做: 1. 发现有数据需要写出到一条连接上:把这些数据放到buffer里. 2. 等连接变成可写的状态. 3. 尽可能的写入数据. 4. 记住我们写了多少数据,然后如果数据没有全部写完,就等连接再次变为可写的状态. 这种IO缓冲模式已经足够Libevent的日常使用.一个"bufferevent"是由一个底层传输渠道(如socket),一个读

项目中的Libevent(多线程)

多线程版Libevent //保存线程的结构体 struct LibeventThread { LibEvtServer* that; //用作传参 std::shared_ptr<std::thread> spThread; // 线程 struct event_base * thread_base; // 事件根基 struct event notify_event; evutil_socket_t notfiy_recv_fd; // socketpair 接收端fd(工作线程接收通知)

libevent(1)

很多时候,除了响应事件之外,应用还希望做一定的数据缓冲.比如说,写入数据的时候,通常的运行模式是: l 决定要向连接写入一些数据,把数据放入到缓冲区中 l 等待连接可以写入 l 写入尽量多的数据 l 记住写入了多少数据,如果还有更多数据要写入,等待连接再次可以写入 这种缓冲IO模式很通用,libevent为此提供了一种通用机制,即bufferevent.bufferevent由一个底层的传输端口(如套接字),一个读取缓冲区和一个写入缓冲区组成.与通常的事件在底层传输端口已经就绪,可以读取或者写入

reactor模型框架图和流程图 libevent

学习libevent有助于提升程序设计功力,除了网络程序设计方面外,libevent的代码里有很多有用的设计技巧和基础数据结构,比如信息隐藏.函数指针.c语言的多态支持.链表和堆等等,都有助于提升自身的程序功力.       程序设计不止要了解框架,很多细节之处恰恰也是事关整个系统成败的关键.只对libevent本身的框架大概了解,那或许仅仅是一知半解,不深入代码分析,就难以了解其设计的精巧之处,也就难以为自己所用.       事实上libevent本身就是一个典型的Reactor模型,理解R

libevent学习七(bufferevent)

1. 每个bufferevent 都拥有类型为struct evbuffer的input buffer和out buffer,分别供数据读取和数据写入使用. 2.读取和写入数据是通过编写和设置对应的回调函数进行,而调用回调函数的时机则根据水位是否满足来的,水位又是可以设置的.默认情况下读的低水位是0,就是说libevent从底层读到大于0的数据到input buffer中,读回调函数就会调用,读回调函数读取input buffer的数据:同样默认的写水位也为0,就是说一旦output buffe