一个几十G的文件想用Python多线程读取提高处理效率,得到的结果总是不如预期。在毛帅的提醒下才发现一个进程启动的线程将共享文件句柄,A线程对文件的操作(即使是读)也将影响到B线程。如图,图片来自毛帅:
测试代码如下:
# -*- coding: UTF-8 -*- def threadFunc1(demo, threadnum, startlinenum, deallinenum): # 行数计数器 line = 0 # skip若干行 while line < startlinenum: lineStr = demo.readline() line += 1 # deal指定行数 while line < deallinenum: lineStr = demo.readline() line += 1 print "Thread-%s:%s" % (threadnum, lineStr) def threadFunc2(demo, threadnum, startlinenum, deallinenum): linenum = 1 for line in demo: if linenum >= startlinenum and linenum < (startlinenum+deallinenum): print "Thread-%s:%s" % (threadnum, line) elif (linenum >= (startlinenum+deallinenum)): break; linenum = linenum+1 import threading if __name__ == "__main__": # 初始化demo文件 fileLoc = ‘demo.txt‘ demo = open(fileLoc,‘w‘) for i in range(100000): demo.write("Line:"+str(i)+‘\n‘) demo.close() # 预读文件总行数 demo = open(fileLoc,‘r‘) filetotalnum = 0 for line in demo: filetotalnum = filetotalnum + 1 demo.close() # 设定线程数 TOTAL_THREAD = 6 # 在主进程总打开文件,将文件句柄传至线程启动的函数中 demo = open(fileLoc,‘r‘) for threadnum in range(TOTAL_THREAD): # 每个线程处理的行数 deallinenum = filetotalnum / (TOTAL_THREAD-1) # 最后一个线程处理剩余部分 leftnum = filetotalnum % (TOTAL_THREAD-1) if(threadnum != (TOTAL_THREAD-1)): # 实例化线程 t = threading.Thread(target=threadFunc2,args=(demo, threadnum, threadnum*deallinenum, threadnum*deallinenum+deallinenum)) # 启动 t.start() # 等待线程结束后主进程退出 t.join() else : t = threading.Thread(target=threadFunc2,args=(demo, threadnum, threadnum*deallinenum, threadnum*deallinenum+leftnum)) t.start() t.join()
threadFunc2函数中利用句柄参数读取文件,但结果是有些线程有正常输出,其它一些线程则无输出。目测原因是线程切换的随机性影响了本来应正常进行读取的线程(例如一些线程正好定位到属于其处理范围的行时发生线程切换,而其它线程获得时间片后在这个位置基础上继续处理)。
threadFunc1使用readLine进行读取,但本质同threadFunc2一样。
如果传给threadFuncx的并不是在主线程中打开的文件句柄而是一个文件路径,并在threadFuncx内部打开文件,结果表现为各个线程都有输出,但是输出的内容并非分配给其的文件片段的内容,有干涉。
细节上未进行深入研究,总之不加锁的情况下,python启动多线程读取文件会得到非预期结果。
至于最后的解决方案,是利用awk对文件进行“模数切割”,Mod的不同余数分别对应一个处理线程。
targetFile=xxx threadNum=5 # 设定五个线程 for((i=0; i<threadNum; i++)) do awk ‘NR%n==t {print $0}‘ n=$threadNum t=$i $targetFile | python doTask.py & done
时间: 2024-10-13 02:05:44