swoole与php协程实现异步非阻塞IO开发

“协程可以在遇到阻塞的时候中断主动让渡资源,调度程序选择其他的协程运行。从而实现非阻塞IO”

然而php是不支持原生协程的,遇到阻塞时如不交由异步进程来执行是没有任何意义的,代码还是同步执行的,如下所示:

function foo()

{

$db=new Db();

$result=(yield $db->query());

yield $result;

}

上面的数据库查询操作是阻塞的,当调度器调度该协程到这一步时发现执行了阻塞操作,此时调度器该怎么办?选择其余协程执行?那该协程的阻塞操作又该何时执行,交由谁执行呢?所以说在php协程中抛开异步调用谈非阻塞IO属于耍流氓。

而swoole的异步task提供了一个实现异步的解决方案,关于swoole_task可以参考官方文档

核心功能实现

将一次请求形成一个协程

首先创建一个swoole_server并设置回调

class HttpServer implements Server

{

private $swooleHttpServer;

public function __construct(\swoole_http_server $swooleHttpServer)

{

$this->swooleHttpServer = $swooleHttpServer;

}

public function start()

{

$this->swooleHttpServer->on(‘start‘, [$this, ‘onStart‘]);

$this->swooleHttpServer->on(‘shutdown‘, [$this, ‘onShutdown‘]);

$this->swooleHttpServer->on(‘workerStart‘, [$this, ‘onWorkerStart‘]);

$this->swooleHttpServer->on(‘workerStop‘, [$this, ‘onWorkerStop‘]);

$this->swooleHttpServer->on(‘workerError‘, [$this, ‘onWorkerError‘]);

$this->swooleHttpServer->on(‘task‘, [$this, ‘onTask‘]);

$this->swooleHttpServer->on(‘finish‘, [$this, ‘onFinish‘]);

$this->swooleHttpServer->on(‘request‘, [$this, ‘onRequest‘]);

$this->swooleHttpServer->start();

}

onRequest方法:

public function onRequest(\swoole_http_request $request, \swoole_http_response $response)

{

$requestHandler = new RequestHandler($request, $response);

$requestHandler->handle();

}

在ReqeustHandler中执行handle方法,来解析请求的路由,并创建控制器,调用相应的方法,相

public function handle()

{

$this->context = new Context($this->request, $this->response, $this->getFd());

$this->router = new Router($this->request);

try {

if (false === $this->router->parse()) {

$this->response->output(‘‘);

return;

}

$coroutine = $this->doRun();

$task = new Task($coroutine, $this->context);

$task->run();

} catch (\Exception $e) {

PcsExceptionHandler::handle($e, $this->response);

}

}

private function doRun()

{

$ret = (yield $this->dispatch());

yield $this->response->send($ret);

}

上面代码中的ret是action()的调用结果,yield $this->response->send($ret);是向对客户端请求的应答。

$coroutine是这一次请求形成的一个协程(Genetator对象),包含了整个请求的流程,接下来就要对这个协程进行调度来获取真正的执行结果。

协程调度

namespace Pcs\Coroutine;

use Pcs\Network\Context\Context;

class Task

{

private $coroutine;

private $context;

private $status;

private $scheduler;

private $sendValue;

public function __construct(\Generator $coroutine, Context $context)

{

$this->coroutine = $coroutine;

$this->context = $context;

$this->scheduler = new Scheduler($this);

}

public function run()

{

while (true) {

try {

$this->status = $this->scheduler->schedule();

switch ($this->status) {

case TaskStatus::TASK_WAIT:

echo "task status: TASK_WAIT\n";

return null;

case TaskStatus::TASK_DONE:

echo "task status: TASK_DONE\n";

return null;

case TaskStatus::TASK_CONTINUE;

echo "task status: TASK_CONTINUE\n";

break;

}

} catch (\Exception $e) {

$this->scheduler->throwException($e);

}

}

}

public function setCoroutine($coroutine)

{

$this->coroutine = $coroutine;

}

public function getCoroutine()

{

return $this->coroutine;

}

public function valid()

{

if ($this->coroutine->valid()) {

return true;

} else {

return false;

}

}

public function send($value)

{

$this->sendValue = $value;

$ret = $this->coroutine->send($value);

return $ret;

}

public function getSendVal()

{

return $this->sendValue;

}

}

Task依赖于Generator对象$coroutine,在Task类中定义了一些get/set方法,以及一些Generator的方法,Task::run()方法用来执行对协程的调度,调度行为由Schedule来执行,每次调度都会返回当前这次调度的状态。多个协程共用一个调度器,而这里run方法会为每个协程创建一个调度器,原因是每个协程都是一个客户端的请求,使用一个单独的调度器能减少相互间的影响,而且多个协程之间的调度顺序是swoole来处理的,这里的调度器不用关心。下面给出调度的代码:

namespace Pcs\Coroutine;

class Scheduler

{

private $task;

private $stack;

const SCHEDULE_CONTINUE = 10;

public function __construct(Task $task)

{

$this->task = $task;

$this->stack = new \SplStack();

}

public function schedule()

{

$coroutine = $this->task->getCoroutine();

$value = $coroutine->current();

$status = $this->handleSystemCall($value);

if ($status !== self::SCHEDULE_CONTINUE) return $status;

$status = $this->handleStackPush($value);

if ($status !== self::SCHEDULE_CONTINUE) return $status;

$status = $this->handleAsyncJob($value);

if ($status !== self::SCHEDULE_CONTINUE) return $status;

$status = $this->handelYieldValue($value);

if ($status !== self::SCHEDULE_CONTINUE) return $status;

$status = $this->handelStackPop();

if ($status !== self::SCHEDULE_CONTINUE) return $status;

return TaskStatus::TASK_DONE;

}

public function isStackEmpty()

{

return $this->stack->isEmpty();

}

private function handleSystemCall($value)

{

if (!$value instanceof SystemCall) {

return self::SCHEDULE_CONTINUE;

}

}

private function handleStackPush($value)

{

if (!$value instanceof \Generator) {

return self::SCHEDULE_CONTINUE;

}

$coroutine = $this->task->getCoroutine();

$this->stack->push($coroutine);

$this->task->setCoroutine($value);

return TaskStatus::TASK_CONTINUE;

}

private function handleAsyncJob($value)

{

if (!is_subclass_of($value, Async::class)) {

return self::SCHEDULE_CONTINUE;

}

$value->execute([$this, ‘asyncCallback‘]);

return TaskStatus::TASK_WAIT;

}

public function asyncCallback($response, $exception = null)

{

if ($exception !== null

&& $exception instanceof \Exception

) {

$this->throwException($exception, true);

} else {

$this->task->send($response);

$this->task->run();

}

}

private function handelYieldValue($value)

{

if (!$this->task->valid()) {

return self::SCHEDULE_CONTINUE;

}

$ret = $this->task->send($value);

return TaskStatus::TASK_CONTINUE;

}

private function handelStackPop()

{

if ($this->isStackEmpty()) {

return self::SCHEDULE_CONTINUE;

}

$coroutine = $this->stack->pop();

$this->task->setCoroutine($coroutine);

$value = $this->task->getSendVal();

$this->task->send($value);

return TaskStatus::TASK_CONTINUE;

}

public function throwException($e, $isFirstCall = false)

{

if ($this->isStackEmpty()) {

$this->task->getCoroutine()->throw($e);

return;

}

try {

if ($isFirstCall) {

$coroutine = $this->task->getCoroutine();

} else {

$coroutine = $this->stack->pop();

}

$this->task->setCoroutine($coroutine);

$coroutine->throw($e);

$this->task->run();

} catch (\Exception $e) {

$this->throwException($e);

}

}

}

Scheduler中的schedule方法会获取当前Task的协程,并通过current()方法获取当前中断点的返回值,接着依次调用5个方法来对返回值进行处理。

1:handleSystemCall

如果返回的值是SystemCall类型的对象,则执行系统调用,如killTask之类的操作,systemCall是第一优先级。

2:handleStackPush

在A函数中调用B函数,则B函数称为A函数的子例程(子函数),然而在协程中却不能像普通函数那样调用。

function funcA()

{

return funcB();

}

function genA()

{

yield genB();

}

在funcA中funcB();会返回funcB的执行结果,但是在genA中,yield genB();会返回一个Generator对象,而不是genB的最终执行结果。想得到genB的执行结果需要对genB进行调度,而genB中又可能有genC()genD()的协程嵌套,所以为了让协程像函数一眼正常调用,这里使用协程栈来实现。

如上图,当调度器获取到GenA(父协程)的返回值is instance of Generator时,调度器会把父协程push到stack中,然后把子协程分配给Task,继续调度子协程。如此反复直到最后一个子协程返回,然后开始pop,将stack中的协程依次取出

3:handleAsyncJob

handleAsyncJob是整个协程调度的核心

private function handleAsyncJob($value)

{

if (!is_subclass_of($value, Async::class)) {

return self::SCHEDULE_CONTINUE;

}

$value->execute([$this, ‘asyncCallback‘]);

return TaskStatus::TASK_WAIT;

}

public function asyncCallback($response, $exception = null)

{

if ($exception !== null

&& $exception instanceof \Exception

) {

$this->throwException($exception, true);

} else {

$this->task->send($response);

$this->task->run();

}

}

当协程调度的返回值是继承了Async的子类或者是实现了Asycn接口的实例的时候,会执行Async的execute方法。这里用mysqli数据库查询类举例。

public function execute(callable $callback)

{

$this->callback = $callback;

$serv = ServerHolder::getServer();

$serv->task($this->sql, -1, [$this, ‘queryReady‘]);

}

public function queryReady(\swoole_http_server $serv, $task_id, $data)

{

$queryResult = unserialize($data);

$exception = null;

if ($queryResult->errno != 0) {

$exception = new \Exception($queryResult->error);

}

call_user_func_array($this->callback, [$queryResult, $exception]);

}

execute方法接收一个函数作为该异步操作完成之后的回调函数,在Mysqli类中的execute方法中,启动了一个异步swoole_task,将sql操作交给swoole_task异步执行,在执行结束后会执行queryReady方法,该方法在解析异步返回数据之后执行$this->callback()也就是之前在调度器中传入的 asyncCallback方法,该方法在检测异常之后会执行send()方法将异步执行的结果发送到中断处,继续执行。

handleAsyncJob不会等待异步操作的返回结果,而是直接返回TASK_WAIT信号,回到上面的Task->run()方法可以看到TASK_WAIT信号会导致run()方法返回null,释放当前worker,调度流程图如下图所示,

4:handleYieldValue

private function handelYieldValue($value)

{

if (!$this->task->valid()) {

return self::SCHEDULE_CONTINUE;

}

$ret = $this->task->send($value);

return TaskStatus::TASK_CONTINUE;

}

如果某次yield的返回值既不是异步调用也不是Generator,那么判断当前的generator是否是valid(是否执行完)如果执行完毕,继续调度,执行下面的handleStackPush方法,否则的话返回Task_Continue继续调度,也就是说在一个generator中多次yield,最后只会取最后一次yield的返回值。

5:handleStackPush

当上一步中判断!$this->task->valid()也就是当前生成器执行完毕的时候,会执行本方法来控制之前的协程stack进行pop操作,首先检查Stac是否是非空,非空的话pop出一个父协程,并将当前协程的返回值send()到父协程中断出继续执行。

协程优势在哪里

当一次请求遇到IO的时候,同步操作会导致当前请求阻塞在IO处等待IO返回,体现在swoole上就是一个请求一直占用一个worker。

但是当使用了协程调度之后,用户可以在阻塞的地方通过yield手动中断,交由swoole_task去异步操作,同时释放worker占用来处理其他请求。

当异步处理执行结束后再继续调度。

注意 php的协程只负责中断,异步操作是Swoole_task做的

原文地址:https://www.cnblogs.com/winner192/p/11704147.html

时间: 2024-11-06 16:30:00

swoole与php协程实现异步非阻塞IO开发的相关文章

Nginx:异步非阻塞IO

在使用socket编程中,经常会看到阻塞.非阻塞.同步.异步,那么它们之间到底有什么关系跟区别呢? 本次将那Nginx的异步非阻塞的事件驱动模型来解释一下它们之间的关系. 阻塞IO 在linux中,默认所有socket都是阻塞的. 这意味着使用该socket调用诸如recv的函数时,在没有数据到达之前,该函数将不会返回,导致线程被阻塞,直到数据到达. 非阻塞IO 我们可以使用fcntl把socket设置为非阻塞的. 这意味着使用该socket调用诸如recv的函数时,该函数将立刻返回,可以根据返

转一贴,今天实在写累了,也看累了--【Python异步非阻塞IO多路复用Select/Poll/Epoll使用】

下面这篇,原理理解了, 再结合 这一周来的心得体会,整个框架就差不多了... http://www.haiyun.me/archives/1056.html 有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的.下面记录下分别基于Select/Poll/Epoll的echo server实现.Python Select Server,可监控事件数量有限制: 1 2 3 4 5 6 7 8 9 10 11 12 13 14

异步非阻塞IO的Python Web框架--Tornado

Tornado的全称是Torado Web Server,从名字上就可知它可用作Web服务器,但同时它也是一个Python Web的开发框架.最初是在FriendFeed公司的网站上使用,FaceBook收购之后便进行了开源. 作为Web框架,是一个轻量级的Web框架,类似于另一个Python web 框架Web.py,其拥有异步非阻塞IO的处理方式. 作为Web服务器,Tornado有较为出色的抗负载能力,官方用nginx反向代理的方式部署Tornado和其它Python web应用框架进行对

Java异步非阻塞IO NIO使用与代码分析

[TOC] Java异步非阻塞IO NIO使用与代码分析 TimeServer程序的NIO实现完整代码 TimeServer程序来自书本<Netty权威指南>,nio的代码确实有些难懂(这也是后面需要使用Netty的原因之一),不过我对代码加了注释,这样一来对nio的概念及基本的使用都会有一个非常清晰的认识: 服务端程序 TimeServer.java: package cn.xpleaf.nio; public class TimeServer { public static void ma

nodejs的异步非阻塞IO

简单表述一下:发启向系统IO操作请求,系统使用线程池IO操作,执行完放到事件队列里,node主线程轮询事件队列,读取结果与调用回调.所以说node并非真的单线程,还是使用了线程池的多线程. 上个图看看吧 举一反三:所有的异步非阻塞思路都类似,如:nginx,python的模拟异步非阻塞,还有java的nio.C#的 EAP

Python的异步编程[0] -&gt; 协程[1] -&gt; 使用协程建立自己的异步非阻塞模型

使用协程建立自己的异步非阻塞模型 接下来例子中,将使用纯粹的Python编码搭建一个异步模型,相当于自己构建的一个asyncio模块,这也许能对asyncio模块底层实现的理解有更大的帮助.主要参考为文末的链接,以及自己的补充理解. 完整代码 1 #!/usr/bin/python 2 # ============================================================= 3 # File Name: async_base.py 4 # Author: L

Python实现基于协程的异步爬虫

一.课程介绍 1. 课程来源 本课程核心部分来自<500 lines or less>项目,作者是来自 MongoDB 的工程师 A. Jesse Jiryu Davis 与 Python 之父 Guido van Rossum.项目代码使用 MIT 协议,项目文档使用 http://creativecommons.org/licenses/by/3.0/legalcode 协议. 课程内容在原文档基础上做了稍许修改,增加了部分原理介绍,步骤的拆解分析及源代码注释. 2. 内容简介 传统计算机

gj12-1 协程和异步io

1 并发.并行.同步.异步.阻塞.非阻塞 并发.并行 并发是报一个时间段内有几个程序在同一个cpu上运行,但是任意时刻只有一个程序在cpu上运行.在一个时间段内某一个请求很快,能够响应的用户就越多,高并发. 并行是指任意时刻点上,有多个程序同时运行在多个cpu上,并行数量跟CPU数一致的,因此没有高并行. 情况:开水没有:水壶要洗,茶壶茶杯要洗:火生了,茶叶也有了.怎么办? 时间分配: 洗水壶:3 灌凉水:1 洗茶壶:3 洗茶杯:3 拿茶叶:1 泡茶:1 烧开水:30 并发版 老张 办法甲:洗好

爬虫必备—性能相关(异步非阻塞)

在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待,从而使得请求整体变慢. 1. 同步执行 1 import requests 2 3 def fetch_async(url): 4 response = requests.get(url) 5 return response 6 7 8 url_list = ['http://www.github.com', 'http://www.bing.com'] 9 10 for url in url_list: