Python 多线程同步队列模型

Python 多线程同步队列模型


我面临的问题是有个非常慢的处理逻辑(比如分词、句法),有大量的语料,想用多线程来处理。

这一个过程可以抽象成一个叫“同步队列”的模型。 具体来讲,有一个生产者(Dispatcher)一方面从语料中读入句子,并且存入队列中,一方面看有没有空闲的消费者(Segmentor),如果有,就把句子从队列中弹出并交给这个空闲的消费者处理。 然后消费者把处理完成的结果交给生产者输出,生产者要保证输出与输入顺序一致。

消费者是典型的threading,它需要看见生成者的队列,从而从队列中拿一些数据。

对于生产者,python中有一个叫Queue的module,实现了FIFO的同步队列。 但它只能保证输入与交付消费者的顺序的有序,但不能保障生产者在输出时有序,所以需要一个buffer来保存输出顺序。 程序的模型大概是这样的。有一个master(),用来分发任务。有N个多线程的slave用来处理任务。

具体程序如下:

#!/usr/bin/env python
# real    3m0.263s
# user    0m0.016s
# sys     0m0.012s

from time import sleep
from random import random
from Queue import Queue
from threading import Thread, Lock

class Segmentor(Thread):
    def __init__(self, dispatcher):
        Thread.__init__(self)
        self.d = dispatcher

    def run(self):
        while True:
            idx, item = self.d.get()
            # segment section
            sleep(random() * 5)
            # output section
            d.output( idx, item )
            self.d.task_done()

class Dispatcher(Queue):
    def __init__(self):
        Queue.__init__(self)
        self.idx = 0
        self.box = {}
        self.lock = Lock()

    def output(self, idx, item):
        self.lock.acquire()
        if idx > self.idx:
            self.box[idx] = item
        elif idx == self.idx:
            self._output(item)
            self.idx += 1
            while self.idx in self.box:
                item = self.box[self.idx]
                self._output(item)
                self.idx += 1

        self.lock.release()

    def _output(self, item):
        print item

if __name__=="__main__":
    d = Dispatcher()
    for i in xrange(4):
        t = Segmentor(d)
        t.daemon = True
        t.start()

    num = 0
    for line in open("data", "r"):
        d.put( (num, line.strip()) )
        num += 1

    d.join()

在300句的条件下,单线程的处理速度约为750s=12m,开4个线程后3m可以处理完成,并且输出是有序的。

其他语言应该可以仿照这个方式编写程序,对于没有同步队列的语言,实现时可以参考这个http://hg.python.org/cpython/file/2.7/Lib/Queue.py

时间: 2024-10-12 08:10:41

Python 多线程同步队列模型的相关文章

python多线程同步实例分析

进程之间通信与线程同步是一个历久弥新的话题,对编程稍有了解应该都知道,但是细说又说不清.一方面除了工作中可能用的比较少,另一方面就是这些概念牵涉到的东西比较多,而且相对较深.网络编程,服务端编程,并发应用等都会涉及到.其开发和调试过程都不直观.由于同步通信机制的原理都是想通的,本文希通过望借助python实例来将抽象概念具体化. 阅读之前可以参考之前的一篇文章:python多线程与多进程及其区别,了解一下线程和进程的创建. python多线程同步 python中提供两个标准库thread和thr

Python多线程同步

对Python多线程实现同步机制及其遇到的一些问题.本文给出了样例代码 ,主要包括Condition,Event and Queue三种机制 1. 锁机制 threading的Lock类,用该类的acquire函数进行加锁,用realease函数进行解锁,当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态.每次只有一个线程可以获得锁.如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“同步阻塞” 在此没有给出样例后面条件 2.条件 Conti

python多线程同步机制Semaphore

#!/usr/bin/env python # -*- coding: utf-8 -*- """ Python 线程同步机制:Semaphore """ import time import threading import random # 信号量同步基于内部计数器,每调用一次acquire(),计数器减1:每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞. sema = threading.Semaph

python多线程--优先级队列(Queue)

Python的Queue模块中提供了同步的.线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue.这些队列都实现了锁原语,能够在多线程中直接使用.可以使用队列来实现线程间的同步. Queue模块中的常用方法: Queue.qsize() 返回队列的大小 Queue.empty() 如果队列为空,返回True,反之False Queue.full() 如果队列满了,返回True,反之False Queue.fu

利用python多线程和队列管理shell程序

首先来描述下环境,在机器上有很多个JAVA程序,我们在每个JAVA程序里都配置了一个启动|停止|重启的脚本 举个例子: 我们现在要同时运行这些脚本,来达到快速启动所有的JAVA程序,如果我们只用多线程的话,线程是不会返回消息给父进程,我们如何才能知道这些程序是启动成功了呢? 所以我们用到了队列来管理. """我试过gevent,但是会在command这里造成阻塞""" gevent代码如下  如果有朋友知道如何优化,请您告诉我 #!/usr/bi

python 多线程与队列

各位好,之前写了多线程,但是在实际的生产中,往往情况比较复杂,要处理一批任务(比如要处理列表中所有元素),这时候不可能创建很多的线程,线程过多反而不好,还会造成资源开销太大,这时候想到了队列. Queue队列 Queue用于建立和操作队列,常和threading类一起用来建立一个简单的线程队列. Queue.Queue(maxsize) FIFO(先进先出队列) Queue.LifoQueue(maxsize) LIFO(先进后出队列) Queue.PriorityQueue(maxsize)

python多线程同步机制condition

#!/usr/bin/env python# -*- coding: utf-8 -*- import threadingimport time def customer(cond): t = threading.currentThread() with cond: # wait()方法创建了一个名为waiter的锁,并且设置锁的状态为locked.这个waiter锁用于线程间的通讯 cond.wait() print '{}: Resource is available to consumer

python多线程同步机制Lock

#!/usr/bin/env python# -*- coding: utf-8 -*- import threadingimport time value = 0lock = threading.Lock() def add(): global value with lock: new_value = value + 1 time.sleep(0.001) value = new_value if __name__ == '__main__': threads = [] for i in ra

python多线程多队列(BeautifulSoup网络爬虫)

程序大概内容如下: 程序中设置两个队列分别为queue负责存放网址,out_queue负责存放网页的源代码. ThreadUrl线程负责将队列queue中网址的源代码urlopen,存放到out_queue队列中. DatamineThread线程负责使用BeautifulSoup模块从out_queue网页的源代码中提取出想要的内容并输出. 这只是一个基本的框架,可以根据需求继续扩展. 程序中有很详细的注释,如有有问题跪求指正啊. import Queue import threading i