基础入门_Python-模块和包.Gevent事件/队列/组/池/信号量/子进程?

常用结构:

1.Event类,事件主要用于Greenlet之间的异步通信

e = gevent.event.Event() -> Event

说明: 创建一个信号对象

e.set() -> None

说明: 设置标志位

e.clear() -> None

说明: 清除标志位

e.wait() -> None

说明: 阻塞直至标志位被设置


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import gevent
from gevent.event import Event
# 说明: 导入其它模块
def setter(e):
    print ‘I‘
    gevent.sleep(3)
    print ‘LOVE‘
    e.set()
def waiter(e):
    e.wait()
    print ‘You‘
if __name__ == ‘__main__‘:
    e = Event()
    gevent.joinall([
        gevent.spawn(setter, e),
        gevent.spawn(waiter, e),
        gevent.spawn(waiter, e),
        gevent.spawn(waiter, e),
        gevent.spawn(waiter, e)
    ])


a = gevent.event.AsyncResult() -> AsyncResult

说明: 创建一个扩展可携带数据的信号对象

a.set(value=None) -> None

说明: 设置带数据的标志位

a.e.clear() -> None

说明: 清除带数据的标志位

a.get(block=True, timeout=None) -> obj

说明: 阻塞直至标志位被设置并返回value值,可设置timeout到点抛出Timeout异常


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import gevent
from gevent.event import AsyncResult
# 说明: 导入其它模块
def setter(a):
    print ‘I‘
    gevent.sleep(3)
    print ‘LOVE‘
    a.set(‘I LOVE‘)
def waiter(a):
    value = a.get()
    if e.successful:
        print ‘found notice: recv data from setter %s.‘ % (value,)
    print ‘You‘
if __name__ == ‘__main__‘:
    a = AsyncResult()
    gevent.joinall([
        gevent.spawn(setter, a),
        gevent.spawn(waiter, a),
        gevent.spawn(waiter, a),
        gevent.spawn(waiter, a),
        gevent.spawn(waiter, a)
    ])


2. Queue类,常用用于Greenlet之间的异步共享

q = gevent.queue.Queue(maxsize=None, items=None) -> Queue

说明: 创建一个指定大小包含指定items的队列对象

q.empty() -> Boolean

说明: 队列是否为空

q.full() -> Boolean

说明: 队列是否已满,如果初始化时maxsize为None时队列永远不会满,除非内存耗尽

q.get(block=True, timeout=None) -> obj

说明: 队列尾部弹出并返回末尾元素,block为True如果队列为空会一直等待,否则会抛出gevent.queue.Empty异常

q.get_nowait() -> obj

说明: 同q.get(block=False),如果队列为空直接返回gevent.queue.Empty

q.put(item, block=True, timeout=None) -> None

说明: 队列头部插入item,如果block为True则等待队列有空间时再插入,如果超出timeout则抛出Timeout异常,否则直接抛出gevent.queue.Full异常

q.put_nowait(self, item) -> None

说明: 同q.put(block=False),如果队列满直接返回gevent.queue.Full

q.qsize() -> int

说明: 返回队列的元素数


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import gevent
from gevent.queue import Queue
# 说明: 导入其它模块
def boss(q):
    for _ in xrange(25):
        q.put_nowait(_)
def work(name, q):
    while not q.empty():
        task = q.get()
        print ‘found notice: worker %s got task-%s‘ % (name, task)
        gevent.sleep(0)
if __name__ == ‘__main__‘:
    q = Queue()
    g = gevent.spawn(boss, q)
    g.join()
    workers = []
    for _ in (‘limanman‘, ‘liuzhen‘, ‘zhenbao‘):
        workers.append(gevent.spawn(work, _, q))
    gevent.joinall(workers)


说明: 如上演示了一个老板平均分配任务给指定数目的员工的例子,为了实现每个员工都能拿到任务,我们手动的gevent.sleep(0)互相切换(关键是调用函数中没有确定性的阻塞函数)实现.

3. Group类,常用于不限制数量的管理异步任务的分组且可搜集运行结果

g = gevent.pool.Group(*args) -> Group

说明: 创建一个组对象,其实就是一个不限greenlets数量的pool,可以随时添加/删除/关闭greenlet对象

g.add(greenlet) -> None

说明: 向组中添加一个greenlet对象

g.discard(greenlet) -> None

说明: 从组中删除一个greenlet对象

g.join(timeout=None, raise_error=False) -> None

说明: 等待组中所有的greenlets都执行完毕再进行下一步

g.kill(exception=<class ‘greenlet.GreenletExit‘>, block=True, timeout=None) -> None

说明: 关闭组内所有运行中的greenlet对象

g.killone(self, greenlet, exception=<class ‘greenlet.GreenletExit‘>, block=True, timeout=None) -> None

说明: 关闭组内指定运行的greenlet对象

g.apply(self, func, args=None, kwds=None) -> obj

说明: 同apply单次调用func并返回运行结果

g.apply_async(self, func, args=None, kwds=None, callback=None) -> greenlet

说明: 同上但是返回的不是直接的运行结果而是异步对象greenlet,需要再次调用其.get()方法才能获取最终结果

g.imap(self, func, *iterables, **kwargs) -> IMap

g.imap_unordered(self, func, *iterables, **kwargs) -> IMapUnordered

g.map(self, func, iterable) -> list

g.map_async(self, func, iterable, callback=None) -> GreenletGroup

说明: 为了简化协程的使用,调用map/imap实现任务快速分组,基本上返回的对象都需要再次调用get()方法获取最终运行结果,经测试发现调用函数传递多个参数时通过map/imap类方法传参时有bug,已反馈GitHub.


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import gevent
import itertools
from gevent.pool import Group
from gevent.queue import Queue
# 说明: 导入其它模块
def add_worker(name, q):
    while not q.empty():
        task = q.get()
        print ‘found notice: worker(%s:%s) got task(%s)‘ % (name, gevent.getcurrent(), task)
        gevent.sleep(0)
if __name__ == ‘__main__‘:
    q = Queue()
    g = Group()
    # 添加任务
    for _ in xrange(100):
        q.put_nowait(_)
    # 通用方法
    for worker in (‘langlang‘, ‘fengfeng‘, ‘shuishui‘):
        g.add(gevent.spawn(add_worker, worker, q))
    g.join()


4. Pool类,常用于限制数量的管理异步任务,在受限于网络和IO的任务时候比较有优势

p =  gevent.pool.Pool(size=None, greenlet_class=None) -> Pool

说明: 创建一个协程池对象,可以指定其实协程数

p.add(greenlet) -> None

说明: 尝试向协程池中添加greenlet对象,阻塞直到有池中有协程完毕有剩余空间

p.free_count() -> int

说明: 返回池中剩余的空间,也就是可以add的greenlet数

p.full() -> boolean

说明: 返回池是否已满

p.start(greenlet) -> None

说明: 启动未启动的greenlet且将其加入pool监控

p.imap(self, func, *iterables, **kwargs) -> IMap

p.imap_unordered(self, func, *iterables, **kwargs) -> IMapUnordered

p.map(self, func, iterable) -> list

p.map_async(self, func, iterable, callback=None) -> GreenletGroup

说明: 为了简化协程的使用,调用map/imap实现任务快速分组,基本上返回的对象都需要再次调用get()方法获取最终运行结果,经测试发现调用函数传递多个参数时通过map/imap类方法传参时有bug,已反馈GitHub.


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import gevent
from gevent.pool import Pool
# 说明: 导入其它模块
def pool_size(p):
    print ‘found notice: current pool size: %d free %d‘ % (len(p), p.free_count())
if __name__ == ‘__main__‘:
    p = Pool(2)
    p.map(pool_size, [p]*3)


5. BoundedSemaphore类,常用于限制资源同时被多少个协程访问,通过限制同时发放锁的数量来限制资源访问

b = gevent.lock.BoundedSemaphore(n) -> BoundedSemaphore

说明: 创建一个信号量对象,限制资源同时只能被n个协程访问,其实就是每次分发n把锁,只有有人释放了锁才会重新追加分发锁


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import gevent
from gevent.pool import Pool
from gevent.lock import BoundedSemaphore
# 说明: 导入其它模块
b = BoundedSemaphore(2)
def work(name):
    with b:
        print ‘found notice: worker %s acquire sem lock.‘ % (name,)
        gevent.sleep(0)
    print ‘found notice: worker %s release sem lock.‘ % (name,)
if __name__ == ‘__main__‘:
    p = Pool()
    p.map(work, [‘李满满‘, ‘刘珍珍‘, ‘罗诗瑶‘])


局部变量:

说明:  Gevent内部实现以greenlet的getcurrent()为键,在一个私有命名空间寻址的全局查找,使得local对象可以在不同的greenlet对象中存在且相互隔离,很多继承了gevent的web框架将HTTP会话对象以局部变量的形式存储在gevent内


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import time
import gevent
from gevent.local import local
# 说明: 导入其它模块
def session_manager(l, user):
    l.name = user
    l.timestamp = time.time()
    print ‘found notice: user %s login in and online.‘ % (user,)
    gevent.sleep(5)
    l.name = user
    l.timestamp = time.time()
    print ‘found notice: user %s session expire outs.‘ % (user,)
    print ‘‘‘
    name: %s
    status: offline
    timestamp: %s
    ‘‘‘ % (l.name, l.timestamp)
if __name__ == ‘__main__‘:
    l = local()
    gevent.joinall([
        gevent.spawn(session_manager, l, ‘李满满‘),
        gevent.spawn(session_manager, l, ‘刘珍珍‘),
    ])


进程相关:

1. gevent.subprocess类,常用于作为subprocess模块增强版使用,支持异步协作式等待子进程


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# 说明: 导入公共模块
import time
import gevent
from gevent.subprocess import PIPE, Popen
# 说明: 导入其它模块
def task001():
    while True:
        print time.time()
        gevent.sleep(1)
if __name__ == ‘__main__‘:
    task = gevent.spawn(task001)
    task.start()
    # 中间插入子进程
    p = Popen([‘ls;sleep 5‘], shell=True, stdout=PIPE, stderr=PIPE)
    out, err = p.communicate()
    print out
    # 等待协程的结束
    task.join()


说明: 如上实例简单演示了支持异步协作子进程gevent.subprocess,首先task.start()启动协程调用但不会等待调用结束然后执行Popen()子进程,默认的Popen会阻塞主进程,但是在这里我们可以看到程序并没有阻塞而是继续执行task001,由于我们在最后task.join()会等待task001结束,而task001是一个死循环,所以一旦Popen子进程阻塞,立马会切换到task001执行,所以程序并不会阻塞

时间: 2024-10-07 00:41:13

基础入门_Python-模块和包.Gevent事件/队列/组/池/信号量/子进程?的相关文章

基础入门_Python-模块和包.Gevent异步服务类实现多姿势WEB实时展示?

内置服务: 1. gevent.server.StreamServer类, 常用于创建异步TCP服务器 #!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import time import gevent fr

基础入门_Python-模块和包.Gevent异步/状态获取/超时设置/猴子补丁?

简单介绍: 说明: Gevent是一个基于libev的并发库,为各种并发和网络相关的任务提供了整洁的API 快速安装: pip install --upgrade gevent 主要模式: 说明: Greenlet以C扩展模块形式接入PY轻量级协程,它们运行于主进程内部,被协作式的调度,且不同于multiprocessing和threading等真正的并行执行,它在同一时刻只能有一个协程在运行 公有方法 gevent.spawn(cls, *args, **kwargs) 创建一个Greenle

基础入门_Python-模块和包.深入Celery之常用架构/方案选型/必知必会?

简单介绍: 说明: 此模块是一个专注于分布式消息传递的异步任务队列,所谓任务就是消息,消息中的有效载荷中包含要执行的任务需要的全部数据 几大特性: 1. Celery易于使用和维护,且不需要配置文件,默认配置启动时自动写入消息代理. 2. Celery高可用,连接丢失或失败时客户端或消费者会自动重试,并且可通过消息代理的双主/主从模式来提高高可用性 3. Celery快速,单个进程每分钟可处理百万任务,且优化后可保持往返延迟在亚毫秒级别 4. Celery灵活,几乎所有部分都支持扩展或单独使用,

2015/9/15 Python基础(12):模块和包

模块是用来组织 Python 代码的方法,而包则是用来组织模块的. 当代码量很大时,我们一般会把代码分成几个有组织的代码段,然后每个代码段之间有一定的联系.代码单之间是共享的,所以Python允许调入一个模块,允许使用其他模块的属性利用之前的工作成果,实现代码重用.那些自我包含并且有组织的代码片段就是模块(module),将其他模块中属性附加到你的模块中的操作较导入(import) 模块是逻辑上的说法,而它们在物理层是一个个独立的文件,模块的文件名就是模块的名字加拓展名.py.与其他可以导入类的

基础入门_Python-模块和包.运维开发中日志模块logging的最佳实践?

简单介绍: 说明: 此模块儿提供了文件,HTTP GET/POST,SMTP,SOCKET等方式实现日志记录,甚至可以自动实现具体的日志记录方式 快速安装: pip install --upgrade logging 处理流程: 日志级别: 属性名称 属性说明 logging.NOTSET 默认为0 logging.DEBUG 调试为10 logging.INFO 一般为20 logging.WARN 警告为30 logging.ERROR 错误为40 logging.CRITICAL 严重为5

基础入门_Python-模块和包.运维开发中watchdog事件监视的最佳实践?

简单介绍: 说明:  此模块是一个跨平台的PY库和SHELL工具,可以监视文件系统事件(增加/删除/修改) 快速安装: pip install --upgrade watchdog 日志记录: event_handler = LoggingEventHandler() -> event_handler 说明: 创建一个日志处理句柄,其实LoggingEventHandler是继承自FileSystemEventHandler类,只是重写了增删查改的回调函数,直接调用logging模块写到对应lo

基础入门_Python-模块和包.深入Celery之应用配置/独立模块配置实践?

配置简介: 说明: Celery的配置文件非常强大,支持在应用上设置,也可以使用一个独立的配置模块,具体需要调整的默认选项可通过http://docs.jinkan.org/docs/celery/configuration.html#configuration 获取. # 方式一 : 直接在应用上设置,通过app.conf.update可一次性设置多个选项,常用于小型项目 #!/usr/bin/env python # -*- coding: utf-8 -*- # @Date    : 20

基础入门_Python-模块和包.运维开发中inspect自省模块的最佳实践?

简单介绍: 说明: 此模块提供了一系列自省函数,可获取模块/类/方法/函数/traceback/帧对象/代码对象的信息 快速安装: 内置模块 测试相关: inspect.ismodule(object) -> True/False 说明: 判断object是否为模块 inspect.isclass(object) -> True/False 说明: 判断object是否为类 inspect.ismethod(object) -> True/False 说明: 判断object是否为方法

基础入门_Python-模块和包.运维开发中内建模块getopt的最佳实践?

简单介绍: 此模块提供命令行选项解析,目前支持短格式和长格式选项 快速安装: 说明:  内建模块无需安装 解析方法: getopt(args, shortopts, longopts = []) -> (opts, args) 说明: args为要解析的参数序列,常为sys.argv[1:],shortopts为单字符选项定义串,如果某个选项需要一个参数,响应字母后面必须有一个冒号,longopts为长格式的选项名序列,可以包含多个字符,序列元素必须包含--前缀,如果此长选项需要参数则其名应包含