Python 多线程同步队列模型
我面临的问题是有个非常慢的处理逻辑(比如分词、句法),有大量的语料,想用多线程来处理。
这一个过程可以抽象成一个叫“同步队列”的模型。 具体来讲,有一个生产者(Dispatcher)一方面从语料中读入句子,并且存入队列中,一方面看有没有空闲的消费者(Segmentor),如果有,就把句子从队列中弹出并交给这个空闲的消费者处理。 然后消费者把处理完成的结果交给生产者输出,生产者要保证输出与输入顺序一致。
消费者是典型的threading,它需要看见生成者的队列,从而从队列中拿一些数据。
对于生产者,python中有一个叫Queue的module,实现了FIFO的同步队列。 但它只能保证输入与交付消费者的顺序的有序,但不能保障生产者在输出时有序,所以需要一个buffer来保存输出顺序。 程序的模型大概是这样的。有一个master(),用来分发任务。有N个多线程的slave用来处理任务。
具体程序如下:
#!/usr/bin/env python
# real 3m0.263s
# user 0m0.016s
# sys 0m0.012s
from time import sleep
from random import random
from Queue import Queue
from threading import Thread, Lock
class Segmentor(Thread):
def __init__(self, dispatcher):
Thread.__init__(self)
self.d = dispatcher
def run(self):
while True:
idx, item = self.d.get()
# segment section
sleep(random() * 5)
# output section
d.output( idx, item )
self.d.task_done()
class Dispatcher(Queue):
def __init__(self):
Queue.__init__(self)
self.idx = 0
self.box = {}
self.lock = Lock()
def output(self, idx, item):
self.lock.acquire()
if idx > self.idx:
self.box[idx] = item
elif idx == self.idx:
self._output(item)
self.idx += 1
while self.idx in self.box:
item = self.box[self.idx]
self._output(item)
self.idx += 1
self.lock.release()
def _output(self, item):
print item
if __name__=="__main__":
d = Dispatcher()
for i in xrange(4):
t = Segmentor(d)
t.daemon = True
t.start()
num = 0
for line in open("data", "r"):
d.put( (num, line.strip()) )
num += 1
d.join()
在300句的条件下,单线程的处理速度约为750s=12m,开4个线程后3m可以处理完成,并且输出是有序的。
其他语言应该可以仿照这个方式编写程序,对于没有同步队列的语言,实现时可以参考这个http://hg.python.org/cpython/file/2.7/Lib/Queue.py
时间: 2024-10-12 08:10:41