基于管道通知的百万并发长连接server模型

0、前言

最近突然想了解怎样设计一个支持百万连接的后台server架构。

要设计一个支持百万连接的后台server,我们首先要知道会有哪些因素限制后台server的高并发连接,这里想到的因素有以下几点:

1、操作系统的参数设置能否支持百万并发连接;

2、操作系统维持百万并发长连接需要多少内存;

3、应用层面上维持百万并发长连接需要多少内存;

4、百万并发长连接的吞吐量是否超过了硬件网卡的限制。

在学习的过程中,主要针对的是1、2、4,第3点一般跟业务相关,这里暂时没有考虑。

本篇文章估计需要多次才能完成,现在初步的想法是先写一个demo程序,然后后面再慢慢测试优化。

1、后台设计

1.1 后台设计图

如下为后台的设计结构:

1、首先主进程根据机器CPU个数,创建对应数量的管道;

2、创建完对应的管道之后,再创建一样数量的线程,每个线程绑定一个CPU;

3、主进程开始初始化socket,然后accept,当接收到一个客户端连接时,就把conn_fd写到某个pipe中;

3、每个线程创建epoll,然后监听对应pipe的写端fd,当监听到pipe中有数据时,就读取该数据,格式化为fd,将该fd加入epoll进行监听。

1.2 编码实现

根据1.1的设计,我们编写代码,包括server模块和worker模块。server模块负责创建pipe、线程、和监听客户端连接;worker模块负责处理每个客户端的连接。代码如下所示:

1.2.0 common

 1 #ifndef _SERV_COMMON_H
 2 #define _SERV_COMMON_H
 3
 4 typedef struct {
 5     int id;
 6     int fd;
 7 } thread_arg;
 8
 9 #define SERV_PORT 9876
10 #define MAX_LINE  1024
11
12 #endif

1.2.1 worker

worker.h

1 #ifndef _SERV_WORKER_H
2 #define _SERV_WORKER_H
3
4 void *worker(void *arg);
5
6 #endif

worker.cc

  1 #include <errno.h>
  2 #include <fcntl.h>
  3 #include <stdio.h>
  4 #include <stdlib.h>
  5 #include <string.h>
  6 #include <unistd.h>
  7 #include <sched.h>
  8 #include <pthread.h>
  9 #include <sys/epoll.h>
 10 #include <sys/types.h>
 11 #include <sys/socket.h>
 12
 13 #include "common.h"
 14
 15 #define MAXFDS 1000000
 16 #define EVENTSIZE 1000
 17
 18 int taskset_thread_core(int core_id)
 19 {
 20     cpu_set_t cpuset;
 21     CPU_ZERO(&cpuset);
 22     CPU_SET(core_id, &cpuset);
 23
 24     pthread_t curr_tid = pthread_self();
 25     return pthread_setaffinity_np(curr_tid, sizeof(cpu_set_t), &cpuset);
 26 }
 27
 28 int setnonblocking(int fd)
 29 {
 30     if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFD, 0) | O_NONBLOCK) == -1) {
 31         printf("fd %d set non blocking failed\n", fd);
 32         return -1;
 33     }
 34
 35     return 0;
 36 }
 37
 38 void handle_req(int cli_fd)
 39 {
 40     char in_buff[MAX_LINE];
 41     int ret, rs = 1;
 42
 43     while (rs) {
 44         ret = recv(cli_fd, in_buff, 1024, 0);
 45
 46         if (ret < 0) {
 47             if (errno == EAGAIN) {
 48                 printf("EAGAIN\n");
 49                 break;
 50             } else {
 51                 printf("recv error: %d\n", errno);
 52                 close(cli_fd);
 53                 break;
 54             }
 55         } else if (ret == 0) {
 56             rs = 0;
 57         }
 58
 59         if (ret == sizeof(in_buff))
 60             rs = 1;
 61         else
 62             rs = 0;
 63     }
 64
 65     if (ret > 0) {
 66         send(cli_fd, in_buff, strlen(in_buff), 0);
 67     }
 68 }
 69
 70 void run_epoll(int epfd, int pipe_fd)
 71 {
 72     int i, cli_fd, nfds;
 73     struct epoll_event ev, events[EVENTSIZE];
 74     char buff[16];
 75
 76     ev.events = EPOLLIN | EPOLLET;
 77
 78     while (1) {
 79         nfds = epoll_wait(epfd, events, EVENTSIZE , -1);
 80         for (i = 0; i < nfds; i++) {
 81             // pipe msg, add connected fd to epoll
 82             if (events[i].data.fd == pipe_fd) {
 83                 read(pipe_fd, buff, 16);
 84                 cli_fd = atoi(buff);
 85                 setnonblocking(cli_fd);
 86                 ev.data.fd = cli_fd;
 87
 88                 if (epoll_ctl(epfd, EPOLL_CTL_ADD, cli_fd, &ev) < 0) {
 89                     printf("epoll add fd %d failed\n", cli_fd);
 90                 }
 91             } else {  // socket msg
 92                 cli_fd = events[i].data.fd;
 93                 handle_req(cli_fd);
 94             }
 95         }
 96     }
 97 }
 98
 99 void *worker(void *arg)
100 {
101     int epfd, pipe_fd;
102     struct epoll_event ev;
103
104     taskset_thread_core(((thread_arg*) arg)->id);
105
106     pipe_fd = ((thread_arg*) arg)->fd;
107     epfd = epoll_create(MAXFDS);
108     setnonblocking(pipe_fd);
109     ev.data.fd = pipe_fd;
110     ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
111     if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd, &ev) < 0) {
112         printf("epoll add mq fail\n");
113     }
114
115     run_epoll(epfd, pipe_fd);
116
117     return 0;
118 }

1.2.2 server

写完后台代码之后,开始测试能支持多少连接,但测试过程中一直有问题,会报如下的错误:error: Cannot assign requested address。

google了一下,说是因为短时间内大量短连接造成TIME_WAIT耗尽端口问题,不明白我的测试代码怎么是短连接,而不是长连接。

我的客户端代码如下,不知道是哪里出问题了。

#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

void process_conn_svr(const char *svr_ip, int svr_port);

int connections = 0;

#define MAX_CONN 1005000
int fd[MAX_CONN];

int main(int argc, char **argv)
{
    if (argc <= 2) {
        printf("usage: %s ip port\n", argv[0]);
        exit(0);
    }
    const char *ip = argv[1];
    int port = atoi(argv[2]);

    pid_t pid = fork();
    if (pid == 0) {
        process_conn_svr(ip, port);
    }

    const char buf[] = "keepalive!";
    for (;;) {
        usleep(1*1000);
        for (int i = 0; i < MAX_CONN; ++i) {
            if (fd[i] != 0) {
                send(fd[i], buf, sizeof(buf), 0);
            }
        }
    }
    return 0;   

}

void process_conn_svr(const char *svr_ip, int svr_port)
{
    int conn_idx = 0;
     for (;;) {
        struct sockaddr_in serv_addr;
        bzero(&serv_addr, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        inet_pton(AF_INET, svr_ip, &serv_addr.sin_addr);

        serv_addr.sin_port = htons(svr_port);
        int cli_fd = socket(AF_INET, SOCK_STREAM, 0);
        if (cli_fd == -1) {
            goto sock_err;
        }

        if (connect(cli_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1) {
            goto sock_err;
        }

        fd[conn_idx] = cli_fd;
        conn_idx++;

        connections++;
        printf("connections: %d, fd: %d\n", connections, cli_fd);

        if (connections % 10000 == 9999) {
            printf("press Enter to continue: ");
            getchar();
        }
        usleep(1*1000);
    }

sock_err:
    printf("error: %s\n", strerror(errno));
}

时间: 2024-12-18 01:16:44

基于管道通知的百万并发长连接server模型的相关文章

百万级别长连接,并发测试指南

前言 都说haproxy很牛x, 可是测试的结果实在是不算满意, 越测试越失望,无论是长连接还是并发, 但是测试的流程以及工具倒是可以分享分享.也望指出不足之处. 100w的长连接实在算不上太难的事情,不过对于网上关于测试方法以及测试工具的相关文章实在不甚满意,才有本文. 本文有两个难点,我算不上完全解决. 后端代码的性能. linux内核参数的优化. 环境说明 下面所有的测试机器都是基于openstack云平台,kvm虚拟化技术创建的云主机. 由于一个socket连接一般占用8kb内存,所以百

基于netty实现的长连接,心跳机制及重连机制

技术:maven3.0.5 + netty4.1.33 + jdk1.8 概述 Netty是由JBOSS提供的一个java开源框架.Netty提供异步的.事件驱动的网络应用程序框架和工具,用以快速开发高性能.高可靠性的网络服务器和客户端程序. 也就是说,Netty 是一个基于NIO的客户.服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户.服务端应用.Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务

长连接和短连接分析

转自:http://www.cnblogs.com/heyonggang/p/3660600.html 1. TCP连接 当网络通信时采用TCP协议时,在真正的读写操作之前,server与client之间必须建立一个连接,当读写操作完成后,双方不再需要这个连接 时它们可以释放这个连接,连接的建立是需要三次握手的,而释放则需要4次握手,所以说每个连接的建立都是需要资源消耗和时间消耗的 经典的三次握手示意图: 经典的四次握手关闭图: 2. TCP短连接 我们模拟一下TCP短连接的情况,client向

长连接&amp;短连接分析

转自:http://www.cnblogs.com/heyonggang/p/3660600.html 1. TCP连接 当网络通信时采用TCP协议时,在真正的读写操作之前,server与client之间必须建立一个连接,当读写操作完成后,双方不再需要这个连接 时它们可以释放这个连接,连接的建立是需要三次握手的,而释放则需要4次握手,所以说每个连接的建立都是需要资源消耗和时间消耗的 经典的三次握手示意图: 经典的四次握手关闭图: 2. TCP短连接 我们模拟一下TCP短连接的情况,client向

LinkedIn的即时消息:在一台机器上支持几十万条长连接

最近我们介绍了LinkedIn的即时通信,最后提到了分型指标和读回复.为了实现这些功能,我们需要有办法通过长连接来把数据从服务器端推送到手机或网页客户端,而不是许多当代应用所采取的标准的请求-响应模式.在这篇文章中会描述在我们收到了消息.分型指标和读回复之后,如何立刻把它们发往客户端. 内容会包含我们是如何使用Play框架和Akka Actor Model来管理长连接.由服务器主动发送事件的.我们也会分享一些在生产环境中我们是如何在服务器上做负载测试,来管理数十万条并发长连接的,还有一些心得.最

Comet技术详解:基于HTTP长连接的Web端实时通信技术

前言 一般来说,Web端即时通讯技术因受限于浏览器的设计限制,一直以来实现起来并不容易,主流的Web端即时通讯方案大致有4种:传统Ajax短轮询.Comet技术.WebSocket技术.SSE(Server-sent Events). 关于这4种技术方式的优缺点,请参考<Web端即时通讯技术盘点:短轮询.Comet.Websocket.SSE>.本文将专门讲解Comet技术.(本文同步发布于:http://www.52im.net/thread-334-1-1.html) 学习交流 - 即时通

转:基于ASP.NET的Comet长连接技术解析

原文来自于: Comet技术原理 来自维基百科:Comet是一种用于web的技术,能使服务器能实时地将更新的信息传送到客户端,而无须客户端发出请求,目前有两种实现方式,长轮询和iframe流. 简单的说是一种基于现有Http协议基础上的长轮询技术,之所有会产生这种技术的主要原因是Http协议是无状态的所以客户端和服务端之间没办法建立起一套长时间的连接.比如我们要做一个聊天室,在Web环境下我们通常不能从服务端推送消息到浏览器里,而只能通过每个客户端不断的轮询服务器,以获取最新的消息,这样一来效率

Comet:基于 HTTP 长连接的“服务器推”技术

“服务器推”技术的应用 传统模式的 Web 系统以客户端发出请求.服务器端响应的方式工作.这种方式并不能满足很多现实应用的需求,譬如: 监控系统:后台硬件热插拔.LED.温度.电压发生变化: 即时通信系统:其它用户登录.发送信息: 即时报价系统:后台数据库内容发生变化: 这些应用都需要服务器能实时地将更新的信息传送到客户端,而无须客户端发出请求.“服务器推”技术在现实应用中有一些解决方案,本文将这些解决方案分为两类:一类需要在浏览器端安装插件,基于套接口传送信息,或是使用 RMI.CORBA 进

基于分布式Http长连接框架--架构模型

我画了个简单的架构图来帮助说明: 其实为发布订阅架构模式. 生产者和消费者我们统一可理解为客户端,消息中间件可认为是服务端. 生产者和消费者做为客户端要跟服务端交互,则先通过代理订阅服务端,订阅成功后即可跟服务端互通互联,此刻的连接通道为长连接. 长连接的优势在于会将消息主动通知到客户端,避免客户端去做大量的轮询工作而造成资源浪费,而且对于移动应用来说,可较大程度上节省GPRS流量. 当连接建立好后,生产者可随时发送消息,如果在发消息过程当中,服务端由于各种原因不能连接,则消息的发送会回放重试,