基础入门_Python-模块和包.Gevent异步/状态获取/超时设置/猴子补丁?

简单介绍:

说明: Gevent是一个基于libev的并发库,为各种并发和网络相关的任务提供了整洁的API

快速安装:

pip install --upgrade gevent

主要模式:

说明: Greenlet以C扩展模块形式接入PY轻量级协程,它们运行于主进程内部,被协作式的调度,且不同于multiprocessing和threading等真正的并行执行,它在同一时刻只能有一个协程在运行

公有方法
gevent.spawn(cls, *args, **kwargs) 创建一个Greenlet对象,其实调用的是Greenlet.spawn(需要from gevent import Greenlet),返回greenlet对象
gevent.joinall(greenlets, timeout=None, raise_error=False, count=None) 等待所有greenlets全部执行完毕, greenlets为序列,timeout为超时计时器对象,返回执行完毕未出错的的greenlet序列
greenlet
g.start() 启动协程
g.join() 等待此协程执行完毕后

1. 使用基本封装类初始化Greenlets


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import gevent
# 说明: 导入其它模块
def task001():
    print ‘I‘
    gevent.sleep(0)
    print ‘#=> switch to task002 run.‘
    print ‘You‘
def task002():
    print ‘LOVE‘
    gevent.sleep(0)
    print ‘#=> switch to task001 run.‘
if __name__ == ‘__main__‘:
    gevent.joinall([
        gevent.spawn(task001),
        gevent.spawn(task002)
    ])


2. 继承Greenlets基类重载_run方法


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import gevent
from gevent import Greenlet
# 说明: 导入其它模块
class Gevent(Greenlet):
    def __init__(self, msg, *args, **kwargs):
        super(Gevent, self).__init__(*args, **kwargs)
        self.msg = msg
    # 重载run方法,join后会自动调用此方法执行
    def _run(self):
        print self.msg
        gevent.sleep(0)
if __name__ == ‘__main__‘:
    for _ in (‘I‘, ‘LOVE‘, ‘YOU‘):
        g = Gevent(_)
        g.start()
        g.join()


同步异步:

说明: 并发的核心在于可拆分成小任务的大任务被分成子任务然后被上下文切换调度,在Gevent中,上下文切换调度是通过yielding来完成的,默认自动切换上下文,当然也可手动gevent.sleep(0)切换上下文


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import gevent
# 说明: 导入其它模块
def task001():
    print ‘I‘
    gevent.sleep(0)
    print ‘#=> switch to task002 run.‘
    print ‘You‘
def task002():
    print ‘LOVE‘
    gevent.sleep(0)
    print ‘#=> switch to task001 run.‘
if __name__ == ‘__main__‘:
    gevent.joinall([
        gevent.spawn(task001),
        gevent.spawn(task002)
    ])


说明: 如上实例手动调用gevent.sleep(0)来切换上下文,将生成的调度任务通过gevent.joinall加入调度队列,使得只有执行完调度队列的任务后才能继续往下走,一旦调度任务出现阻塞则立即切换到下一个上下文执行


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import socket
import gevent
from gevent import select
# 说明: 导入其它模块
def handler(server, port):
    r_list, w_list, e_list = [server], [], []
    while True:
        r, w, e = select.select(r_list, w_list, e_list)
        for sock in r:
            if sock is server:
                c_sock, c_addr = sock.accept()
                r_list.append(c_sock)
                print ‘found notice: client #%s connectd to port #%s‘ % (c_addr, port)
            else:
                data = sock.recv(1024)
                data = data.strip()
                if data in (‘quit‘, ‘exit‘):
                    sock.close()
                    r_list.remove(sock)
                    print ‘found notice: client #%s disconnectd‘ % (sock.getpeername(),)
                else:
                    print ‘found notice: client #%s send data #%s to port #%s‘ % (sock.getpeername(), data, port)
if __name__ == ‘__main__‘:
    slist = []
    ports = [8001, 8002]
    for p in ports:
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.bind((‘0.0.0.0‘, p))
        server.listen(10000)
        slist.append(gevent.spawn(handler, server, p))
    gevent.joinall(slist)


说明: Gevent使得函数协作式调度,对于我们隐藏的所有实现细节,保证网络库在可能的时候,隐式交出上下文执行权,上面实例中利用协程+多路复用I/O SELECT实现同时绑定多个端口处理数据的ECHO SERVER,我并没有显式的调用gevent.sleep(0),但是依然执行着上下文切换,这说明当我们在受限于网络或IO的函数中调用gevent时会隐式的切换上下文来实现协作式调度.


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import time
import pprint
import multiprocessing
from gevent.pool import Pool
# 说明: 导入其它模块
def echo_data(data):
    time.sleep(0.001)
    return data
if __name__ == ‘__main__‘:
    # 多进程 - 进程池
    res = []
    p = multiprocessing.Pool(10)
    for _ in xrange(10):
        res.append(list(p.imap_unordered(echo_data, [_ for _ in xrange(10)])))
    p.close()
    p.join()
    pprint.pprint(res)
    print ‘################################‘
    # 协程式 - 协程池
    res = []
    p = Pool(20)
    for _ in xrange(10):
        res.append(list(p.imap_unordered(echo_data, [_ for _ in xrange(10)])))
    pprint.pprint(res)


说明: Gevent在相同调用相同输入相同SLEEP时产生的结果总是相同的,相对于多进程/多线程的即使在相同调用相同输入相同SLEEP下结果不总相同

注意: Gevent在处理不确定阻塞时间调用时,你的程序可能会出现不确定性,这是由于并发通病-竞争条件导致,最好的解决办法是尽量避免所有的全局状态.避免竞争访问或是修改.

状态获取:

说明: Greenlet可能由于不同的原因运行失败,但其并不会抛出异常,而是用状态变量/方法跟踪线程内部的状态

状态相关
g.started 协程是否已经启动
g.ready() 协程是否已经停止
g.successful() 同上,但是没有抛异常
g.value 协程返回的值
g.exception 协程内抛出的未捕捉的异常
g.get(block=True, timeout=None) 获取greenlet的返回值或重新抛出运行中异常,timeout设置计时器对象
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import gevent
# 说明: 导入其它模块
# 谈情
def love():
    print ‘ok‘
# 做爱
def makelove():
    raise Exception(‘fuck‘)
if __name__ == ‘__main__‘:
    greenlets = [gevent.spawn(love), gevent.spawn(makelove)]
    gevent.joinall(greenlets)
    for g in greenlets:
        print ‘started? %s‘ % g.started
        print ‘ready? %s‘ % g.ready()
        print ‘successful? %s‘ % g.successful()
        print ‘exception? %s‘ % g.exception


超时设置:

说明: 超时是一种对代码块儿或Greenlet的运行时间约束

公有方法
gevent.Timeout(seconds, exception) 创建一个计时器对象
Timeout
t.start() 启动一个计时器对象
t.cancel() 取消一个计时器对象
t.start_new(timeout=None, exception=None, ref=True) 创建一个新的计时器对象,timeout为超时时间,默认单位为秒
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import gevent
# 说明: 导入其它模块
def run_forever(interval):
    while True:
        gevent.sleep(interval)
if __name__ == ‘__main__‘:
    timer = gevent.Timeout(1).start()
    g = gevent.spawn(run_forever, 1000000)
    # 执行超时
    try:
        g.join(timeout=timer)
    except gevent.Timeout, e:
        print ‘found error: thread 1 timeout.‘
    # 获取超时
    timer = gevent.Timeout.start_new(1)
    g = gevent.spawn(run_forever, 1000000)
    try:
        g.get(timeout=timer)
    except gevent.Timeout, e:
        print ‘found error: thread 2 timeout.‘
    # 执行超时
    try:
        gevent.with_timeout(1, run_forever, 1000000)
    except gevent.Timeout, e:
        print ‘found error: thread 3 timeout.‘


猴子补丁:

说明: PY的默认运行环境允许我们运行中修改大部分对象,包括模块/类/函数等,而猴子补丁可以修改PY标准库里面大部分的阻塞式系统调用,实在不知哪些库可打补丁可gevent.monkey.patch_all()即可.

patch_all/patch_builtins/patch_dns/patch_item/patch_module/patch_os/patch_select/patch_signal/patch_socket/patch_ssl/patch_subprocess/patch_sys/patch_thread/patch_time


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import select
# 说明: 导入其它模块
if __name__ == ‘__main__‘:
    print ‘#select before path %s‘ % (select.select,)
    from gevent import monkey
    monkey.patch_all()
    print ‘#select after path %s‘ % (select.select,)


本文出自 “ζ自动化运维开发之路ζ” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1862085

时间: 2024-10-24 09:50:31

基础入门_Python-模块和包.Gevent异步/状态获取/超时设置/猴子补丁?的相关文章

基础入门_Python-模块和包.Gevent异步服务类实现多姿势WEB实时展示?

内置服务: 1. gevent.server.StreamServer类, 常用于创建异步TCP服务器 #!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import time import gevent fr

基础入门_Python-模块和包.Gevent事件/队列/组/池/信号量/子进程?

常用结构: 1.Event类,事件主要用于Greenlet之间的异步通信 e = gevent.event.Event() -> Event 说明: 创建一个信号对象 e.set() -> None 说明: 设置标志位 e.clear() -> None 说明: 清除标志位 e.wait() -> None 说明: 阻塞直至标志位被设置 #!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Auth

基础入门_Python-模块和包.深入Celery之常用架构/方案选型/必知必会?

简单介绍: 说明: 此模块是一个专注于分布式消息传递的异步任务队列,所谓任务就是消息,消息中的有效载荷中包含要执行的任务需要的全部数据 几大特性: 1. Celery易于使用和维护,且不需要配置文件,默认配置启动时自动写入消息代理. 2. Celery高可用,连接丢失或失败时客户端或消费者会自动重试,并且可通过消息代理的双主/主从模式来提高高可用性 3. Celery快速,单个进程每分钟可处理百万任务,且优化后可保持往返延迟在亚毫秒级别 4. Celery灵活,几乎所有部分都支持扩展或单独使用,

2015/9/15 Python基础(12):模块和包

模块是用来组织 Python 代码的方法,而包则是用来组织模块的. 当代码量很大时,我们一般会把代码分成几个有组织的代码段,然后每个代码段之间有一定的联系.代码单之间是共享的,所以Python允许调入一个模块,允许使用其他模块的属性利用之前的工作成果,实现代码重用.那些自我包含并且有组织的代码片段就是模块(module),将其他模块中属性附加到你的模块中的操作较导入(import) 模块是逻辑上的说法,而它们在物理层是一个个独立的文件,模块的文件名就是模块的名字加拓展名.py.与其他可以导入类的

基础入门_Python-模块和包.运维开发中日志模块logging的最佳实践?

简单介绍: 说明: 此模块儿提供了文件,HTTP GET/POST,SMTP,SOCKET等方式实现日志记录,甚至可以自动实现具体的日志记录方式 快速安装: pip install --upgrade logging 处理流程: 日志级别: 属性名称 属性说明 logging.NOTSET 默认为0 logging.DEBUG 调试为10 logging.INFO 一般为20 logging.WARN 警告为30 logging.ERROR 错误为40 logging.CRITICAL 严重为5

基础入门_Python-模块和包.深入Celery之应用配置/独立模块配置实践?

配置简介: 说明: Celery的配置文件非常强大,支持在应用上设置,也可以使用一个独立的配置模块,具体需要调整的默认选项可通过http://docs.jinkan.org/docs/celery/configuration.html#configuration 获取. # 方式一 : 直接在应用上设置,通过app.conf.update可一次性设置多个选项,常用于小型项目 #!/usr/bin/env python # -*- coding: utf-8 -*- # @Date    : 20

基础入门_Python-模块和包.运维开发中inspect自省模块的最佳实践?

简单介绍: 说明: 此模块提供了一系列自省函数,可获取模块/类/方法/函数/traceback/帧对象/代码对象的信息 快速安装: 内置模块 测试相关: inspect.ismodule(object) -> True/False 说明: 判断object是否为模块 inspect.isclass(object) -> True/False 说明: 判断object是否为类 inspect.ismethod(object) -> True/False 说明: 判断object是否为方法

基础入门_Python-模块和包.运维开发中内建模块getopt的最佳实践?

简单介绍: 此模块提供命令行选项解析,目前支持短格式和长格式选项 快速安装: 说明:  内建模块无需安装 解析方法: getopt(args, shortopts, longopts = []) -> (opts, args) 说明: args为要解析的参数序列,常为sys.argv[1:],shortopts为单字符选项定义串,如果某个选项需要一个参数,响应字母后面必须有一个冒号,longopts为长格式的选项名序列,可以包含多个字符,序列元素必须包含--前缀,如果此长选项需要参数则其名应包含

基础入门_Python-模块和包.深入Celery之节点管理/任务调度/任务追踪?

任务管理: 说明: 如上为运行任务后的标准输出,transport为消息代理,concurrency为默认进程池进程数,当所有子进程处于忙碌状态时必须等待空闲子进程处理,如果是IO密集型可尝试使用Eventlet/Gevent协程,具体可参考http://docs.jinkan.org/docs/celery/userguide/concurrency/index.html#concurrency,result为结果存储,queue为所有的队列以及交换机信息列表 参数 含义 %p 节点全名,如[