服务器端编程心得(三)—— 一个服务器程序的架构介绍

本文将介绍我曾经做过的一个项目的服务器架构和服务器编程的一些重要细节。

一、程序运行环境

操作系统:centos 7.0

编译器:gcc/g++ 4.8.3     cmake 2.8.11

mysql数据库:5.5.47

项目代码管理工具:VS2013

二、程序结构

该程序总共有17个线程,其中分为9个数据库工作线程D和一个日志线程L,6个普通工作线程W,一个主线程M。(以下会用这些字母来代指这些线程)

(一)、数据库工作线程的用途

9个数据库工作线程在线程启动之初,与mysql建立连接,也就是说每个线程都与mysql保持一路连接,共9个数据库连接。

每个数据库工作线程同时存在两个任务队列,第一个队列A存放需要执行数据库增删查改操作的任务sqlTask,第二个队列B存放sqlTask执行完成后的结果。sqlTask执行完成后立即放入结果队列中,因而结果队列中任务也是一个个的需要执行的任务。大致伪代码如下:

void db_thread_func()
{
    while (!m_bExit)
    {
        if (NULL != (pTask = m_sqlTask.Pop()))
        {
            //从m_sqlTask中取出的任务先执行完成后,pTask将携带结果数据
            pTask->Execute();
            //得到结果后,立刻将该任务放入结果任务队列
            m_resultTask.Push(pTask);
            continue;
        }

        sleep(1000);
    }
}v

现在的问题来了:

1. 任务队列A中的任务从何而来,目前只有消费者,没有生产者,那么生产者是谁?

2. 任务队列B中的任务将去何方,目前只有生产者没有消费者。

这两个问题先放一会儿,等到后面我再来回答。

(二)工作线程和主线程

在介绍主线程和工作线程具体做什么时,我们介绍下服务器编程中常常抽象出来的几个概念(这里以tcp连接为例):

1.  TcpServer 即Tcp服务,服务器需要绑定ip地址和端口号,并在该端口号上侦听客户端的连接(往往由一个成员变量TcpListener来管理侦听细节)。所以一个TcpServer要做的就是这些工作。除此之外,每当有新连接到来时,TcpServer需要接收新连接,当多个新连接存在时,TcpServer需要有条不紊地管理这些连接:连接的建立、断开等,即产生和管理下文中说的TcpConnection对象。

2.一个连接对应一个TcpConnection对象,TcpConnection对象管理着这个连接的一些信息:如连接状态、本端和对端的ip地址和端口号等。

3.数据通道对象Channel,Channel记录了socket的句柄,因而是一个连接上执行数据收发的真正执行者,Channel对象一般作为TcpConnection的成员变量。

4. TcpSession对象,是将Channel收取的数据进行解包,或者对准备好的数据进行装包,并传给Channel发送。

归纳起来:一个TcpServer依靠TcpListener对新连接的侦听和处理,依靠TcpConnection对象对连接上的数据进行管理,TcpConnection实际依靠Channel对数据进行收发,依靠TcpSession对数据进行装包和解包。也就是说一个TcpServer存在一个TcpListener,对应多个TcpConnection,有几个TcpConnection就有几个TcpSession,同时也就有几个Channel。

以上说的TcpServer、TcpListener、TcpConnection、Channel和TcpSession是服务器框架的网络层。一个好的网络框架,应该做到与业务代码脱耦。即上层代码只需要拿到数据,执行业务逻辑,而不用关注数据的收发和网络数据包的封包和解包以及网络状态的变化(比如网络断开与重连)。

拿数据的发送来说:

当业务逻辑将数据交给TcpSession,TcpSession将数据装好包后(装包过程后可以有一些加密或压缩操作),交给TcpConnection::SendData(),而TcpConnection::SendData()实际是调用Channel::SendData(),因为Channel含有socket句柄,所以Channel::SendData()真正调用send()/sendto()/write()方法将数据发出去。

对于数据的接收,稍微有一点不同:

通过select()/poll()/epoll()等IO multiplex技术,确定好了哪些TcpConnection上有数据到来后,激活该TcpConnection的Channel对象去调用recv()/recvfrom()/read()来收取数据。数据收到以后,将数据交由TcpSession来处理,最终交给业务层。注意数据收取、解包乃至交给业务层是一定要分开的。我的意思是:最好不要解包并交给业务层和数据收取的逻辑放在一起。因为数据收取是IO操作,而解包和交给业务层是逻辑计算操作。IO操作一般比逻辑计算要慢。到底如何安排要根据服务器业务来取舍,也就是说你要想好你的服务器程序的性能瓶颈在网络IO还是逻辑计算,即使是网络IO,也可以分为上行操作和下行操作,上行操作即客户端发数据给服务器,下行即服务器发数据给客户端。有时候数据上行少,下行大。(如游戏服务器,一个npc移动了位置,上行是该客户端通知服务器自己最新位置,而下行确是服务器要告诉在场的每个客户端)。

在我的博文《服务器端编程心得(一)—— 主线程与工作线程的分工》中介绍了,工作线程的流程:

while (!m_bQuit)
{
    epoll_or_select_func();

    handle_io_events();

    handle_other_things();
}

其中epoll_or_select_func()即是上文所说的通过select()/poll()/epoll()等IO multiplex技术,确定好了哪些TcpConnection上有数据到来。我的服务器代码中一般只会监测socket可读事件,而不会监测socket可写事件。至于如何发数据,文章后面会介绍。所以对于可读事件,以epoll为例,这里需要设置的标识位是:

EPOLLIN 普通可读事件(当连接正常时,产生这个事件,recv()/read()函数返回收到的字节数;当连接关闭,这两个函数返回0,也就是说我们设置这个标识已经可以监测到新来数据和对端关闭事件)

EPOLLRDHUP 对端关闭事件(linux man手册上说这个事件可以监测对端关闭,但我实际调试时发送即使对端关闭也没触发这个事件,仍然是EPOLLIN,只不过此时调用recv()/read()函数,返回值会为0,所以实际项目中是否可以通过设置这个标识来监测对端关闭,仍然待考证)

EPOLLPRI 带外数据

muduo里面将epoll_wait的超时事件设置为1毫秒,我的另一个项目将epoll_wait超时时间设置为10毫秒。这两个数值供大家参考。

这个项目中,工作线程和主线程都是上文代码中的逻辑,主线程监听侦听socket上的可读事件,也就是监测是否有新连接来了。主线程和每个工作线程上都存在一个epollfd。如果新连接来了,则在主线程的handle_io_events()中接收新连接。产生的新连接的socket句柄挂接到哪个线程的epollfd上呢?这里采取的做法是round-robin算法,即存在一个对象CWorkerThreadManager记录了各个工作线程上工作状态。伪码大致如下:

void attach_new_fd(int newsocketfd)
{
    workerthread = get_next_worker_thread(next);
    workerthread.attach_to_epollfd(newsocketfd);
    ++next;
    if (next > max_worker_thread_num)
        next = 0;
}

即先从第一个工作线程的epollfd开始挂接新来socket,接着累加索引,这样下次就是第二个工作线程了。如果所以超出工作线程数目,则从第一个工作重新开始。这里解决了新连接socket“负载均衡”的问题。在实际代码中还有个需要注意的细节就是:epoll_wait的函数中的struct epoll_event 数量开始到底要设置多少个才合理?存在的顾虑是,多了浪费,少了不够用,我在曾经一个项目中直接用的是4096:

const int EPOLL_MAX_EVENTS = 4096;
const int dwSelectTimeout = 10000;
struct epoll_event events[EPOLL_MAX_EVENTS];
int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeout / 1000);

我在陈硕的muduo网络库中发现作者才用了一个比较好的思路,即动态扩张数量:开始是n个,当发现有事件的fd数量已经到达n个后,将struct epoll_event数量调整成2n个,下次如果还不够,则变成4n个,以此类推,作者巧妙地利用stl::vector在内存中的连续性来实现了这种思路:

//初始化代码
std::vector<struct epoll_event> events_(16);

//线程循环里面的代码
while (m_bExit)
{
    int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), 1);
    if (numEvents > 0)
    {
        if (static_cast<size_t>(numEvents) == events_.size())
        {
            events_.resize(events_.size() * 2);
        }
    }
}

读到这里,你可能觉得工作线程所做的工作也不过就是调用handle_io_events()来接收网络数据,其实不然,工作线程也可以做程序业务逻辑上的一些工作。也就是在handle_other_things()里面。那如何将这些工作加到handle_other_things()中去做呢?写一个队列,任务先放入队列,再让handle_other_things()从队列中取出来做?我在该项目中也借鉴了muduo库的做法。即handle_other_things()中调用一系列函数指针,伪码如下:

void do_other_things()
{
    somefunc();
}

//m_functors是一个stl::vector,其中每一个元素为一个函数指针
void somefunc()
{
    for (size_t i = 0; i < m_functors.size(); ++i)
    {
        m_functors[i]();
    }

    m_functors.clear();
}

当任务产生时,只要我们将执行任务的函数push_back到m_functors这个stl::vector对象中即可。但是问题来了,如果是其他线程产生的任务,两个线程同时操作m_functors,必然要加锁,这也会影响效率。muduo是这样做的:

void add_task(const Functor& cb)
{
    std::unique_lock<std::mutex> lock(mutex_);
    m_functors.push_back(cb);
}

void do_task()
{
    std::vector<Functor> functors;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(m_functors);
    }

    for (size_t i = 0; i < functors.size(); ++i)
    {
        functors[i]();
    }
}

看到没有,利用一个栈变量functors将m_functors中的任务函数指针倒换(swap)过来了,这样大大减小了对m_functors操作时的加锁粒度。前后变化:变化前,相当于原来A给B多少东西,B消耗多少,A给的时候,B不能消耗;B消耗的时候A不能给。现在变成A将东西放到篮子里面去,B从篮子里面拿,B如果拿去一部分后,只有消耗完了才会来拿,或者A通知B去篮子里面拿,而B忙碌时,A是不会通知B来拿,这个时候A只管将东西放在篮子里面就可以了。

bool bBusy = false;
void add_task(const Functor& cb)
{
    std::unique_lock<std::mutex> lock(mutex_);
    m_functors_.push_back(cb);

    //B不忙碌时只管往篮子里面加,不要通知B
    if (!bBusy)
    {
        wakeup_to_do_task();
    }
}

void do_task()
{
    bBusy = true;
    std::vector<Functor> functors;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (size_t i = 0; i < functors.size(); ++i)
    {
        functors[i]();
    }

    bBusy = false;
}

看,多巧妙的做法!

因为每个工作线程都存在一个m_functors,现在问题来了,如何将产生的任务均衡地分配给每个工作线程。这个做法类似上文中如何将新连接的socket句柄挂载到工作线程的epollfd上,也是round-robin算法。上文已经描述,此处不再赘述。

还有种情况,就是希望任务产生时,工作线程能够立马执行这些任务,而不是等epoll_wait超时返回之后。这个时候的做法,就是使用一些技巧唤醒epoll_wait,linux系统可以使用socketpair或timerevent、eventfd等技巧(这个细节在我的博文《服务器端编程心得(一)—— 主线程与工作线程的分工》已经详细介绍过了)。

上文中留下三个问题:

1. 数据库线程任务队列A中的任务从何而来,目前只有消费者,没有生产者,那么生产者是谁?

2.数据库线程任务队列B中的任务将去何方,目前只有生产者没有消费者。

3.业务层的数据如何发送出去?

问题1的答案是:业务层产生任务可能会交给数据库任务队列A,这里的业务层代码可能就是工作线程中do_other_things()函数执行体中的调用。至于交给这个9个数据库线程的哪一个的任务队列,同样采用了round-robin算法。所以就存在一个对象CDbThreadManager来管理这九个数据库线程。下面的伪码是向数据库工作线程中加入任务:

bool CDbThreadManager::AddTask(IMysqlTask* poTask )
{
    if (m_index >= m_dwThreadsCount)
    {
        m_index = 0;
    }

    return m_aoMysqlThreads[m_index++].AddTask(poTask);
}

同理问题2中的消费者也可能就是do_other_things()函数执行体中的调用。

现在来说问题3,业务层的数据产生后,经过TcpSession装包后,需要发送的话,产生任务丢给工作线程的do_other_things(),然后在相关的Channel里面发送,因为没有监测该socket上的可写事件,所以该数据可能调用send()或者write()时会阻塞,没关系,sleep()一会儿,继续发送,一直尝试,到数据发出去。伪码如下:

bool Channel::Send()
{
    int offset = 0;
    while (true)
    {
        int n = ::send(socketfd, buf + offset, length - offset);
        if (n == -1)
        {
            if (errno == EWOULDBLOCK)
            {
                ::sleep(100);
                continue;
            }
        }
        //对方关闭了socket,这端建议也关闭
        else if (n == 0)
        {
            close(socketfd);
            return false;
        }

        offset += n;
        if (offset >= length)
            break;

    }

    return true;
}

最后,还有一个日志线程没有介绍,高性能的日志实现方案目前并不常见。限于文章篇幅,下次再介绍。

zhangyl 2016.12.02晚12:35

原文地址:https://www.cnblogs.com/balloonwj/p/9093521.html

时间: 2024-09-28 03:07:12

服务器端编程心得(三)—— 一个服务器程序的架构介绍的相关文章

服务器端编程心得(六)—— 关于网络编程的一些实用技巧和细节

这些年,接触了形形色色的项目,写了不少网络编程的代码,从windows到linux,跌进了不少坑,由于网络编程涉及很多细节和技巧,一直想写篇文章来总结下这方面的心得与经验,希望对来者有一点帮助,那就善莫大焉了. 本文涉及的平台包括windows和linux,下面开始啦. 一.非阻塞的的connect()函数如何编写 我们知道用connect()函数默认是阻塞的,直到三次握手建立之后,或者实在连不上超时返回,期间程序执行流一直阻塞在那里.那么如何利用connect()函数编写非阻塞的连接代码呢?

UNIX网络编程入门——TCP客户/服务器程序详解

前言 最近刚开始看APUE和UNP来学习socket套接字编程,因为网络这方面我还没接触过,要等到下学期才上计算机网络这门课,所以我就找了本教材啃了一两天,也算是入了个门. 至于APUE和UNP这两本书,书是好书,网上也说这书是给进入unix网络编程领域初学者的圣经,这个不可置否,但这个初学者,我认为指的是接受过完整计算机本科教育的研究生初学者,需要具有完整计算机系统,体系结构,网络基础知识.基础没打好就上来啃书反而会适得其反,不过对于我来说也没什么关系,因为基础课也都上得差不多了,而且如果书读

Socket编程——怎么实现一个服务器多个客户端之间的连接

  1 package coreBookSocket; 2 3 import java.io.IOException; 4 import java.net.ServerSocket; 5 import java.net.Socket; 6 7 /* 8 * 这个方法的主要目地是为了用多线程的方法实现网络编程,让多个客户端可以同时连接到一个服务器 9 *1:准备工作和单个客户端编程类似,先建立服务器端的套接字,同时让客户端那边调用accept()方法来接受服务器端的信息 10 *2:这里面定一个w

我们通过一个服务器程序,以研究backlog参数对listen系统调用的影响,运行截图如下

启动服务器程序,服务器程序正在等待客户端的连接 我们使用一次telnet命令就建立一个连接 打开多个终端窗口执行下列命名 #telnet 192.168.255.128 4444多次执行 然后我们执行如下netstat -nt |grep 4444,显示出listen监听队列中的内容 可见,在监听对列中,处于ESTABLISHD状态的连接只有4个(backlog值加1),其他的连接处于SYS_RCVD状态.我们改变服务器程序的第三个参数 并重新运行只,可发现同样的规律,即完整连接最多有(back

【UNIX网络编程】TCP客户/服务器程序示例

做一个简单的回射服务器: 客户从标准输入读入一行文本,写给服务器 -> 服务器从网络输入读入这行文本,并回射给客户 -> 客户从网络输入读入这行回射文本,并显示在标准输出上 以下是我的代码(部分.h文件是由unpv13e文件夹中的.c文件改名得到) #include "../unpv13e/unp.h" #include "../unpv13e/apueerror.h" #include "../unpv13e/wrapsock.h"

java网络编程(6)——实现一个服务器把小写转大写

实现一个服务器,通过我们发送的文本数据,然后转回大写放回,实现一个服务端与客户端的交互,用over来作为结束标记,具体代码如下: 客户端: package com.seven.tcp; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.Socket; pu

CUDA 编程:第一个CUDA程序

# 环境准备 Windows 下安装好CUDA,VS2013. 创建一个空的控制台程序,新增加一个文件“test.cu”. ##配置头文件和库文件目录 ``` C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v9.0\include C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v9.0\lib\x64 ``` ``` cublas.libcublas_device.libcuda.li

windows编程:第一个windows程序

1 #define WIN32_LEAN_AND_MEAN 2 #include <windows.h> 3 #include <windowsx.h> 4 #include <math.h> 5 #define WINDOW_CLASS_NAME L"WINCLASS1" 6 //窗口处理函数 7 LRESULT CALLBACK WindowProc(HWND hwnd, 8 UINT msg, 9 WPARAM wParam, 10 LPARA

我的下一个web程序的架构

作为业余的,非生计驱动的程序员,总是难以抵制流行技术的诱惑.我在服务器一侧主要使用java,因为java在这一个领域有很多成熟的解决方案.对于nodejs,当我想写一个完整的应用时,总觉得缺少某些关键的解决方案,比如说用户认证授权方面(我没有深入研究),如果从头开始,又提不起兴趣.所以想用新的架构,引入自己感兴趣的技术. 架构图如下: 1.ua和nodejs之间是有session的,nodejs和backend之间是sessionless. 2.登陆通过nodejs转接,返回的principle