Tornado 高并发源码分析之六---异步编程的几种实现方式

方式一:通过线程池或者进程池

导入库futures是python3自带的库,如果是python2,需要pip安装future这个库

备注:进程池和线程池写法相同

 1 from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 2 from tornado.concurrent import run_on_executor
 3
 4 def doing(s):
 5     print(‘xiumian--{}‘.format(s))
 6     time.sleep(s)
 7     return s
 8
 9 class MyMainHandler(RequestHandler):
10     executor = ProcessPoolExecutor(2)   #新建一个进程池,静态变量,属于类,所以全程只有这个几个进程,不需要关闭,如果放在__init__中,则属于对象,每次请求都会新建pool,当请求增多的时候,会导致今天变得非常多,这个方法不可取
11
12     @gen.coroutine
13     def get(self, *args, **kwargs):
14         print(‘开始{}‘.format(self.pool_temp))
15         a = yield self.executor.submit(doing, 20)
16         print(‘进程 %s‘  % self.executor._processes)
17         self.write(str(a))
18         print(‘执行完毕{}‘.format(a))
19
20    @run_on_executor       #tornado 另外一种写法,需要在静态变量中有executor的进程池变量
21     def post(self, *args, **kwargs):
22 a = yield doing(20)

方式二:Tornado + Celery + RabbitMQ 实现

使用Celery任务队列,Celery 只是一个任务队列,需要一个broker媒介,将耗时的任务传递给Celery任务队列执行,执行完毕将结果通过broker媒介返回。官方推荐使用RabbitMQ作为消息传递,redis也可以

一、Celery 介绍:

1.1、注意:

1、当使用RabbitMQ时,需要按照pika第三方库,pika0.10.0存在bug,无法获得回调信息,需要按照0.9.14版本即可

2、tornado-celery 库比较旧,无法适应Celery的最新版,会导致报无法导入task Producter包错误,只需要将celery版本按照在3.0.25就可以了

1.2、关于配置:

单个参数配置:

1 app.conf.CELERY_RESULT_BACKEND = ‘redis://localhost:6379/0‘

多个参数配置:

1 app.conf.update(
2     CELERY_BROKER_URL = ‘amqp://[email protected]//‘,
3     CELERY_RESULT_BACKEND = ‘redis://localhost:6379/0‘
4 )

从配置文件中获取:(将配置参数写在文件app.py中)

1 BROKER_URL=‘amqp://[email protected]//‘
2 CELERY_RESULT_BACKEND=‘redis://localhost:6379/0‘
3 app.config_from_object(‘celeryconfig‘)

二、案例

2.1、启动一个Celery 任务队列,也就是消费者:

1 from celery import Celery
2 celery = Celery(‘tasks‘, broker=‘amqp://guest:[email protected]:5672‘, backend=‘amqp‘)  使用RabbitMQ作为载体, 回调也是使用rabbit作为载体
3
4 @celery.task(name=‘doing‘)   #异步任务,需要命一个独一无二的名字
5 def doing(s, b):
6     print(‘开始任务‘)
7     logging.warning(‘开始任务--{}‘.format(s))
8     time.sleep(s)
9     return s+b

命令行启动任务队列守护进程,当队列中有任务时,自动执行 (命令行可以放在supervisor中管理)

--loglevel=info --concurrency=5

记录等级,默认是concurrency:指定工作进程数量,默认是CPU核心数

2.2、启动任务生产者

 1 import tcelery
 2 tcelery.setup_nonblocking_producer()  #设置为非阻塞生产者,否则无法获取回调信息
 3
 4 class MyMainHandler(RequestHandler):
 5
 6     @web.asynchronous
 7     @gen.coroutine
 8     def get(self, *args, **kwargs):
 9         print(‘begin‘)
10         result = yield gen.Task(sleep.apply_async, args=[10])   #使用yield 获取异步返回值,会一直等待但是不阻塞其他请求
11         print(‘ok--{}‘.format(result.result))     #返回值结果
12
13        # sleep.apply_async((10, ), callback=self.on_success)
14        # print(‘ok -- {}‘.format(result.get(timeout=100)))#使用回调的方式获取返回值,发送任务之后,请求结束,所以不能放在处理tornado的请求任务当中,因为请求已经结束了,与客户端已经断开连接,无法再在获取返回值的回调中继续向客户端返回数据
15
16         # result = sleep.delay(10)    #delay方法只是对apply_async方法的封装而已
17         # data = result.get(timeout=100)  #使用get方法获取返回值,会导致阻塞,相当于同步执行
18
19
20     def on_success(self, response):    #回调函数
21         print (‘Ok-- {}‘.format(response))
时间: 2024-10-06 21:09:55

Tornado 高并发源码分析之六---异步编程的几种实现方式的相关文章

Tornado 高并发源码分析之四--- HTTPServer 与 TCPServer 对象

主要工作: 服务器启动的时候做的事: 1.把包含了各种配置信息的 application 对象封装到了 HttpServer 对象的 request_callback 字段中,等待被调用 2.TCPServer 通过 listen 方法启动端口监听, 封装_handle_connection回调函数,并注册到 IOLoop 中 服务器运行时做的事: 3.当有请求到来时,注册在 IOLoop 中的 _handle_connection 将会被调用, _handle_connection 方法将会调

Tornado 高并发源码分析之三--- Application 对象

Application 对象主要工作: 服务器启动时: 1.在新建一个app的时候,根据设置好的 URL 和回调函数 Handler 封装成URLSpec 对象 服务器运行时: 2.在请求到来,将 HTTPServer 封装好的HTTPRequest 传入_RequestDispatcher对象,_RequestDispatcher对象根据传入的 HTTPRequest 使用URLSpec解析匹 match 正则匹配找到对应的 RequestHandler ,执行它的 _execute 方法 A

Tornado 高并发源码分析之二---Tornado启动和请求处理流程

Tornado 服务器启动流程 因为Tornado 里使用了很多传类的方式,也就是delegate,之所以要这么做,其实和 iOS 开发那样,也很多的 delegate, 如此来实现高度解耦,但是比较绕,所以建议: 1.先浏览一遍启动流程,再看源码 2.在看一遍请求到来时的处理流程,再看源码 备注: 流程图是xmind 编辑的,好像这里无法上传源文件,所以只能把图片下载下来看了,会没那么清晰

Tornado 高并发源码分析之一---启动一个web服务

前言: 启动一个tornado 服务器基本代码 1 class HomeHandler(tornado.web.RequestHandler): #创建 RequesHandler 对象,处理接收到的 http 请求 2 def get(self): 3 entries = self.db.query("SELECT * FROM entries ORDER BY published DESC LIMIT 5") 4 if not entries: 5 self.redirect(&q

Javascript教程:js异步编程的4种方法详述(转载)

文章收集转载于(阮一峰的网络日志) 你可能知道,Javascript语言的执行环境是“单线程”(single thread). 所谓“单线程”,就是指一次只能完成一件任务.如果有多个任务,就必须排队,前面一个任务完成,再执行后面一个任务,以此类推. 这种模式的好处是实现起来比较简单,执行环境相对单纯:坏处是只要有一个任务耗时很长,后面的任务都必须排队等着,会拖延整个程序的执行.常见的浏览器无响应(假死),往往就是因为某一段Javascript代码长时间运行(比如死循环),导致整个页面卡在这个地方

浅析C# 异步编程的两种方式

一.传统BeginInvoke方式. BeginInvoke方法用于启动c#异步调用.它返回IasyncResult,可用于监视调用进度.EndInvoke方法用于检索c#异步调用结果. 调用BeginInvoke后可随时调用EndInvoke方法;如果C#异步调用未完成,EndInvoke将一直阻塞到C#异步调用完成. 总结其使用大体分5个步骤: 1.声明委拖 2.创建异步方法 3.实例化委拖(把委拖与方法关联)  A 4.通过实例的BeginInvoke调用异步方法 5.通过实例的EndIn

jQuery源码分析(九) 异步队列模块 Deferred 详解

deferred对象就是jQuery的回调函数解决方案,它解决了如何处理耗时操作的问题,比如一些Ajax操作,动画操作等.(P.s:紧跟上一节:https://www.cnblogs.com/greatdesert/p/11433365.html的内容) 异步队列有三种状态:待定(pending).成功(resolved)和失败(rejected),初始时处于pending状态 我们可以使用jQuery.Deferred创建一个异步队列,返回一个对象,该对象含有如下操作: done(fn/arr

Spark源码分析之六:Task调度(二)

话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: [java] view plain copy // Make fake resource offers on all executors // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的) private def makeOffers

JDK源码分析之concurrent包(三) -- Future方式的实现

上一篇我们基于JDK的源码对线程池ThreadPoolExecutor的实现做了分析,本篇来对Executor框架中另一种典型用法Future方式做源码解读.我们知道Future方式实现了带有返回值的程序的异步调用,关于异步调用的场景大家可以自行脑补Ajax的应用(获取返回结果的方式不同,Future是主动询问获取,Ajax是回调函数),这里不做过多说明. 在进入源码前,首先来看下Future方式相关的API: 接口Callable:有返回结果并且可能抛出异常的任务: 接口Future:表示异步