基于SWOOLE的分布式SOCKET消息服务器架构

消息服务器使用socket,为避免服务器过载,单台只允许500个socket连接,当一台不够的时候,扩充消息服务器是必然,问题来了,如何让链接在不同消息服务器上的用户可以实现消息发送呢?

要实现消息互通就必须要让这些消息服务器本身能互通,想了两个方式,一种是消息服务器之间交叉链接,另一种是增加一个特殊的消息服务器,这个消息服务器不对外开放,只负责消息转发和推送。

下列测试不考虑防火墙等。仅测试可行性和效率。

测试环境

  • 消息服务器
    192.168.0.201 9501
    192.168.0.202 9501
    
  • 转发服务器
    192.168.0.203 9501
    
  • 公共缓存
    Redis 192.168.0.231 6379
    
  • 软件环境
    centos 6.5 mini swoole php
    

流程图

  • 整个流程图如下:

  • 流程图说明:
    client1可向client2或者其他client发送消息,并接收其他client发送的消息.

    Redis中保存client连接的信息,给每个用户分配唯一的key,包括链接的哪台服务器,转发服务器定时检测消息服务器,如消息服务器挂掉,由转发服务器清理掉Redis已经挂掉的所有链接。

  • 完整的流程:

    1.Client1Client2发送一条消息

    2.Socket1接收到消息,根据key从Redis取出Client2的连接信息,连接在本机,直接推送给Client2,流程结束。

    3.如果连接不在本机,把消息推送到转发服务器,由转发服务器把该消息推送给连接所在消息服务器,消息服务器接收消息,推送给Client2

    4.消息发送结束。

编码实现

  • Socket
    在socket1上创建一个server.php,内容如下:

      <?php //服务端
       $serv = new swoole_server("0.0.0.0", 9501);
    
      //redis
      $redis = new \Redis();
      $redis->connect("192.168.0.231", 6379);
    
      //client
      $proxy = new swoole_client(SWOOLE_TCP | SWOOLE_KEEP);
      $proxy->connect("192.168.0.203", 9501);
    
      $serv->on(‘start‘, function($serv) {
          echo "Service:Start...";
      });
      $serv->on(‘connect‘, function ($serv, $fd) {
    
      });
      $serv->on(‘receive‘, function ($serv, $fd, $from_id, $data) {
          global $redis;
    
          $data = (array) json_decode($data);
          $cmd = $data[‘cmd‘];
    
         switch ($cmd) {
    
              case "login"://登陆
                  //保存连接信息
                  $save = array(
                      ‘fd‘ => $fd,
                      ‘socket_ip‘ => "192.168.0.201"
                  );
                  $redis->set($data[‘name‘], serialize($save));
                  break;
    
              case "chat":
                  $recv = unserialize($redis->get($data[‘recv‘]));
                  if ($recv[‘socket_ip‘] != "192.168.0.201") {
                      //需要转发
                      $data[‘cmd‘] = ‘forward‘;
                      $data[‘recv_ip‘] = $recv[‘socket_ip‘];
                      $serv->task(json_encode($data));
                  } else {
                      //直接发送
                      $serv->send($recv[‘fd‘], "{$data[‘send‘]}给您发了消息:{$data[‘content‘]}");
                  }
              break;
    
              case "forward"://接收转发消息
                  $recv = unserialize($redis->get($data[‘recv‘]));
                  $serv->send($recv[‘fd‘], "{$data[‘send‘]}给您发了消息:{$data[‘content‘]}");
    
              break;
          }
          //$serv->send($fd, ‘Swoole: ‘ . $data);
      });
      $serv->on(‘task‘, function ($serv, $task_id, $from_id, $data) {
          global $proxy;
          $proxy->send($data);
      });
    
      $serv->on(‘finish‘, function ($serv, $task_id, $data) {
    
      });
      $serv->on(‘close‘, function ($serv, $fd) {
          echo "Client: Close.\n";
      });
    
      $serv->set(array(‘task_worker_num‘ => 4));
    
      $serv->start();
    

    在socket2上只需把ip变更一下即可。192.168.0.201变更为192.168.0.202.

  • Proxy
    在转发服务器上建立脚本proxy.php,内容如下:
     $serv = new swoole_server("0.0.0.0", 9501); //服务端
     $serv->on(‘start‘, function($serv) {
         echo "Service:Start...";
     });
     $serv->on(‘connect‘, function ($serv, $fd) {
    
     });
     $serv->on(‘receive‘, function ($serv, $fd, $from_id, $data) {
         global $redis;
         $serv->task($data);
     });
     $serv->on(‘task‘, function ($serv, $task_id, $from_id, $data) {
         $forward = (array) json_decode($data);
         $client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
    
         $client->connect($forward[‘recv_ip‘], 9501);
         unset($forward[‘recv_ip‘]);
         $client->send(json_encode($forward));
         $client->close();
     });
    
     $serv->on(‘finish‘, function ($serv, $task_id, $data) {
    
     });
     $serv->on(‘close‘, function ($serv, $fd) {
         echo "Client: Close.\n";
     });
    
     $serv->set(array(‘task_worker_num‘ => 4));
    
     $serv->start();
    

测试

注意开启顺序

1.开启转发服务器php proxy.php

2.分别开启socket服务器php server.php

可以在转发服务器上看到两个消息服务器已经连接
3.开始测试,分别打开两个telnet,连接两个消息服务器,发送消息测试:
登陆

发送消息测试

消息成功接收。

基于强大的swoole扩展,让php高效的实现这些成为可能,目前消息服务器到转发服务器是长连接,转发服务器到消息服务器是短连接,存在性能瓶颈,也浪费了连接资源。下一步改造成长连接,消息服务器的client使用异步。

时间: 2024-11-08 21:24:41

基于SWOOLE的分布式SOCKET消息服务器架构的相关文章

用PHP搭建基于swoole扩展的socket服务(附PHP扩展的安装步骤)

最近公司的一项目中,需要用PHP搭建一个socket服务. 本来PHP是不适合做服务的,因为和第三方合作,需要采用高效而稳定的TCP协议进行数据通信.经过多次尝试,最终选择了开源的PHP扩展:swoole,是用C写的多线程异步Server. swoole官网:http://www.swoole.com/,http://wiki.swoole.com/wiki/index/prid-1 swoole入门教程及文档:https://github.com/LinkedDestiny/swoole-do

java分布式通信系统(J2EE分布式服务器架构)--(转载)

一.序言 近几个月一直从事一个分布式异步通信系统,今天就整理并blog一下. 这是一个全国性的通信平台,对性能,海量数据,容错性以及扩展性有非常高的要求,所以在系统的架构上就不能简单的采用集中式.简单的总结一下就是: 1.数据分布式存储 2.请求分布式调度 3.多结点分布式部署 4.双重备份,热切换 系统的核心无非就是网络架构,分布式算子和通信,要求如下: 分布式算子: 1.对于任意输入,输出均匀分布 2.输出结果数可控 通信: 1.高并发量 2.多线程 分布式算子我们选择的是sun公司的has

java分布式通信系统(J2EE分布式服务器架构)

一.序言 近几个月一直从事一个分布式异步通信系统,今天就整理并blog一下. 这是一个全国性的通信平台,对性能,海量数据,容错性以及扩展性有非常高的要求,所以在系统的架构上就不能简单的采用集中式.简单的总结一下就是: 1.数据分布式存储 2.请求分布式调度 3.多结点分布式部署 4.双重备份,热切换 系统的核心无非就是网络架构,分布式算子和通信,要求如下: 分布式算子: 1.对于任意输入,输出均匀分布 2.输出结果数可控 通信: 1.高并发量 2.多线程 分布式算子我们选择的是sun公司的has

基于 OpenResty 的服务器架构设计

这个服务器架构不一定能用上,记录在这里,算是一个小小的学习成果. 1. 技术选择 Cocos2d-x 3.x —— 客户端框架. WebSockt —— 网络协议. HTTP —— 网络协议. OpenResty —— 基于 nginx+lua 实现 WebSocket 或 HTTP 服务器. MySQL —— 数据库支持. Redis —— NoSQL 支持. 2. 逻辑服务器 有两个不同的客户端需要提供服务.data_tester 和 client .它们都需要 WebSocket 服务,

部署基于国际版Azure的SharePoint三层架构服务器场

前言 微软Azure国际版已经很普及了,这里没有用国内版(世纪互联),用的是国际版,当然是由于公司性质的缘故.这里一步步图文的方式,分享给大家创建Azure国际版的SharePoint三层架构的过程,并带给大家一些使用感受. 自己在使用的过程中,也发现一些问题,搜了很久也没有搞定,最后在MS case的帮助下,才真正解决了问题.同时也分享给大家,对于已经深入了解Azure的朋友,可以忽略本文,烦请勿见笑. 1.申请Azure账号,这部分略过了,我这里已经有创建好的Azure账号,在管理页面上点击

分享一个大型进销存供应链项目(多层架构、分布式WCF多服务器部署、微软企业库架构)

分享一个大型进销存供应链项目(多层架构.分布式WCF多服务器部署.微软企业库架构) 这是一个比较大型的项目,准备开源了.支持N家门店同时操作.远程WCF+企业库5.0实现. 这块应该算是库存模块中的核心模块了,因为该块的业务逻辑比较多,比较繁琐,大致讲讲业务逻辑吧,大致的逻辑为:出库单/出库单-->填写订单-->出库/入库-->修改库存信息,按照这个顺序来完成入库出库,顺序不能颠倒,同时还要实现订单的删除,修改,在修改库存信息时由于表和表之间有很多的外键关系,所以要同时删除多张表中含有删

基于Redis实现分布式消息队列(汇总目录)

基于Redis实现分布式消息队列(1)– 缘起 http://blog.csdn.net/stationxp/article/details/45595733 基于Redis实现分布式消息队列(2)– 分布式消息队列功能设计 http://blog.csdn.net/stationxp/article/details/45596619 基于Redis实现分布式消息队列(3)– Redis功能分析 http://blog.csdn.net/stationxp/article/details/457

电商总结(四)基于共享存储的图片服务器架构

在当前这个互联网的时代,不管何种网站,对图片的需求量越来越大,尤其在电商网站中,几乎都会面临到海量图片资源的存储.访问等相关技术问题.在对图片服务器的架构,扩展,升级的过程中,肯定也会碰到各种各样的问题,各种各样的需求.当然这并不代表,就必须得弄一个特别NB的图片服务架构,简单,高效,稳定就行.所以今天就来总结一个特别简单,高效的图片服务架构:通过共享存储的方式来实现图片服务架构. 然而,也有一些人问我,现在大型网站的图片服务器的架构已经完全不是这样的了,别人家的图片系统,比你这个牛逼多了,为啥

基于Scala的Actor之上的分布式并发消息驱动框架Akka初体验

学习了基于Scala的Actor之上的分布式并发消息驱动框架Akka初体验,应用actor模型,位置透明,做到高并发.可伸缩.容错.单机也可以用,水平扩展.垂直扩展.容错都有很好的表现,spark中的例子如下: private def initializeEventProcessActor(){ implicat val timeout=Timeout( 30 seconds) val initEventActorReply= dagSchedulerActorSupervisor ? Prop