swoole 异步队列

<?php
// 消费者,文件名《server.php》

class Server {
    private $serv;

    public function __construct() {
        $this->serv = new swoole_server("0.0.0.0", 9501);
        $this->serv->set(array(
            ‘worker_num‘ => 1,        // 一般设置为服务器CPU数的1-4倍
            ‘daemonize‘ => false,     // 以非守护进程执行(一般项目中填 true)
            ‘max_request‘ => 9999,    // 处理这么多请求之后,重启worker进程,防止内存泄漏
            ‘task_worker_num‘ => 4,   // task进程的数量
       //   ‘log_file‘ => ‘/data/log/swoole.log‘ ,     //日志
        ));

        $this->serv->on(‘Receive‘, array($this, ‘onReceive‘));
        // bind callback
        $this->serv->on(‘Task‘, array($this, ‘onTask‘));
        $this->serv->on(‘Finish‘, array($this, ‘onFinish‘));
        $this->serv->start();
    }

    /**
    * 接收到数据时回调此函数,发生在worker进程中
    * $server,swoole_server对象
    * $fd,TCP客户端连接的文件描述符
    * $from_id,TCP连接所在的Reactor线程ID
    * $data,收到的数据内容,可能是文本或者二进制内容
    */
    public function onReceive($serv, $fd, $from_id, $data ) {
        echo PHP_EOL."=========== onReceive ============".PHP_EOL;
        echo "Get Message From Client {$fd}:{$data}".PHP_EOL;
        $serv->task( $data );
    }

    /**
    * 在task_worker进程内被调用,worker进程可以使用swoole_server_task函数向task_worker进程投递新的任务,当前的Task进程在调用onTask回调函数时会将进程状态切换为忙碌      这时将不再接收新的Task,当onTask函数返回时会将进程状态切换为空闲然后继续接收新的Task
    * $task_id 是任务ID,由swoole扩展内自动生成,用于区分不同的任务,$task_id 和 $src_worker_id 组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同
    * $src_worker_id 来自于哪个worker进程
    * $data 是任务的内容
    */
    public function onTask($serv, $task_id, $from_id, $data) {
        $array = json_decode( $data , true );
        echo "=========== onTask ============".PHP_EOL;
        echo var_export($array, 1).PHP_EOL;
        return $array;    // 返回执行结果到worker进程,低版本的swoole要写成 $serv->finish($array);
    }

    /**
    * 当worker进程投递的任务在task_worker中完成时,task进程会通过swoole_server->finish()方法将任务处理的结果发送给worker进程
    * $task_id 是任务的ID
    * $data 是任务处理的结果内容,也就是 onTask()函数中的 return值
    */
    public function onFinish($serv, $task_id, $data) {
        echo "=========== onFinish ============".PHP_EOL;
        echo "Task {$task_id} finish !".PHP_EOL;
        echo var_export($data, 1).PHP_EOL;
    }

}

$server = new Server();

############################  华丽的分割线  ############################

<?php
// 生产者,文件名《client.php》

class Client {
    private $client;

    public function __construct() {
        $this->client = new swoole_client(SWOOLE_SOCK_TCP);
    }

    public function connect() {
        if( !$this->client->connect(‘127.0.0.1‘, 9501 , 1) ) {
            echo "Connect Error";
        }
        $data = array(
            ‘url‘ => ‘http://funsion.cnblogs.com‘,
            ‘timestamp‘ => date(‘Y-m-d H:i:s‘),
            );
        $json_data = json_encode($data);
        $this->client->send($json_data);
    }
}

$client = new Client();
$client->connect();

############################  华丽的分割线  ############################

先执行 php server.php (执行一次,会以守护进程运行)
然后再执行 php client.php (可以执行多次,每次执行会产生新的任务)
时间: 2024-10-22 16:49:09

swoole 异步队列的相关文章

.Net中的并行编程-4.实现高性能异步队列

上文<.Net中的并行编程-3.ConcurrentQueue实现与分析>分析了ConcurrentQueue的实现,本章就基于ConcurrentQueue实现一个高性能的异步队列,该队列主要用于实时数据流的处理并简化多线程编程模型.设计该队列时考虑以下几点需求(需求来自公司的一个实际项目): 1. 支持多线程入队出队,尽量简化多线程编程的复杂度. 2. 支持事件触发机制,数据入队时才进行处理而不是使用定时处理机制, 而且内部能阻塞消费者线程. 3. 出队时数据处理的顺序要保证和入队时是一致

[js高手之路]javascript腾讯面试题学习封装一个简易的异步队列

这道js的面试题,是这样的,页面上有一个按钮,一个ul,点击按钮的时候,每隔1秒钟向ul的后面追加一个li, 一共追加10个,li的内容从0开始技术( 0, 1, 2, ....9 ),首先我们用闭包封装一个创建li元素的函数. 1 var create = (function(){ 2 var count = 0; 3 return function(){ 4 var oLi = document.createElement( "li" ); 5 oLi.innerHTML = co

GCP异步队列-看过的最完整的文章了,特地转载一下

GCP异步队列-看过的最完整的文章了,特地转载一下 分类: Iphone开发入门 2014-04-16 11:48 309人阅读 评论(0) 收藏 举报 概念: 程序中同步和异步是什么意思?有什么区别? 解释一: 异步调用是通过使用单独的线程执行的.原始线程启动异步调用,异步调用使用另一个线程执行请求,而与此同时原始的线程继续处理. 同步调用则在继续之前必须等待响应或返回值.如果不允许调用继续即无响应或返回值,就说调用被阻塞了,不能继续执行. 解释二: 同步.一条马路,只能开一辆车,等这个车开走

.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列

三年前写过基于ConcurrentQueue的异步队列,今天在整理代码的时候发现当时另外一种实现方式-使用BlockingCollection实现,这种方式目前依然在实际项目中使用.关于BlockingCollection的基本使用请查阅MSDN.源码实现 下面直接上代码:(代码已经放到了我的github上) using System; using System.Collections.Concurrent; using System.Collections.Generic; using Sys

javascript 多线程异步队列

首先,你得知道 jQuery.Deferred 的大致用法,然后,我们进入正题吧: 库代码: /*! * 多线程异步队列 * 依赖 jQuery 1.8+ (如果你用的是 1.6或1.7, 只要将源码中的 then方法替换为pipe方法 即可) */ /** * @n {Number} 正整数, 线程数量 */ function Queue (n) { n = parseInt(n || 1, 10); return (n && n > 0) ? new Queue.prototyp

jquery 1.7.2源码解析(四) 异步队列Deferred Object

异步队列Deferred Object 一)jQuery.Callbacks( flags ) 1.总体结构 该函数返回一个链式工具对象(回调函数列表),用于管理一组回调函数. 2.源码分析 1.工具函数createFlags(flags) 该函数用于将字符串标记转换为对象格式标记,并把转换结果缓存起来. // String to Object flags format cache var flagsCache = {}; // Convert String-formatted flags in

jQuery 源码解析(八) 异步队列模块 Callbacks 回调函数详解

异步队列用于实现异步任务和回调函数的解耦,为ajax模块.队列模块.ready事件提供基础功能,包含三个部分:Query.Callbacks(flags).jQuery.Deferred(funct)和jQuery.when().本节讲解Callbacks,也就是回调函数列表 回调函数用于管理一组回调函数,支持添加.移除.触发.锁定和禁用回调函数,为jQuery.ajax.jQuery.Deferred()和ready()事件提供基础功能,我们也可以基于它编写新的组件. 使用方法:$.Callb

jQuery源码分析(九) 异步队列模块 Deferred 详解

deferred对象就是jQuery的回调函数解决方案,它解决了如何处理耗时操作的问题,比如一些Ajax操作,动画操作等.(P.s:紧跟上一节:https://www.cnblogs.com/greatdesert/p/11433365.html的内容) 异步队列有三种状态:待定(pending).成功(resolved)和失败(rejected),初始时处于pending状态 我们可以使用jQuery.Deferred创建一个异步队列,返回一个对象,该对象含有如下操作: done(fn/arr

SWOOLE异步REDIS安装

SWOOLE的异步REDIS客户端一,安装hiredis    wget https://github.com/redis/hiredis/archive/v0.13.3.tar.gz    直接 make make install就可以了    就是把相关的头文件放到相应的目录里    二,升级swoole版本为1.8.0    github地址:        git clone https://github.com/swoole/swoole-src.git    或者tar.gz包