面向连接的Socket Server的简单实现(简明易懂)

一、基本原理

有时候我们需要实现一个公共的模块,需要对多个其他的模块提供服务,最常用的方式就是实现一个Socket Server,接受客户的请求,并返回给客户结果。

这经常涉及到如果管理多个连接及如何多线程的提供服务的问题,常用的方式就是连接池和线程池,基本流程如下:

首先服务器端有一个监听线程,不断监听来自客户端的连接。

当一个客户端连接到监听线程后,便建立了一个新的连接。

监听线程将新建立的连接放入连接池进行管理,然后继续监听新来的连接。

线程池中有多个服务线程,每个线程都监听一个任务队列,一个建立的连接对应一个服务任务,当服务线程发现有新的任务的时候,便用此连接向客户端提供服务。

一个Socket Server所能够提供的连接数可配置,如果超过配置的个数则拒绝新的连接。

当服务线程完成服务的时候,客户端关闭连接,服务线程关闭连接,空闲并等待处理新的任务。

连接池的监控线程清除其中关闭的连接对象,从而可以建立新的连接。

二、对Socket的封装

Socket的调用主要包含以下的步骤:

调用比较复杂,我们首先区分两类Socket,一类是Listening Socket,一类是Connected Socket.

Listening Socket由MySocketServer负责,一旦accept,则生成一个Connected Socket,又MySocket负责。

MySocket主要实现的方法如下:


int MySocket::write(const char * buf, int length)
{
        int ret = 0;
        int left = length;
        int index = 0;
        while(left > 0)
        {
                ret = send(m_socket, buf + index, left, 0);
                if(ret == 0)
                        break;
                else if(ret == -1)
                {
                        break;
                }
                left -= ret;
                index += ret;
        }
        if(left > 0)
                return -1;
        return 0;
}


int MySocket::read(char * buf, int length)
{
        int ret = 0;
        int left = length;
        int index = 0;
        while(left > 0)
        {
                ret = recv(m_socket, buf + index, left, 0);
                if(ret == 0)
                        break;
                else if(ret == -1)
                        return -1;
                left -= ret;
                index += ret;
        }

return index;
}


int MySocket::status()
{
        int status;
        int ret;
        fd_set checkset;
        struct timeval timeout;

FD_ZERO(&checkset);
        FD_SET(m_socket, &checkset);

timeout.tv_sec = 10;
        timeout.tv_usec = 0;

status = select((int)m_socket + 1, &checkset, 0, 0, &timeout);
        if(status < 0)
                ret = -1;
        else if(status == 0)
                ret = 0;
        else
                ret = 0;
        return ret;
}


int MySocket::close()
{
        struct linger lin;
        lin.l_onoff = 1;
        lin.l_linger = 0;
        setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin));
        ::close(m_socket);
        return 0;
}

MySocketServer的主要方法实现如下:


int MySocketServer::init(int port)
{
        if((m_socket = socket(AF_INET, SOCK_STREAM, 0)) == -1)
        {
                return -1;
        }

struct sockaddr_in serverAddr;
        memset(&serverAddr, 0, sizeof(struct sockaddr_in));
        serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        serverAddr.sin_family = AF_INET;
        serverAddr.sin_port = htons(port);

if(bind(m_socket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) == -1)
        {
                ::close(m_socket);
                return -1;
        }

if(listen(m_socket, SOMAXCONN) == -1)
        {
                ::close(m_socket);
                return -1;
        }

struct linger lin;
        lin.l_onoff = 1;
        lin.l_linger = 0;

setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin));
        m_port = port;
        m_inited = true;
        return 0;
}


MySocket * MySocketServer::accept()
{
        int sock;
        struct sockaddr_in clientAddr;
        socklen_t clientAddrSize = sizeof(clientAddr);
        if((sock = ::accept(m_socket, (struct sockaddr *)&clientAddr, &clientAddrSize)) == -1)
        {
                return NULL;
        }
        MySocket* socket = new MySocket(sock);
        return socket;
}


MySocket * MySocketServer::accept(int timeout)
{
        struct timeval timeout;
        timeout.tv_sec = timeout;
        timeout.tv_usec = 0;

fd_set checkset;
        FD_ZERO(&checkset);
        FD_SET(m_socket, &checkset);

int status = (int)select((int)(m_socket + 1), &checkset, NULL, NULL, &timeout);
        if(status < 0)
                return NULL;
        else if(status == 0)
                return NULL;

if(FD_ISSET(m_socket, &checkset))
        {
                return accept();
        }
}

三、线程池的实现

一个线程池一般有一个任务队列,启动的各个线程从任务队列中竞争任务,得到的线程则进行处理:list<MyTask *>  m_taskQueue;

任务队列由锁保护,使得线程安全:pthread_mutex_t m_queueMutex

任务队列需要条件变量来支持生产者消费者模式:pthread_cond_t m_cond

如果任务列表为空,则线程等待,等待中的线程个数为:m_numWaitThreads

需要一个列表来维护线程池中的线程:vector<MyThread *> m_threads

每个线程需要一个线程运行函数:


void * __thread_new_proc(void *p)
{
    ((MyThread *)p)->run();
    return 0;
}

每个线程由MyThread类负责,主要函数如下:


int MyThread::start()
{

pthread_attr_t  attr;
    pthread_attr_init(&attr);
    pthread_attr_setschedpolicy(&attr, SCHED_FIFO);

int ret = pthread_create(&m_thread, &attr, thread_func, args);
    pthread_attr_destroy(&attr);

if(ret != 0)
        return –1;

}


int MyThread::stop()
{

int ret = pthread_kill(m_thread, SIGINT);

if(ret != 0)
        return –1;
}


int MyThread::join()

{

int ret = pthread_join(m_thread, NULL);

if(ret != 0)

return –1;

}


void MyThread::run()

{

while (false == m_bStop)

{

MyTask *pTask = m_threadPool->getNextTask();

if (NULL != pTask)

{

pTask->process();

}

}

}

线程池由MyThreadPool负责,主要函数如下:


int MyThreadPool::init()
{

pthread_condattr_t cond_attr;
    pthread_condattr_init(&cond_attr);
    pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
    int ret =  pthread_cond_init(&m_cond, &cond_attr);
    pthread_condattr_destroy(&cond_attr);

if (ret_val != 0)
        return –1;

pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    ret = pthread_mutex_init(&m_queueMutex, &attr);
    pthread_mutexattr_destroy(&attr);

if (ret_val != 0)
        return –1;

for (int i = 0; i< m_poolSize; ++i)
    {
        MyThread *thread = new MyThread(i+1, this);        
        m_threads.push_back(thread);
    }

return 0;
}


int MyThreadPool::start()
{
    int ret;
    for (int i = 0; i< m_poolSize; ++i)
    {        
       ret = m_threads[i]->start();
       if (ret != 0)
           break;       
    }

ret = pthread_cond_broadcast(&m_cond);

if(ret != 0)
        return –1;
    return 0;
}


void MyThreadPool::addTask(MyTask *ptask)
{
    if (NULL == ptask)
        return;

pthread_mutex_lock(&m_queueMutex);

m_taskQueue.push_back(ptask);

if (m_waitingThreadCount > 0)
        pthread_cond_signal(&m_cond);

pthread_mutex_unlock(&m_queueMutex);
}


MyTask * MyThreadPool::getNextTask()
{
    MyTask *pTask = NULL;

pthread_mutex_lock(&m_queueMutex);

while (m_taskQueue.begin() == m_taskQueue.end())
    {  
        ++m_waitingThreadCount;

pthread_cond_wait(&n_cond, &m_queueMutex);

--m_waitingThreadCount;       
    }

pTask = m_taskQueue.front();

m_taskQueue.pop_front();

pthread_mutex_unlock(&m_queueMutex);

return pTask;   
}

其中每一个任务的执行由MyTask负责,其主要方法如下:


void MyTask::process()

{

//用read从客户端读取指令

//对指令进行处理

//用write向客户端写入结果

}

四、连接池的实现

每个连接池保存一个链表保存已经建立的连接:list<MyConnection *> * m_connections

当然这个链表也需要锁来进行多线程保护:pthread_mutex_t m_connectionMutex;

此处一个MyConnection也是一个MyTask,由一个线程来负责。

线程池也作为连接池的成员变量:MyThreadPool * m_threadPool

连接池由类MyConnectionPool负责,其主要函数如下:


void MyConnectionPool::addConnection(MyConnection * pConn)
{

pthread_mutex_lock(&m_connectionMutex);

m_connections->push_back(pConn);

pthread_mutex_unlock(&m_connectionMutex);

m_threadPool->addTask(pConn);
}

MyConnectionPool也要启动一个背后的线程,来管理这些连接,移除结束的连接和错误的连接。


void MyConnectionPool::managePool()
{

pthread_mutex_lock(&m_connectionMutex);

for (list<MyConnection *>::iterator itr = m_connections->begin(); itr!=m_connections->end(); )
    {
        MyConnection *conn = *itr;        
        if (conn->isFinish())
        {
            delete conn;
            conn = NULL;
            list<MyConnection *>::iterator pos = itr++;
            m_connections->erase(pos);                         
        }
        else if (conn->isError())
        {

//处理错误的连接
            ++itr;
        }
        else
        {
            ++itr;
        }
    }

pthread_mutex_unlock(&m_connectionMutex);

}

五、监听线程的实现

监听线程需要有一个MySocketServer来监听客户端的连接,每当形成一个新的连接,查看是否超过设置的最大连接数,如果超过则关闭连接,如果未超过设置的最大连接数,则形成一个新的MyConnection,将其加入连接池和线程池。


MySocketServer *pServer = new MySocketServer(port);

MyConnectionPool *pPool = new MyConnectionPool();

while (!stopFlag)

{

MySocket * sock = pServer->acceptConnection(5);

if(sock != null)

{

if(m_connections.size > maxConnectionSize)

{

sock.close();

}

MyTask *pTask = new MyConnection();

pPool->addConnection(pTask);

}

}

时间: 2024-08-28 07:35:52

面向连接的Socket Server的简单实现(简明易懂)的相关文章

[转] 3个学习Socket编程的简单例子:TCP Server/Client, Select

以前都是采用ACE的编写网络应用,最近由于工作需要,需要直接只用socket接口编写CS的代码,重新学习这方面的知识,给出自己所用到的3个简单例子,都是拷贝别人的程序.如果你能完全理解这3个例子,估计socket编程就已经基本入门了. 建议:1) 多多查查所用到的网络接口; 2) 最好有一本书,如UNIX环境高级编程,UNIX网络编程,可查询:3) 可以直接使用书上的例子更好. http://blog.csdn.net/zhenjing/article/details/4770490 TCP C

如何用socket构建一个简单的Web Server

背景 现代社会网络应用随处可见,不管我们是在浏览网页.发送电子邮件还是在线游戏都离不开网络应用程序,网络编程正在变得越来越重要 目标 了解web server的核心思想,然后自己构建一个tiny web server,它可以为我们提供简单的静态网页 最终效果 完整的事例代码可以查看这里 如何运行 python3 index.py 注意 我们假设你已经学习过Python的系统IO.网络编程.Http协议,如果对此不熟悉,可以点击这里的Python教程进行学习,可以点击这里的Http协议进行学习,事

Windows socket之最简单的socket程序

原文:Windows socket之最简单的socket程序 最简单的服务器的socket程序流程如下(面向连接的TCP连接 ): 1. WSAStartup(); 初始化网络库的使用. 2. socket(); 获得一个socket. 3. bind(); 把获得的socket绑定到一个ip 和端口.既然作为服务器, ip通常为本地IP127.0.0.1. 4. listen(); 监听已经绑定了指定端口的socket. 5. accept(); 接受一个来自客户端的连接. accept()返

利用java的Socket实现一个简单hello/hi聊天程序

利用java的Socket实现一个简单hello/hi聊天程序 首先,我们来用java实现一个简单的hello/hi聊天程序.在这个程序里,我学习到了怎么用socket套接套接字来进行编程.简单理解了一些关于socket套接字和底层调用的关系.关于java的封装思想,我学会了一些东西,java里真的是万物皆对象.还学到了一点多线程的知识. TCP 在这里,不得不先介绍以下TCP.TCP是传输层面向连接的协议.提供了端到端的进程之间的通信方式.TCP在通信之前要先建立连接.这里我们称这个建立连接的

面向连接的socket数据处理过程以及非阻塞connect问题

对于面向连接的socket类型(SOCK_STREAM,SOCK_SEQPACKET)在读写数据之前必须建立连接,首先服务器端socket必须在一个客户端知道的地址进行监听,也就是创建socket之后必须调用bind绑定到一个指定的地址,然后调用int listen(int sockfd, int backlog);进行监听.此时服务器socket允许客户端进行连接,backlog提示没被accept的客户连接请求队列的大小,系统决定实际的值,最大值定义为SOMAXCONN在头文件<sys/so

Socket网络编程--简单Web服务器(1)

这一次的Socket系列准备讲Web服务器.就是编写一个简单的Web服务器,具体怎么做呢?我也不是很清楚流程,所以我找来了一个开源的小的Web服务器--tinyhttpd.这个服务器才500多行的代码,使用C语言.这一小节就不讲别的内容了.就对这个程序进行一些注释和讲解了. 主函数: 1 int main(void) 2 { 3 int server_sock = -1; 4 u_short port = 0; 5 int client_sock = -1; 6 struct sockaddr_

socket编程,简单多线程服务端测试程序

socket编程,简单多线程服务端测试程序 前些天重温了MSDN关于socket编程的WSAStartup.WSACleanup.socket.closesocket.bind.listen.accept.recv.send等函数的介绍,今天写了一个CUI界面的测试程序(依赖MFC)作为补充.程序功能简介如下: 1:一个线程做监听用. 2:监听线程收到客户端连接后,创建新线程接收客户端数据.所有对客户端线程将加入容器,以便管理. 3:服务端打印所有客户端发来的信息. 4:服务端CUI界面输入数字

python之路 socket、socket server

一.socket socket的英文原义是“孔”或“插座”.作为BSD UNIX的进程通信机制,取后一种意思.通常也 称作"套接字",用于描述IP地址和端口,是一个通信链的句柄,可以用来实现不同虚拟机或不同计算机之间的通信.在Internet上的主机一 般运行了多个服务软件,同时提供几种服务.每种服务都打开一个Socket,并绑定到一个端口上,不同的端口对应于不同的服务.Socket正如其英文原 意那样,像一个多孔插座.一台主机犹如布满各种插座的房间,每个插座有一个编号,有的插座提供2

Socket网络编程学习笔记(2):面向连接的Socket

转自http://www.cnblogs.com/licongjie/archive/2006/10/26/540640.html 在上一篇中,我列了一些常用的方法,可以说这些方法是一些辅助性的方法,对于分析网络中的主机属性非常有用.在这篇中,我将会介绍一下面向连接(TCP)socket编程,其中辅以实例,代码可供下载.      对于TCP的Socket编程,主要分二部分:      一.服务端Socket侦听:      服务端Socket侦听主要分以下几个步骤,按照以下几个步骤我们可以很方