基于线程池、消息队列和epoll模型实现Client-Server并发架构

引言

并发是什么?企业在进行产品开发过程中为什么需要考虑这个问题?想象一下天猫的双11和京东的618活动,一秒的点击量就有几十万甚至上百万,这么多请求一下子涌入到服务器,服务器需要对这么多的请求逐个进行消化掉,假如服务器一秒的处理能力就几万,那么剩下的不能及时得到处理的这些请求作何处理?总不能让用户界面一直等着,因此消息队列应运而生,所有的请求都统一放入消息队列,工作线程从消息队列不断的消费,消息队列相当于一个缓冲区,可达到解藕、异步和削峰的目的。

Kafka、ActiveMQ、RabbitMQ和RockerMQ都是消息队列的典型,每一种都有其自身的优势和劣势。本文我用自己编写的Buffer类模拟消息队列,如果是企业级需要上线的应用,一般都是基于业界已有的MQ框架上开发。

需求原型

  1. N个Client从标准输入接收数据,然后连续不断的发送到Server端;
  2. Server端接收来自每个Client的数据,将数据中的小写字母全部转换成大写字母,其他字符保持不变,最后把转换结果发送给对应的Client。

需求分解

  1. 拿到需求,第一步要做的就是分析需求并选择合适的设计架构,考虑到Server需要和Client进行通信,Client来自四面八方,端对端通信自然选择TCP,因此Server端需要能够监听新的连接请求和已有连接的业务请求;
  2. 又由于Server需要响应多个Client的业务请求,我们希望把业务处理交给Server端的工作线程(消费者)来做;
  3. 同时还需要一个IO线程负责监听Socket描述符,当IO线程监听到已有连接的业务请求时,立即把请求内容封装成一个任务推入消息队列尾;
  4. IO线程与工作线程互斥访问消息队列,当然工作线程消费一个任务或者IO线程添加一个任务都需要通知对方,也就是同步;
  5. 工作线程处理完毕后,把处理结果交给IO线程,由IO线程负责把结果发送给对应的Client,也就是IO线程与工作线程的分离,这里工作线程通知IO线程的方式我用eventfd来实现;
  6. 我们希望引入Log4cpp记录服务端的日志,并能够保存到文件中;
  7. 分析完这些,一个整体架构和大体的样子在脑海中就已经形成了,接着就需要编写设计文档和画流程图、类图和时序图了。

详细设计文档

1.UML静态类图:

2.UML动态时序图:

效果

1.如图,开了三个Client,运行结果正确:

2.Server端通过Log4cpp把日志写到文件中:

源码获取

https://github.com/icoty/cs_threadpool_epoll_mq

目录结构

.
├── client                   // 客户端Demo
│?? ├── Client.cc
│?? ├── Client.exe
│?? ├── client.sh    // 进入该目录下启动Client Demo: sh client.sh
│?? ├── Log4func.cc // 引入日志模块重新疯转
│?? ├── Log4func.h
│?? └── Makefile     // 编译方式:make
├── conf
│?? └── my.conf // IP,Port配置文件, 从这里进行修改
├── include // 头文件
│?? ├── Configuration.hpp // 配置文件,单例类,my.conf的内存化
│?? ├── FileName.hpp // 全局定义,Configuration会用到
│?? ├── log // 日志模块头文件
│?? │?? └── Log4func.hpp
│?? ├── net // 网络框架模块头文件
│?? │?? ├── EpollPoller.hpp
│?? │?? ├── InetAddress.hpp
│?? │?? ├── Socket.hpp
│?? │?? ├── SockIO.hpp
│?? │?? ├── TcpConnection.hpp
│?? │?? └── TcpServer.hpp
│?? ├── String2Upper.hpp // 工作线程转换成大写实际走的这里面的接口
│?? ├── String2UpperServer.hpp // Server端的整个工厂
│?? └── threadpool // 线程池、锁、条件变量和消息队列的封装
│??     ├── Buffer.hpp
│??     ├── Condition.hpp
│??     ├── MutexLock.hpp
│??     ├── Noncopyable.hpp
│??     ├── Pthread.hpp
│??     ├── Task.hpp
│??     └── Threadpool.hpp
├── log // Server端的日志通过Log4cpp记录到这个文件中
│?? └── log4test.log
├── Makefile // 编译方式:make
├── README.md
├── server // server端Demo
│?? ├── server.exe
│?? └── server.sh // 进入该目录下启动Server Demo:sh server.sh
└── src // 源文件
    ├── Configuration.cpp
    ├── log
    │?? └── Log4func.cpp
    ├── main.cpp
    ├── net
    │?? ├── EpollPoller.cpp
    │?? ├── InetAddress.cpp
    │?? ├── Socket.cpp
    │?? ├── SockIO.cpp
    │?? ├── TcpConnection.cpp
    │?? └── TcpServer.cpp
    ├── String2Upper.cpp
    ├── String2UpperServer.cpp
    └── threadpool
    ├── Buffer.cpp
    ├── Condition.cpp
    ├── MutexLock.cpp // MutexLockGuard封装
    ├── Pthread.cpp
    └── Threadpool.cpp

参考文献

[1] UNIX环境高级编程第3版
[2] cpp reference
[3] UML时序图
[4] Log4cpp官网下载
[5] Log4cpp安装

原文地址:https://www.cnblogs.com/icoty23/p/10989445.html

时间: 2024-08-02 16:32:41

基于线程池、消息队列和epoll模型实现Client-Server并发架构的相关文章

基于线程池的线程调度管控系统

本文将详细描述"基于线程池的线程调度管控系统"的实现原理,以梳理当时的编程思路. 简单叙述一下此线程池的开发背景:客户端是批量运行的,虽然客户端均运行在服务器上,但是大量客户端运行时它们对机器资源是抢占式的,所以客户端在大规模运行时与单次运行时的运行效果是不一样的,因为相同客户端在单次运行与大规模运行时所占有的资源量是不同的,理论上说大规模运行时客户端的数量越多,每个客户端所占有的资源量就越少,于是我们认为解决问题的关键应该是不管是单次运行还是大规模运行,相同客户端所获取的资源量就应该

使用Android新式LruCache缓存图片,基于线程池异步加载图片

import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import a

自定义基于xmemcached协议消息队列的Spark Streaming 接收器

虽然spark streaming定义了常用的Receiver,但有时候还是需要自定义自己的Receiver的.对于自定义的Receiver,只需要实现spark streaming的Receiver抽象类即可.而Receiver的实现只需要简单地实现两个方法: 1.onStart():接收数据. 2.onStop():停止接收数据. 一般onStart()不应该阻塞,应该启动一个新的线程复杂数据接收.而onStop()方法负责确保这些接收数据的线程是停止的,在 Receiver 被关闭时调用了

HQueue:基于HBase的消息队列

HQueue:基于HBase的消息队列 凌柏 ?1. HQueue简介 HQueue是一淘搜索网页抓取离线系统团队基于HBase开发的一套分布式.持久化消息队列.它利用HTable存储消息数据,借助HBase Coprocessor将原始的KeyValue数据封装成消息数据格式进行存储,并基于HBase Client API封装了HQueue Client API用于消息存取. HQueue可以有效使用在需要存储时间序列数据.作为MapReduce Job和iStream等输入.输出供上下游共享

【Java TCP/IP Socket】基于线程池的TCP服务器(含代码)

了解线程池 在http://blog.csdn.net/ns_code/article/details/14105457(读书笔记一:TCP Socket)这篇博文中,服务器端采用的实现方式是:一个客户端对应一个线程.但是,每个新线程都会消耗系统资源:创建一个线程会占用CPU周期,而且每个线程都会建立自己的数据结构(如,栈),也要消耗系统内存,另外,当一个线程阻塞时,JVM将保存其状态,选择另外一个线程运行,并在上下文转换(context switch)时恢复阻塞线程的状态.随着线程数的增加,线

JAVA线程池中队列与池大小的关系

JAVA线程中对于线程池(ThreadPoolExecutor)中队列,池大小,核心线程的关系写出自己的理解: 1:核心线程:简单来讲就是线程池中能否允许同时并发运行的线程的数量 2:线程池大小:线程池中最多能够容纳的线程的数量. 3:队列:对提交过来的任务的处理模式. 对于线程池与队列的交互有个原则: 如果队列发过来的任务,发现线程池中正在运行的线程的数量小于核心线程,则立即创建新的线程,无需进入队列等待.如果正在运行的线程等于或者大于核心线程,则必须参考提交的任务能否加入队列中去. 1:提交

让工作线程具有消息队列和消息循环

Android的消息队列和消息循环都是针对具体线程的,一个线程可以存在一个消息队列和消息循环,特定线程的消息只能分发给本线程,不能跨线程和跨进程通讯.但是创建的工作线程默认是没有消息队列和消息循环的,如果想让工作线程具有消息队列和消息循环,就需要在线程中先调用Looper.prepare()来创建消息队列,然后调用Looper.loop()进入消息循环.下面是我们创建的工作线程: class WorkThread extends Thread { public Handler mHandler;

C++11消息队列 + Qt线程池 + QRunnable执行任务简单模型

1.模板类queue,包含头文件<queue>中,是一个FIFO队列. queue.push():在队列尾巴增加数据 queue.pop():移除队列头部数据 queue.font():获取队列头部数据的引用... 2.Qt库的线程池,QThreadPool QThreadPool.setMaxThreadCount():设置线程池最大线程数 QThreadPool.start(new QRunnable(..)):开启线程池调用QRunnable 3.QRunnable执行任务 void r

java自带线程池和队列详细讲解(转)

Java线程池使用说明 一简介 线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的.在jdk1.5之后这一情况有了很大的改观.Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用.为我们在开发中处理线程的问题提供了非常大的帮助. 二:线程池 线程池的作用: 线程池作用就是限制系统中执行线程的数量.     根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果:少了浪