swoole process use queue example

server:

class Server{    private $_serv = null;

    private $_workers = [];

    private $_worker_num = 2;

    public function __construct()    {        $this->_serv = new swoole_server(‘127.0.0.1‘, 9501);

        $this->_serv->on(‘start‘, array($this, ‘onStart‘));        $this->_serv->on(‘workerstart‘, array($this, ‘onWorkerStart‘));        $this->_serv->on(‘connect‘, array($this, ‘onConnect‘));        $this->_serv->on(‘receive‘, array($this, ‘onReceive‘));        $this->_serv->on(‘close‘, array($this, ‘onClose‘));

        $this->_serv->start();    }

    public function onStart($serv)    {        echo "start \n";    }

    public function onWorkerStart($serv, $worker_id)    {        echo "worker start\n";    }        public function onConnect($serv, $fd, $from_id )    {        echo "connect..\n";    }

    public function onReceive(swoole_server $serv, $fd, $from_id, $data)    {

        for ($i = 0; $i < $this->_worker_num ;$i++) {            $process = new swoole_process(array($this, ‘onProcess‘), false, false);            $process->useQueue();            $pid = $process->start();            echo $pid;            $this->_workers[$pid] = $process;        }

        foreach ($this->_workers as $pid => $worker) {            echo $process->push("hello worker[{$pid}]\n");            sleep(2);//停2秒,不然主进程pop时 获取消息太快,而阻塞状态了,push完给子进程消息,子进程逻辑还没处理完            //(1)不加这停两秒时,可能主进程马上pop时,队列是空的,主进程阻塞状态了,下面子进程只能pop和push一次,再pop时也阻塞状态了            //客户端再发send消息时,服务器不响应,除非新client进来,发send,主进程阻塞取消了,第一个客户端又可以发send消息了,第二个客户端            //发send消息时,服务端不响应,这种情况发生了            $result = $process->pop();//默认模式下,如果队列中没有数据,pop方法会阻塞等待            echo "From worker: $result\n";//这里主进程,接受到的子进程的数据        }

        for($i = 0; $i < $this->_worker_num; $i++)        {            $ret = swoole_process::wait();            $pid = $ret[‘pid‘];            unset($this->_workers[$pid]);            echo "Worker Exit, PID=".$pid.PHP_EOL;        }

    }

    public function onProcess($worker)    {        $msg_status = $worker->statQueue();        if($msg_status[‘queue_num‘] > 0)        {            $recv = $worker->pop();

            echo "FROM master {$recv}\n";            $worker->push("heheh parent");//子进程处理完逻辑,不是马上push数据给主进程  sleep(2)        }        $worker->exit(0);

    }

    public function onClose($serv, $fd, $from_id)    {        echo "close.\n";    }}

new Server();

client:

$cli = new Swoole_client(SWOOLE_SOCK_TCP);

$cli->connect(‘127.0.0.1‘, 9501, 1);

fwrite(STDOUT, ‘输入消息:‘);$msg = trim(fgets(STDIN));

echo $cli->send($msg);

echo $cli->recv()."\n";
时间: 2024-10-15 21:25:27

swoole process use queue example的相关文章

swoole process example

$workers = [];$worker_num = 2; for($i = 0; $i < $worker_num; $i++){    $process = new swoole_process('callback_function',false,false);    $process->useQueue();    $pid = $process->start();    $workers[$pid] = $process;    //echo "Master: new

PHP swoole process的使用

引入背景:假如我们每天有10000个订单生成,需要同步到仓储系统中去,以前做法是开启一个crontab去跑这些任务,但是发现总有感觉同步效率低,间隔时间都是分钟级别的. 解决方案测试:我们将同步订单的任务表添加一个hash作为key,作为分发条件,因为mysql中select如果做mod函数是用不到索引的,所以我们自己做随机hash,但是务必不需要范围太大,以免服务器资源不够,方法是根据hashkey投放到不同的进程中进行同步,测试代码如下 <?php /** * Created by PhpS

关于python multiprocessing进程通信的pipe和queue方式

这两天温故了python 的multiprocessing多进程模块,看到的pipe和queue这两种ipc方式,啥事ipc? ipc就是进程间的通信模式,常用的一半是socke,rpc,pipe和消息队列等. 今个就再把pipe和queue搞搞. #coding:utf-8 import multiprocessing import time def proc1(pipe):     while True:         for i in xrange(10000):            

python Queue在两个地方

其一: Source code: Lib/queue.py The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implem

swoole创建工作进程,执行滞后工作

一,创建守候进程,因为这里不需要Server,也没有Client,数据交换通过redis进行 <?php namespace Kuba\Saas; require_once __DIR__ . '/Core/ErrorHandle.php'; use \Swoole\Timer; use \Swoole\Process; use Kuba\Saas\Core\ErrorHandle; final class Services { private $m_workers = []; private

python 学习笔记 - Queue &amp; Pipes,进程间通讯

上面写了Python如何创建多个进程,但是前面文章中创建的进程都是哑巴和聋子,自己顾自己执行,不会相互交流.那么如何让进程间相互说说话呢?Python为我们提供了一个函数multiprocessing.Pipe和一个类:multiprocessing.Queue. multiprocessing.Pipe() multiprocessing.Pipe()即管道模式,调用Pipe()返回管道的两端的Connection. Python官方文档的描述: Returns a pair (conn1,

进程---Process

#! /usr/bin/env python# -*- coding:utf-8 -*- """ python中的多线程其实并不是真正的多线程(全局解释器锁(GIL)存在) 多进程包multiprocessing:可以轻松完成从单进程到并发执行的转换 multiprocessing支持子进程.通信和共享数据.执行不同形式的同步,提供了Process.Queue.Pipe.Lock等组件"""import multiprocessingimport

swoole 父子进程间通信

<?php /** * 场景: * 监控订单表状态 父子进程通信 * 一个主进程 两个子进程实现 */ //设置主进程名 echo '主进程id:' . posix_getpid() . PHP_EOL; cli_set_process_title('php_main'); //1.此子进程用于监听数据的改变 $process1 = new Swoole\Process(function (\Swoole\Process $process) { // cli_set_process_title(

swoole 消息队列

<?php /** * 场景: * 监控订单表状态 队列通信 * 一个进程向队列发布消息 另外两个进程争抢 */ //设置主进程名 echo '主进程id:' . posix_getpid() . PHP_EOL; cli_set_process_title('php_main'); //1.此子进程用于监听数据的改变 $process1 = new \Swoole\Process(function (\Swoole\Process $process) { // cli_set_process_