用一个示例讲解我是如何处理高并发的

去年做了一个远程升级的服务。客户端连接此服务可以下载更新程序。简单点说就是个TCP sever。基于C++。

运行环境是centOS 6.5

刚开始客户端数量少而且访问不频繁,所以没太关注并发的问题。当时用工具测试大概只能支持的40次/秒的并发访问,而且已经有数据串包的情况出现了。最近有空做了不少的优化并记录了笔记备忘。

下面给出的代码都不是完整的项目源码,我只是截取了关键部分用于说明主题

我选择的测试工具是一个小的tcp客户端工具,可以比价快捷的进行多客户端连接的测试。

线程安全的单例模式

最初的版本是通过多线程实现高并发的。我的工程里有两个类是单例模式,一个参数文件管理类,一个是日志管理类。原来我实现的时候没有考虑线程安全,于是第一步我先把这两个类改成线程安全测试看看效果。

增加线程安全前的代码片段(只给出参数文件管理类的实现)

//.h
class AppCof {
public:
    static AppCof* open_cof();

private:
    AppCof();

    class CGarbo   //它的唯一工作就是在析构函数中删除CSingleton的实例
    {
    public:
        ~CGarbo()
        {
            if(AppCof::m_pInstance)
                delete AppCof::m_pInstance;
        }
    };
    static CGarbo Garbo;  //定义一个静态成员变量,程序结束时,系统会自动调用它的析构函数
...
//.cpp
AppCof* AppCof::open_cof(){

    if(m_pInstance == NULL){
        m_pInstance = new AppCof();
    }
    return m_pInstance;
}
...

增加线程安全后

//.h
class AppCof:boost::noncopyable
{
public:
    static AppCof* open_cof();
private:
    AppCof();
    static AppCof *m_pInstance;
    static void init();
    static pthread_once_t ponce_;
    ...
void AppCof::init()
{
    m_pInstance = new AppCof();
    if(m_pInstance != NULL)
    {
        m_pInstance->get_env();
        m_pInstance->read_cof();
    }
}

AppCof* AppCof::open_cof(){

    pthread_once(&ponce_, &AppCof::init);
    return m_pInstance;
}

...

这里有两个重点,一是pthread_once的用法,还有就是boost::noncopyable。

先说说前者,

int pthread_once(pthread_once_t *once_control, void (*init_routine) (void));

本函数使用初值为PTHREAD_ONCE_INIT的once_control变量保证init_routine()函数在本进程执行序列中仅执行一次。在多线程编程环境下,尽管pthread_once()调用会出现在多个线程中,init_routine()函数仅执行一次,究竟在哪个线程中执行是不定的,是由内核调度来决定。

boost::noncopyable这种用法其实从名字可以窥探一二,一个类继承自它就表示该类不能通过赋值,复制等手段创建新的对象了。

优化IO读写机制

这部分从select,epoll这些IO处理上下手。我优化过三个方案分别测试,效果还是比较明显。

最初的代码是这样的:

void run_srv(const char* i_port){
    for(;;){
        client = server.accept_client();
        std::thread t1(base_proc,client,i_port);
        usleep(200);
        t1.detach();
    }

}

void base_proc(Socket::TCP* i_client,const char* i_port){
    Socket::TCP* client = i_client;
    i_client = NULL;
    TmsProc* tmpc =new  TmsProc(client);
    tmpc->run();
    delete tmpc;
    return ;
}
void TmsProc::run(){
    Writelog::Trace(9,"业务处理开始");
    try{

        for(;;){
        tm.tv_sec = 3;
        tm.tv_usec = 0;
        FD_ZERO(&set);
        FD_SET(p_client->_socket_id,&set);

            int iret = select(p_client->_socket_id+1,&set,NULL,NULL,&tm);
            if(iret < 0){
                Writelog::Trace(2,"select出错:%s",strerror(errno));
                return;
            }
            if(iret == 0){
                Writelog::Trace(2,"select超时");
                return;
            }
            Writelog::Trace(9,"监控到可以进行接收");
            //收取信息
            if(read_sock() == false){
                Writelog::Trace(3,"检测到客户端套接字异常,准备断开连接");
                break;
            }
            ...

很简单,主要流程都在run函数里。这个函数可以优化的地方有几处。比如两个if的判断可以改成if elseif的形式。因为两次if虽然是互斥的但是程序都会判断一次,效率非常低。

另外接收数据的条件可以用FD_ISSET判断是否有数据可读,如果有才真正接收,否则不处理。

所以第一种优化方案很快出炉


void TmsProc::run(){

        tm.tv_sec = 60;
        tm.tv_usec = 0;
    try{

        for(;;){

        FD_ZERO(&set);
        FD_SET(p_client->_socket_id,&set);

            int iret = select(p_client->_socket_id+1,&set,NULL,NULL,&tm);
            if(iret < 0){
                pLog_tmsProc->Trace(2,"select出错:%s",strerror(errno));
                return;
            }
            else if(iret == 0){
                pLog_tmsProc->Trace(2,"select超时");
                return;
            }
            if(FD_ISSET(p_client->_socket_id,&set))
            {

            pLog_tmsProc->Trace(9,"监控到可以进行接收");
              if(read_sock() == false){
                      pLog_tmsProc->Trace(3,"检测到客户端套接字异常,准备断开连接");
                      break;
              }
                   ...

注意到我把超时时间改成了60秒,

tm.tv_sec = 60;

这是我在实际测试时发现,当并发量大时,程序在处理数量多的连接时,前面分配成功的线程会超时退出,看下日志就明白了:

14:39:40][140579076663072]:准备accept
[14:39:40][140579076663072]:接待并分配文件描述符[44],主服务描述符[3]
[14:39:40][140579076663072]:接到连接请求,准备启动线程TCP:0x1e89e90,IP:10.0.0.106,PORT:19803
[14:39:40][140579076663072]:启动服务线程于140577928623872
[14:39:40][140579076663072]:等待接收客户端连接
[14:39:40][140579076663072]:准备accept
[14:39:40][140579076663072]:接待并分配文件描述符[46],主服务描述符[3]
[14:39:40][140579076663072]:接到连接请求,准备启动线程TCP:0x1e8a170,IP:10.0.0.106,PORT:19804
[14:39:40][140577928623872]:接到来自TMS端口的请求
[14:39:40][140577928623872]:业务处理开始
[14:39:40][140577918134016]:接到来自TMS端口的请求
[14:39:40][140577918134016]:业务处理开始
[14:39:40][140579076663072]:启动服务线程于140577918134016
[14:39:40][140579076663072]:等待接收客户端连接
[14:39:40][140579076663072]:准备accept
[14:39:40][140579076663072]:接待并分配文件描述符[48],主服务描述符[3]
[14:39:40][140579076663072]:接到连接请求,准备启动线程TCP:0x1e8a450,IP:10.0.0.106,PORT:19805
[14:39:40][140577500821248]:接到来自TMS端口的请求
[14:39:40][140577500821248]:业务处理开始
[14:39:40][140579076663072]:启动服务线程于140577500821248
[14:39:40][140579076663072]:等待接收客户端连接
[14:39:40][140579076663072]:准备accept
[14:39:41][140579076663072]:接待并分配文件描述符[50],主服务描述符[3]

因为工具是模拟多个客户端同时发起请求,于是就有了上面这样的分配线程的过程,会持续的时间比较长(还要写日志),也就是同时发生的连接数越多,超时时间就要设置越长。超时改成60秒后。经过工具实测,500连接/500毫秒(应该相当于1000次/秒的并发量了吧?)的处理都正常。

性能大大提高。

但是问题还是很明显,就是超时时间。随着连接数的增大,超时也要一直增大才能保证没有线程”掉队”,但是这个时间太大了会影响真正接收数据时的效率。

第二种优化方案思路来源于apache和nginx的性能差异。

我做过java的web应用,对apache和nginx都有过了解。他俩的一个重要区别是前者基于线程并发,而后者基于进程(fork)。众所周知,nginx很多场景的高并发是好于apache的。

所以我的第二种方案,基本思路是为每个连接fork一个单独的进程处理。独立进程有个最大的好处是不需要加锁了(不解释)。修改好的代码片段如下(我已经把所有带锁的地方都去掉了,这个不贴出来了)。


void run_srv(const char* i_port){
    for(;;)
    {
        client = server.accept_client();
        pWriteLogInstance->Trace(1,"接到连接请求,准备启动进程TCP:%p,IP:%s,PORT:%u",client,client->ip().c_str(),client->port());

        if(client->_socket_id < 0)
        {
            if(errno == EINTR || errno == ECONNABORTED)
                continue;
            else
            {
                cout << "accept error" << endl;
                return;
            }
        }

        fpid = fork();
        if(fpid < 0)
        {
            pWriteLogInstance->Trace(9,"fork error");
        }
        else if(fpid > 0)//father
        {
            pWriteLogInstance->Trace(1,"father process start");
            client->close();
        }
        else //child
        {
            server.close();
            pWriteLogInstance->Trace(1,"child process start");

            TmsProc* tmpc =new  TmsProc(client);

            tmpc->run();

            delete tmpc;

            if(client != NULL)
            {
                client->close();
                delete client;
                client = NULL;
            }

            exit(-6);
        }

        usleep(10);

void TmsProc::run(){
        while(1)
        {

        //收取信息
            if(read_sock() == false){
                pLog_tmsProc->Trace(3,"检测到客户端套接字异常,准备断开连接");
                send_info.is_bad_qry = true;
                break;
            }

            if(stc_tms.un_parse_size == 0)
            {
                pLog_tmsProc->Trace(3,"没有接受到有效数据,客户端关闭了");
                break;
            }
                   ....

测试结果跟我预想的差不多。效果也是不错的。同样是1000次/秒的并发量数据没有出现问题。而且相比较前一种方案,没有了超时时间的困扰。

第三种方案,我考虑试试游戏行业用的比较多的epoll,这个以前从来没玩过,刚好学习下。

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

为了简单起见,我这里只是用了单线程的epoll,用循环来轮询客户端的socket id来处理多个客户端连接的情况。单线程的epoll号称也能处理1万以上的并发量,我要测试下是不是有这边牛X。

epoll方案的代码如下:

void TmsProc::run(){

        struct epoll_event event;   // 告诉内核要监听什么事件
        struct epoll_event wait_event[OPEN_MAX]; //内核监听完的结果 

        Socket::TCP server;
       pLog_tmsProc->Trace(9,"准备监听端口:%d",this->port);
       server.listen_on_port(this->port,OPEN_MAX);

       Socket::TCP* client;

       //4.epoll相应参数准备
        int fd[OPEN_MAX+1];
        int i = 0, maxi = 0;
        int number = 0;
        memset(fd,-1, sizeof(fd));
        fd[0] = server._socket_id;

        pLog_tmsProc->Trace(9,"epoll 开始准备");
        int epfd = epoll_create(OPEN_MAX+1);
        if( -1 == epfd )
        {
            pLog_tmsProc->Trace(9,"epoll create error");
            return;
        }    

        event.data.fd = server._socket_id;     //监听套接字
        event.events = EPOLLIN; // 表示对应的文件描述符可以读

        //5.事件注册函数,将监听套接字描述符 sockfd 加入监听事件
        int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, server._socket_id, &event);
        if(-1 == ret){
            pLog_tmsProc->Trace(9,"epoll_ctl error");
            return;
        }   

        pLog_tmsProc->Trace(9,"业务处理开始");

        while(1)
        {
            // 监视并等待多个文件(标准输入,udp套接字)描述符的属性变化(是否可读)
            // 没有属性变化,这个函数会阻塞,直到有变化才往下执行,这里没有设置超时    

            pLog_tmsProc->Trace(9,"epoll 开始监听");
            number = epoll_wait(epfd, wait_event, OPEN_MAX, -1);   

            for(int i = 0; i < number; i++)
            {
                if( (wait_event[i].events & EPOLLERR) || ( wait_event[i].events & EPOLLHUP ) || !(wait_event[i].events & EPOLLIN) )
                {
                    pLog_tmsProc->Trace(9,"epoll error");
                    close(wait_event[i].data.fd);
                    continue;
                }
                else if(server._socket_id == wait_event[i].data.fd )
                {
                    while(1)
                    {
                        client = server.accept_client();
                        if(client->_socket_id == -1)
                         {
                             if( errno == EAGAIN || errno == EWOULDBLOCK )
                             {
                                 break;
                             }
                             else
                             {
                                 pLog_tmsProc->Trace(9,"accept error");
                                 break;
                             }
                         }

                        Socket::TCP::make_socket_non_blocking(client->_socket_id);

                        event.data.fd = client->_socket_id; //监听套接字
                        event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; // 表示对应的文件描述符可以读  

                        //6.1.3.事件注册函数,将监听套接字描述符 connfd 加入监听事件
                        pLog_tmsProc->Trace(9,"为客户端注册epoll监听");
                        ret = epoll_ctl(epfd, EPOLL_CTL_ADD, client->_socket_id, &event);

                        if(ret < 0){
                            pLog_tmsProc->Trace(9,"epoll_ctl error");
                        }
                        event.data.fd = client->_socket_id;
                    }
                }
                else
                {
                    //收取信息
                   if(read_sock(wait_event[i].data.fd) == false){
                        pLog_tmsProc->Trace(3,"检测到客户端套接字异常,准备断开连接");
                        close(wait_event[i].data.fd);
                   }
                   ...

只能说epoll确实比较给力,我这只是个单线程的服务,用工具测试上述代码,500个并发也是妥妥的。

时间: 2024-10-11 05:54:36

用一个示例讲解我是如何处理高并发的的相关文章

Swoole如何处理高并发

有需要学习交流的友人请加入swoole交流群的咱们一起,有问题一起交流,一起进步!前提是你是学技术的.感谢阅读! 点此加入该群 swoole如何处理高并发 ①Reactor模型介绍 IO复用异步非阻塞程序使用经典的Reactor模型,Reactor顾名思义就是反应堆的意思,它本身不处理任何数据收发.只是可以监视一个socket(也可以是管道.eventfd.信号)句柄的事件变化. Reactor只是一个事件发生器,实际对socket句柄的操作,如connect/accept.send/recv.

我是如何处理大并发量订单处理的 KafKa部署总结

今天要介绍的是消息中间件KafKa,应该说是一个很牛的中间件吧,背靠Apache 与很多有名的中间件搭配起来用效果更好哦 ,为什么不用RabbitMQ,因为公司需要它. 网上已经有很多怎么用和用到哪的内容,但结果很多人都倒在了入门第一步 环境都搭不起来,可谓是从了解到放弃,所以在此特记录如何在linux环境搭建,windows中配置一样,只是启动运行bat文件. 想要用它就先必须了解它能做什么及能做到什么程度,先看看它是什么吧. 当今社会各种应用系统诸如商业.社交.搜索.浏览等像信息工厂一样不断

一个epoll事件实现的高并发服务/客户端(C语言实现,服务端存储基于hashtable)

代码路径:https://github.com/prophetss/epoll-event 之前实现了一个简单高效的hashtable(点这里),之后一直想利用它再延伸一些功能,后来偶然看到一个hashtable与epoll事件的结合感觉效率很高所以自己尝试着实现了下.大体思想是将epoll接到的每一个服务请求存储到hashtable里来管理,每一个请求都可以设置独立的回调函数.具体可以先看代码,注释已经写得很详细.代码实现了一个简单实例,由于条件有限,client端我是fork了大量子进程来模

浅谈java中如何处理高并发的问题

1.从最基础的地方做起,优化我们写的代码,减少必要的资源浪费     a.避免频繁的使用new对象,对于整个应用只需要存在一个实例的类,我们可以使用单例模式.对于String连接操作,使用StringBuffer或StringBuilder,对于工具类可以通过静态方法来访问.     b.避免使用错误的方式,尽量不用instanceof做条件判断.使用java中效率高的类,比如ArrayList比Vector性能好. 2.html静态化     我们通过一个链接地址访问,通过这个链接地址,服务器

谈论java中如何处理高并发的问题

1 从最基础的地方做起,优化我们写的代码,减少必要的资源浪费.         a.避免频繁的使用new对象,对于整个应用只需要存在一个实例的类,我们可以使用单例模式.对于String连接操作,使用      StringBuffer或StringBuilder,对于工具类可以通过静态方法来访问.         b.避免使用错误的方式,尽量不用instanceof做条件判断.使用java中效率高的类,比如ArrayList比Vector性能好. 2 html静态化         我们通过一个

.net如何处理高并发socket,建立高性能健壮的socket服务

1.使用supersocket 高性能的事件驱动通信; 非常简单易用; 你只需要通过创建几个类就能获得一个健壮的 Socket 服务器; 内置的命令行协议让你能够迅速创建一个网络命令行接口服务器; 强大且高性能的协议解析实现工具帮你简化了网络数据的分析工作; 灵活的配置功能和友好的配置 API; 多监听器支持; 让你在一个服务器实例内监听多个端口; 多服务器实例托管的支持让你从容面对复杂的部署需求; 应用程序域和进程级别的隔离能够满足你大部分的宿主场景; 内置的 SSL/TLS 加密支持; 会话

如何处理高并发情况下的DB插入

插入数据库,在大家开发过程中是很经常的事情,假设我们有这么一个需求: 1.  我们需要接收一个外部的订单,而这个订单号是不允许重复的 2.  数据库对外部订单号没有做唯一性约束 3.  外部经常插入相同的订单,对于已经存在的订单则拒绝处理 对于这个需求,很简单我们会用下面的代码进行处理(思路:先查找数据库,如果数据库存在则直接退出,否则插入) package com.yhj.test; import com.yhj.dao.OrderDao; import com.yhj.pojo.Order;

面试最让你手足无措的一个问题:你的系统如何支撑高并发?

这篇文章,我们聊聊大量同学问我的一个问题,面试的时候被问到一个让人特别手足无措的问题:你的系统如何支撑高并发? 大多数同学被问到这个问题压根儿没什么思路去回答,不知道从什么地方说起,其实本质就是没经历过一些真正有高并发系统的锤炼罢了. 因为没有过相关的项目经历,所以就没法从真实的自身体会和经验中提炼出一套回答,然后系统的阐述出来自己复杂过的系统如何支撑高并发的. 所以,这篇文章就从这个角度切入来简单说说这个问题,用一个最简单的思路来回答,大致如何应对. 当然这里首先说清楚一个前提:高并发系统各不

asp.net解决高并发的方案

那啥,最近见了一人叨叨叨的神侃如何处理高并发.居然聊到服务器矩阵.我当时还没回过神,过后细想,服务器矩阵我也知道口里说说,但是中小企业能玩得起?作为一个程序员很多时候只能用手头资源来制定优化方案.(人生哲理:要警惕夸夸其谈者) 我收集了下网上提供的处理方式列在这里.虽然我不会无聊到背下来去唬新人,但加深下映象,有个纲目还是好的. 两大点: 通过服务器处理高并发  调整服务器应用程序池中的最大连接数. 1. 调整IIS 7应用程序池队列长度 由原来的默认1000改为65535. IIS Manag