项目中的Libevent(多线程)

多线程版Libevent
//保存线程的结构体
struct LibeventThread
{
    LibEvtServer* that;                            //用作传参
    std::shared_ptr<std::thread>   spThread;    // 线程
    struct event_base * thread_base;            // 事件根基
    struct event   notify_event;
    evutil_socket_t  notfiy_recv_fd;            // socketpair 接收端fd(工作线程接收通知)
    evutil_socket_t  notfiy_send_fd;            // socketpair 发送端fd(监听线程发送通知)
#ifdef BOOST_LOCKFREE
    boost::lockfree::spsc_queue<conn_queue_item, boost::lockfree::capacity<1000> > conn_queue;
#else
    std::mutex conn_mtx;                        //维护连接队列的锁
    std::queue<conn_queue_item>  conn_queue;    //conn_queue 是一个管理conn_queue_item的队列
#endif
};

bool LibEvtServer::init(I_NetServerEvent* event, int start, int size)
{
    m_ids = new ChannelIDGenerator();
    m_ids->init(start, size);
    m_allChannels.resize(m_ids->getSize());

    m_event = event;

    //event支持windows下线程的函数
    int hr = evthread_use_windows_threads();
    m_base = event_base_new();
    if (!m_base) {
        fprintf(stderr, "Could not initialize libevent!\n");
        return false;
    }
#ifdef MUL_LIBEVENT_THREAD
    m_last_thread = -1; //注意初始化为-1
    //初始化线程
    init_threads(THREAD_NUMB);
#endif
    return true;
}

bool LibEvtServer::init_threads(int thread_numb)
{
    m_libevent_threads.resize(thread_numb);

    //为每个线程指定双向通道(类似于管道)
    for(int i = 0; i < thread_numb; ++i)
    {

        LibeventThread* plt = new LibeventThread();
#ifdef WIN32
        //创建一个socketpair即可与互相通信的两个socket,保存在fds里面
        evutil_socket_t fds[2];
        if(evutil_socketpair(AF_INET, SOCK_STREAM, 0, fds) < 0)
        {
            std::cout << "创建socketpair失败\n";
            return false;
        }
        //设置成无阻赛的socket
        evutil_make_socket_nonblocking(fds[0]);
        evutil_make_socket_nonblocking(fds[1]);
#else
        int fds[2];
        if (pipe(fds)) {
            perror("Can‘t create notify pipe");
            exit(1);
        }
#endif
        plt->notfiy_recv_fd = fds[0];
        plt->notfiy_send_fd = fds[1];

        //安装libevent线程[创建base,注册通道事件(用于监听新链接)]
        setup_libevent_thread(plt);

        //线程放入容器中
        m_libevent_threads[i] = plt;
    }

    //开始创建并启动线程
    for(int i = 0; i < thread_numb; ++i)
    {
        m_libevent_threads[i]->spThread.reset(new std::thread([]
        (void* arg)
        {
            auto me = (LibeventThread*) arg;
            //  Wait for events to become active, and run their callbacks.
            //This is a more flexible version of event_base_dispatch().
            event_base_loop(me->thread_base, 0);
        }, m_libevent_threads[i]));
    }
    return true;
}

//设置线程信息
void LibEvtServer::setup_libevent_thread(LibeventThread * pLibeventThread)
{
    auto plt = pLibeventThread;
    plt->thread_base = event_base_new(); // 创建线程的event_base

    //给每个libevent线程设置连接通知回调函数。
    plt->that = this;
    //设置线程事件notify_event
    event_set(&plt->notify_event, plt->notfiy_recv_fd,//EV_READ表示只要这个socket可读就调用notify_cb函数
        EV_READ | EV_PERSIST, ::notify_cb, plt);
    //设置事件和event_base的关系
    event_base_set(plt->thread_base, &plt->notify_event); // 设置事件的从属关系(相当于指明事件属于哪个event_base)
    //添加事件
    event_add(&plt->notify_event, 0); // 正式添加事件
}

void LibEvtServer::listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
    struct sockaddr *sa, int socklen, void *user_data)
{
#ifdef MUL_LIBEVENT_THREAD
    int cur_thread = (m_last_thread + 1) %  THREAD_NUMB; // 轮循选择工作线程
    m_last_thread = cur_thread;

    conn_queue_item item;
    item.fd = fd;
    //item.ch2 = NULL;

    auto  plt = m_libevent_threads[cur_thread];
    {
        //向线程的队列中放入一个item,每个线程有个队列,保存连接的socketfd
#ifdef BOOST_LOCKFREE
        while(!plt->conn_queue.push(item))
        {
#ifndef _DEBUG
            boost::this_thread::interruptible_wait(1);
#else
            Sleep(1);
#endif
            Plug::PlugMessageBox("连接队列居然满了,超过1000的未处理数!");
        }
#else
        std::lock_guard<std::mutex> lock(plt->conn_mtx);
        plt->conn_queue.push(item);
#endif
    }
    //激活读线程的读事件
    send(plt->notfiy_send_fd, "c", 1, 0);

#else
    auto base = evconnlistener_get_base(listener);

    auto bev = bufferevent_socket_new(base, fd, BEV_OPT_THREADSAFE);//|BEV_OPT_CLOSE_ON_FREE);
    if (!bev)
    {
        fprintf(stderr, "Error constructing bufferevent!");
        event_base_loopbreak(base);
        return ;
    }

    auto c2 = CreateChannel(bev);

    bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, c2);
    bufferevent_enable(bev, EV_READ | EV_WRITE );

#endif
}

//侦听端口,-1表示向系统申请一个任意可用端口
bool LibEvtServer::listen(int* port)
{
    struct sockaddr_in sin;

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    if(-1 == *port)
        sin.sin_port = htons(10000);
    else
        sin.sin_port = htons(*port);

    m_listener = evconnlistener_new_bind(m_base, ::listener_cb, (void*)this,
        LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
        (struct sockaddr*)&sin,
        sizeof(sin));
    if (!m_listener)
    {
        return false;
    }

    if( -1 == *port)
        *port = ntohs(sin.sin_port);

    if (!m_listener) {
        fprintf(stderr, "Could not create a listener!\n");
        return false;
    }
    m_spListenThread.reset(new std::thread([this]//现在看这个线程只是收到连接,然后交给线程,然后通知线程
    {
        //SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);
        //event_base_loop(m_base, EVLOOP_ONCE);
        event_base_dispatch(m_base);
        if(WSAENOTSOCK == WSAGetLastError())
        {
            Plug::PlugMessageBox(L"操作无效套接字啊!");
        }

        Plug::PlugMessageBox(L"Libevent派发线程退出!");
    }));
    return true;
}

void LibEvtServer::notify_cb(evutil_socket_t fd, short which, LibeventThread *pLibeventThread)
{
    //首先将socketpair的1个字节通知信号读出(这是必须的,在水平触发模式下如果不处理该事件,则会循环通知,直到事件被处理)
    char  buf[1];
    recv(fd, buf, 1, 0);//从sockpair的另一端读数据

    auto plt = pLibeventThread;

    conn_queue_item  item;

    //从自己的连接队列中取出连接数
    {
        //取出队列中的第一个元素
#ifdef BOOST_LOCKFREE
        while(!plt->conn_queue.pop(item))//pop一个出来
        {
#ifndef _DEBUG
            boost::this_thread::interruptible_wait(1);
#else
            Sleep(1);
#endif
            Plug::PlugMessageBox("通知队列居然弹空了啊!");
        }
#else
        std::lock_guard<std::mutex>  lck(plt->conn_mtx);
        item = plt->conn_queue.front();
#endif
    }

    //创建每个socket的bufferevent
    auto bev = bufferevent_socket_new(plt->thread_base, item.fd, BEV_OPT_THREADSAFE);

    Channel2* c2 = CreateChannel(bev);

    //设置接收、状态改变 回调函数
    bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, c2);
    bufferevent_enable(bev, EV_READ | EV_WRITE );
}

//看了这个过程就是这个样子的,监听线程接收到连接之后把这个socket丢给Libevent线程,libevent创建bufferevent
//处理相关读和写事件,这个工程通过每个线程的连接队列,然后一个socketpair通知的。这样每个线程就很平均的处理所有的连接事件
//多线程比单线程的是复杂很多,只是这种模式不知道,但bufferevent还是一样的
时间: 2024-10-06 03:29:45

项目中的Libevent(多线程)的相关文章

面试常问问题:银行网上支付项目中怎么控制多线程高并发访问?

面试常问问题:银行网上支付项目中怎么控制多线程高并发访问? synchronized关键字主要解决多线程共享数据同步问题. ThreadLocal使用场合主要解决多线程中数据因并发产生不一致问题. ThreadLocal和Synchonized都用于解决多线程并发访问.但是ThreadLocal与synchronized有本质的区别: synchronized是利用锁的机制,使变量或代码块在某一时该只能被一个线程访问.而ThreadLocal为每一个线程都提供了变量的副本,使 得每个线程在某一时

项目中的libevent

单线程libevent模式 项目里面是多线程版的,我先理解下单线程的. //client 1.调用NGP::init() bool NGP::init(NGPcontext context) { _context = context; //_TcpLink = NEWSP(TcpLink); _TcpLink = NEWSP(TcpLinkEx); _TcpLink->Init(this); return true; } 2.初始化Libevent bool LibEvtServer::init

[多线程] Web 项目中,少有涉及到的一次多线程编程的经验

如今框架横行,Spring 已经是非常成熟的容器体系,我们在日常开发 JavaWeb 的工作中,大多已经不需要考虑多线程的问题,这些问题都已经在Spring容器中实现,框架的意义就是让程序员们可以专注于逻辑的实现.然而这种编程工作是非常无趣无味的,如果长期从事这个工作,技术不一定见长,业务知识一定很熟悉!= =但说实在的,我并不喜欢这类工作,因为这种工作大多情况下知识对代码的简单复制,或是简单的一些编写,并没有什么真正的创造性,不会给人成就感. 需求背景 我们的项目,是 Mysql+Elasti

实际项目中Java多线程模型的总结整理

分享一下最近项目中用到的多线程模型. 需要实现:根据租户填写的表单,自动部署ES集群,提供ES服务. 基本思路: 就是将一个事务生命周期分成不同的阶段,每个阶段都是用线程去负责执行. 目前主要分为四个阶段:事件监听阶段,事件提交阶段,执行器阶段,状态校验阶段 流程图如下: 线程分类: A.监听线程 B.工作线程 C.状态校验线程 D.执行器线程 原理图简单介绍: 1.AcceptorThread线程:监听操作对列表,将新产生的事件记录扔进事件分类器,并且同时往内存容器中添加一条记录. 2.事件分

iOS开发多线程在实际项目中的运用

实际项目开发中为了能够给用户更好的体验,有些延时操作我们都会放在子线程中进行. 今天我们就来聊聊多线程在实际项目中的运用. 我们先来看看多线程的基础知识: 1.多线程的原理: 同一时间,CPU只能处理一条线程,也就是只有一条线程在工作.所谓多线程并发(同时)执行, 其实是CPU快速的在多线程之间调度(切换).如果CPU调度线程的时间足够快,就造成了多线程并 发执行的假象. 2.在实际项目开发中并不是线程越多越好,如果开了大量的线程,会消耗大量的CPU资源,CPU会 被累死,所以一般手机只开1~3

项目中使用Redis的一些总结和体会

第一部分:为什么我的项目中要使用Redis 我知道有些地方没说到位,希望大神们提出来,我会吸取教训,大家共同进步! 注册时邮件激活的部分使用Redis 发送邮件时使用Redis的消息队列,减轻网站压力. 使用Lucene.Net在进行分词时使用Redis消息队列和多线程来避免界面卡死等性能问题. 请大家先思考一个问题:这个问题在大并发.高负载的网站中必须考虑!大家思考如何让速度更快. 三种方法:(1)数据库(2)页面静态化(3)Redis.Memcached 第二部分:Redis是什么 概述:r

项目中遇到的某些问题及解决办法(一)

简介 该博文记录了一些平时在工作中遇到的问题及解决办法,某些问题有解决办法,某些问题暂时没有解决办法,如果有大神知道的,请多多指点. 如果某些问题有更好的解决办法,也请指教. 正文 1.在一个方法中用泛型操作两个不同的类型(Type). 难点:需要实现一个方法,进入参数一个泛型,返回信息一个泛型.但是一个方法中泛型只支持一种类型. 解决办法:将进入和返回放在一个类型中,用特性将进入参数和返回参数区分开. 2.微信三方登录,需要在PC桌面应用端+API服务实现. 难点:微信官网只提供了网页三方登录

C#项目中常用到的设计模式

1. 引言 一个项目的通常都是从Demo开始,不断为项目添加新的功能以及重构,也许刚开始的时候代码显得非常凌乱,毫无设计可言.但是随着项目的迭代,往往需要将很多相同功能的代码抽取出来,这也是设计模式的开始.熟练运用设计模式应该是每一个软件开发人员的必备技能.今天给大家介绍几个常用的设计模式. 2. 单例模式 单例模式恐怕是很多开发人员最先接触到的模式之一,可以认为就是一个全局变量.它的初始化过程无非就是一开始就new 一个instance,或者惰性初始化等需要用到的时候new 一个instanc

对libevent+多线程服务器模型的C++封装类

最近在看memcached的源码,觉得它那种libevent+多线程的服务器模型真的很不错,我将这个模型封装成一个C++类,根据我的简单测试,这个模型的效率真的很不错,欢迎大家试用. 这个类的使用方法很简单(缺点是不太灵活),只要派生一个类,根据需要重写以下这几个虚函数就行了: //新建连接成功后,会调用该函数virtual void ConnectionEvent(Conn *conn) { }//读取完数据后,会调用该函数virtual void ReadEvent(Conn *conn)