kqueue示例

网络服务器通常都使用epoll进行异步IO处理,而开发者通常使用mac,为了方便开发,我把自己的handy库移植到了mac平台上。移植过程中,网上居然没有搜到kqueue的使用例子,让我惊讶不已。为了让大家不用像我一样再次花费大力气搞定kqueue,我整理了一个简单清晰可运行的kqueue例子,供大家参考。

kqueue一共有几个函数:

int  kqueue(void); //类似epoll_create
int kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout); //兼具epoll_ctl及epoll_wait功能
EV_SET(&kev, ident, filter, flags, fflags, data, udata); //设定kevent参数的宏
struct kevent {
     uintptr_t       ident;          /* identifier for this event */
     int16_t         filter;         /* filter for event */
     uint16_t        flags;          /* general flags */
     uint32_t        fflags;         /* filter-specific flags */
     intptr_t        data;           /* filter-specific data */
     void            *udata;         /* opaque user data identifier */
};

函数调用示例:

    //创建kqueue
    int epollfd = kqueue();
    //添加或者修改fd
    struct kevent ev[2];
    int n = 0;
    if (events & kReadEvent) {
        EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, (void*)(intptr_t)fd);
    } else if (modify){
        EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, (void*)(intptr_t)fd);
    }
    if (events & kWriteEvent) {
        EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, (void*)(intptr_t)fd);
    } else if (modify){
        EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, (void*)(intptr_t)fd);
    }
    printf("%s fd %d events read %d write %d\n",
           modify ? "mod" : "add", fd, events & kReadEvent, events & kWriteEvent);
    int r = kevent(efd, ev, n, NULL, 0, NULL);
    //获取ready的fd
    struct timespec timeout;
    timeout.tv_sec = waitms / 1000;
    timeout.tv_nsec = (waitms % 1000) * 1000 * 1000;
    const int kMaxEvents = 20;
    struct kevent activeEvs[kMaxEvents];
    int n = kevent(efd, NULL, 0, activeEvs, kMaxEvents, &timeout);
    //处理IO事件
    for (int i = 0; i < n; i ++) {
        int fd = (int)(intptr_t)activeEvs[i].udata;
        int events = activeEvs[i].filter;
        if (events == EVFILT_READ) {
            handleRead(efd, fd);
        } else if (events == EVFILT_WRITE) {
            handleWrite(efd, fd);
        }
    }

注意kevent与epoll最大的不同在于READ/WRITE事件是分开注册并且分开返回的,而Epoll则是一个fd一次返回读和写事件,用标志位来判断。

可以运行的代码如下:kqueue-example(handy对kqueue提供了封装版本)

#include <sys/socket.h>
#include <sys/event.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>

#define exit_if(r, ...) if(r) {printf(__VA_ARGS__); printf("error no: %d error msg %s\n", errno, strerror(errno)); exit(1);}

const int kReadEvent = 1;
const int kWriteEvent = 2;

void setNonBlock(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    exit_if(flags<0, "fcntl failed");
    int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    exit_if(r<0, "fcntl failed");
}

void updateEvents(int efd, int fd, int events, bool modify) {
    struct kevent ev[2];
    int n = 0;
    if (events & kReadEvent) {
        EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, (void*)(intptr_t)fd);
    } else if (modify){
        EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, (void*)(intptr_t)fd);
    }
    if (events & kWriteEvent) {
        EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, (void*)(intptr_t)fd);
    } else if (modify){
        EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, (void*)(intptr_t)fd);
    }
    printf("%s fd %d events read %d write %d\n",
           modify ? "mod" : "add", fd, events & kReadEvent, events & kWriteEvent);
    int r = kevent(efd, ev, n, NULL, 0, NULL);
    exit_if(r, "kevent failed ");
}

void handleAccept(int efd, int fd) {
    struct sockaddr_in raddr;
    socklen_t rsz = sizeof(raddr);
    int cfd = accept(fd,(struct sockaddr *)&raddr,&rsz);
    exit_if(cfd<0, "accept failed");
    sockaddr_in peer, local;
    socklen_t alen = sizeof(peer);
    int r = getpeername(cfd, (sockaddr*)&peer, &alen);
    exit_if(r<0, "getpeername failed");
    printf("accept a connection from %s\n", inet_ntoa(raddr.sin_addr));
    setNonBlock(cfd);
    updateEvents(efd, cfd, kReadEvent|kWriteEvent, false);
}

void handleRead(int efd, int fd) {
    char buf[4096];
    int n = 0;
    while ((n=::read(fd, buf, sizeof buf)) > 0) {
        printf("read %d bytes\n", n);
        int r = ::write(fd, buf, n); //写出读取的数据
        //实际应用中,写出数据可能会返回EAGAIN,此时应当监听可写事件,当可写时再把数据写出
        exit_if(r<=0, "write error");
    }
    if (n<0 && (errno == EAGAIN || errno == EWOULDBLOCK))
        return;
    exit_if(n<0, "read error"); //实际应用中,n<0应当检查各类错误,如EINTR
    printf("fd %d closed\n", fd);
    close(fd);
}

void handleWrite(int efd, int fd) {
    //实际应用应当实现可写时写出数据,无数据可写才关闭可写事件
    updateEvents(efd, fd, kReadEvent, true);
}

void loop_once(int efd, int lfd, int waitms) {
    struct timespec timeout;
    timeout.tv_sec = waitms / 1000;
    timeout.tv_nsec = (waitms % 1000) * 1000 * 1000;
    const int kMaxEvents = 20;
    struct kevent activeEvs[kMaxEvents];
    int n = kevent(efd, NULL, 0, activeEvs, kMaxEvents, &timeout);
    printf("epoll_wait return %d\n", n);
    for (int i = 0; i < n; i ++) {
        int fd = (int)(intptr_t)activeEvs[i].udata;
        int events = activeEvs[i].filter;
        if (events == EVFILT_READ) {
            if (fd == lfd) {
                handleAccept(efd, fd);
            } else {
                handleRead(efd, fd);
            }
        } else if (events == EVFILT_WRITE) {
            handleWrite(efd, fd);
        } else {
            exit_if(1, "unknown event");
        }
    }
}

int main() {
    short port = 99;
    int epollfd = kqueue();
    exit_if(epollfd < 0, "epoll_create failed");
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    exit_if(listenfd < 0, "socket failed");
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof addr);
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = INADDR_ANY;
    int r = ::bind(listenfd,(struct sockaddr *)&addr, sizeof(struct sockaddr));
    exit_if(r, "bind to 0.0.0.0:%d failed %d %s", port, errno, strerror(errno));
    r = listen(listenfd, 20);
    exit_if(r, "listen failed %d %s", errno, strerror(errno));
    printf("fd %d listening at %d\n", listenfd, port);
    setNonBlock(listenfd);
    updateEvents(epollfd, listenfd, kReadEvent, false);
    for (;;) { //实际应用应当注册信号处理函数,退出时清理资源
        loop_once(epollfd, listenfd, 10000);
    }
    return 0;
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-08-14 07:29:59

kqueue示例的相关文章

(转)libevent介绍及示例

一.Libevent简介 libevent是一个基于事件触发的网络库,适用于windows.linux.bsd等多种平台,内部使用select.epoll.kqueue等系统调用管理事件机制.官网:http://libevent.org/ 特点: 事件驱动,高性能; 轻量级,专注于网络,不如ACE那么臃肿庞大,只提供了简单的网络API的封装,线程池,内存池,递归锁等均需要自己实现: 开放源码,代码相当精炼.易读: 跨平台,支持Windows.Linux.BSD和Mac OS: 支持多种I/O多路

Python Web框架Tornado的异步处理代码示例

1. What is Tornado Tornado是一个轻量级但高性能的Python web框架,与另一个流行的Python web框架Django相比,tornado不提供操作数据库的ORM接口及严格的MVC开发模式,但可以提供基本的web server功能,故它是轻量级的:它借助non-blocking and event-driven的I/O模型(epoll或kqueue)实现了一套异步网络库,故它是高性能的. Tornado的轻量级+高性能特性使得它特别适用于提供web api的场合,

pfsense Web服务器负载平衡配置示例

在pfsense的网关和服务器中有两种类型的负载平衡功能.网关负载平衡可以通过多个WAN连接分发Internet绑定的流量.服务器负载平衡管理传入流量,因此它利用多个内部服务器进行负载分配和冗余,服务器负载平衡允许流量在多个内部服务器之间分配,它最常用于Web服务器和SMTP服务器.下面我们就以实例来介绍服务器负载平衡的设置. 下面介绍如何通过pfsense2.32配置Web服务器的负载平衡. 网络环境 服务器负载平衡示例网络环境 上图为示例网络环境.它由单个防火墙组成,使用其WAN IP地址池

docker深入2-API示例

2017/9/18 一.目的 演示 http API 使用的方式 注1:本次实例是在 docker swarm mode 下使用的,目的是:更新指定服务的镜像. 注2:要在 swarm manager node 上执行. docker 的 API 文档是自动生成的,没有太多有用的示例可用. [版本] ~]# docker version Client:  Version:      17.06.0-ce  API version:  1.30  Go version:   go1.8.3  Gi

Storm入门(四)WordCount示例

Storm API文档网址如下: http://storm.apache.org/releases/current/javadocs/index.html 一.关联代码 使用maven,代码如下. pom.xml  和Storm入门(三)HelloWorld示例相同 RandomSentenceSpout.java /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor lice

java第15章示例代码

import java.util.Scanner; /** * * @author asus第15章示例代码1 全桂群2017.4.9 * */public class Registter { /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub Scanner input = new Scanner(System.in); String uname, pw

算法之冒泡排序(Java示例)

冒泡排序(英语:Bubble Sort) 是一种简单的排序算法.它重复地走访过要排序的数列,一次比较两个元素,如果他们的顺序错误就把他们交换过来.走访数列的工作是重复地进行直到没有再需要交换,也就是说该数列已经排序完成.这个算法的名字由来是因为越小的元素会经由交换慢慢"浮"到数列的顶端. 动画示意 实现示例 Java 1 public class BubbleSortExample { 2 3 static void bubbleSort(int[] arr){ 4 int len =

Java多线程系列--“JUC锁”11之 Semaphore信号量的原理和示例

概要 本章,我们对JUC包中的信号量Semaphore进行学习.内容包括:Semaphore简介Semaphore数据结构Semaphore源码分析(基于JDK1.7.0_40)Semaphore示例 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3534050.html Semaphore简介 Semaphore是一个计数信号量,它的本质是一个"共享锁". 信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可

Java多线程系列--“JUC锁”10之 CyclicBarrier原理和示例

概要 本章介绍JUC包中的CyclicBarrier锁.内容包括:CyclicBarrier简介CyclicBarrier数据结构CyclicBarrier源码分析(基于JDK1.7.0_40)CyclicBarrier示例 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3533995.html CyclicBarrier简介 CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier p