首先介绍下python异步执行,python有两种方法编写异步代码:
1、coroutines协程(也称为greenlets)
2、回调
gevent是greenlets的一种实现方式,可以通过pip方便的安装gevent模块。gevent执行方式实际上是代码块的交替执行,具体的可以看下这篇blog,我就不重复造轮子了。
值得一提的是,gevent封装了很多接口,其中一个是著名的猴子补丁monkey,
from gevent import monkey
monkey.patch_all()
这两行就可以在代码中改变其余包的行为,让其从同步阻塞方式变为异步非阻塞方式,非常的神奇。
我利用gevent的异步非阻塞方式写了一个手机号段蜘蛛爬虫,目前一直在服务器稳定的运行,代码详见我的github,内有福利。脚本用法:python numspiderlist.py -s [String, e.g:138,137,1393134,1700001-1709999,1450000-1459999]
1 #!/usr/bin/python 2 #-*- coding:utf-8 -*- 3 """手机号段爬虫:接收用户命令参数精简版 for sqlitedb 4 @version:1.0 5 @author:Kenny{Kenny.F<mailto:[email protected]>} 6 @since:2014/05/23 7 """ 8 import sys 9 reload(sys) 10 sys.setdefaultencoding(‘utf8‘) 11 import gevent #gevent协程包 12 import multiprocessing #多进程 13 from multiprocessing import Manager 14 import urllib2 15 from urllib import unquote,quote 16 import socket 17 socket.setdefaulttimeout(20) 18 import cookielib 19 import random 20 import simplejson as json 21 import os 22 import time 23 import sqlite3 #sqlite数据库操作 24 from functools import wraps #方法工具 25 from strtodecode import strtodecode #编码检测转换 26 27 28 manager = Manager() #多进程共享队列 29 lacknumlist = manager.list() 30 31 32 def multi_run_wrapper(func): #多进程map包裹参数 33 @wraps(func) 34 def newF(args): 35 if isinstance(args,list): 36 return func(*args) 37 elif isinstance(args,tuple): 38 return func(*args) 39 else: 40 return func(args) 41 return newF 42 43 44 def getRanIp(): #得到随机IP 45 #123.125.40.255 - 123.127.134.56 北京联通154938条 46 return "123.{0}.{1}.{2}".format(random.randint(125,127), random.randint(40,134), random.randint(56,255)) 47 48 49 def _cookiePool(url): #查看cookie池 50 cookie = cookielib.CookieJar() 51 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie)) 52 opener.open(url) 53 for item in cookie: 54 print ‘Name = ‘+item.name 55 print ‘Value = ‘+item.value 56 57 58 def catchPage(url=‘‘): #封装的网页页面获取 59 if not url: 60 return False 61 62 with open("./logs/outprint.txt","a") as f: 63 f.write(url+"\n") 64 65 try: 66 headers = { 67 ‘User-Agent‘:‘Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.6) Gecko/20091201 Firefox/3.5.6‘, 68 ‘Referer‘:‘http://www.baidu.com‘, 69 "X-Forwarded-For":getRanIp() 70 } 71 req = urllib2.Request( 72 url = url, 73 headers = headers 74 ) 75 76 html = ‘‘ 77 result = ‘‘ 78 try: 79 try: 80 gevent.Timeout 81 except: 82 result = urllib2.urlopen(req,timeout=20) 83 else: 84 with gevent.Timeout(20, False): 85 result = 
urllib2.urlopen(req) 86 except urllib2.HTTPError, e: 87 #For Ptyhon 2.6 88 try: 89 socket.timeout 90 except: 91 print ‘The server couldn\‘t fulfill the request.‘ 92 print "url:{0} Httperrorcode:{1}".format(url, e.code) 93 else: 94 if isinstance(e.reason, socket.timeout): 95 print ‘The server couldn\‘t fulfill the request.‘ 96 print "url:{0} Httperrorcode:{1}".format(url, e.code) 97 except urllib2.URLError, e: 98 print ‘We failed to reach a server.‘ 99 print "url:{0} Reason:{1}".format(url, e.reason) 100 except socket.timeout, e: 101 #For Python 2.7 102 print ‘The server couldn\‘t fulfill the request.‘ 103 print "url:{0} Httperrorcode:{1}".format(url, e) 104 else: 105 if result: 106 html = result.read() 107 return html 108 except: 109 try: 110 socket.timeout 111 except: 112 print ‘The server couldn\‘t fulfill the request.‘ 113 print "url:{0} Httperrorcode:{1}".format(url, ‘timeout‘) 114 else: 115 print ‘The server couldn\‘t fulfill the request.‘ 116 print "url:{0} Server someting error".format(url) 117 return False 118 119 120 def opensqlitedb(): #从sqlite数据源开始工作 121 db_file = ‘./data/mobile_area.db‘ 122 123 if not os.path.exists(db_file): 124 try: 125 cx = sqlite3.connect(db_file) 126 cu = cx.cursor() 127 #建表 128 sql = "create table mobile_area (id integer primary key,129 mobile_num integer,130 mobile_area varchar(50) NULL,131 mobile_type varchar(50) NULL,132 area_code varchar(50) NULL,133 post_code varchar(50) NULL)" 134 cu.execute(sql) 135 except: 136 print "can not find sqlite db file\n" 137 with open(‘./logs/errorlog.txt‘,‘a‘) as f: 138 f.write("can not find sqlite db file ‘%s‘\n" % str(db_file)) 139 return False 140 else: 141 try: 142 cx = sqlite3.connect(db_file) 143 cu = cx.cursor() 144 except: 145 print "can not find sqlite db file\n" 146 with open(‘./logs/errorlog.txt‘,‘a‘) as f: 147 f.write("can not find sqlite db file ‘%s‘\n" % str(db_file)) 148 return False 149 150 mobile_err_list,mobile_dict = [],{} 151 limit = 10000 152 offset = 0 153 mobile_num_pre = 
0 154 while 1: 155 cu.execute("SELECT * FROM mobile_area ORDER BY mobile_num ASC LIMIT %d OFFSET %d " % (limit, offset)) 156 rs = cu.fetchall() 157 if not rs: 158 break 159 else: 160 offset = offset + limit 161 for i in xrange(0,len(rs)): 162 id = rs[i][0] 163 mobile_num = int(rs[i][1]) 164 mobile_area = rs[i][2] 165 mobile_type = rs[i][3] 166 area_code = rs[i][4] 167 post_code = rs[i][5] 168 169 if len(mobile_area) > 100 or (not mobile_area) or (not mobile_num) or len(mobile_type) > 100 or len(area_code) > 100 or len(post_code) > 100 or len(str(mobile_num)) > 7: 170 print "error id:%d" % id 171 continue 172 173 #正确的号码入字典 174 mobile_dict[str(mobile_num)] = True 175 176 print "get data from sqlite works down!\n" 177 return mobile_dict 178 179 180 @multi_run_wrapper 181 def getNumPage(segnum=‘‘, num=‘‘, url=‘‘): #获取号码页详细数据 182 if not segnum: 183 return False 184 if not num: 185 return False 186 if not url: 187 return False 188 189 gevent.sleep(random.randint(10,22)*0.81) #从此处协程并发 190 191 db_file = ‘./data/mobile_area.db‘ 192 193 html = catchPage(url) 194 if not html: 195 print "catch %s num page error!" 
% num 196 print "url:%s\n" % (url) 197 with open("./logs/errornum.txt", "a") as f: 198 f.write(segnum+‘,‘+num+‘,‘+url+"\n") 199 return False 200 201 #json数据 202 try: 203 page_temp_dict = json.loads(unquote(html)) 204 except: 205 print segnum+‘,‘+num+‘,‘+url+",result error convert to dict\n" 206 with open(‘./logs/errorlog.txt‘,‘a‘) as f: 207 f.write(segnum+‘,‘+num+‘,‘+url+",result error convert to dict\n") 208 return False 209 else: 210 try: 211 cx = sqlite3.connect(db_file) 212 cu = cx.cursor() 213 except: 214 print "can not find sqlite db file\n" 215 with open(‘./logs/errorlog.txt‘,‘a‘) as f: 216 f.write("can not find sqlite db file ‘%s‘\n" % str(db_file)) 217 return False 218 219 insdata = {} 220 #mobile_num 221 if page_temp_dict.get(‘Mobile‘, False): 222 insdata[‘mobile_num‘] = int(page_temp_dict[‘Mobile‘]) 223 else: 224 with open(‘./logs/errorlog.txt‘,‘a‘) as f: 225 f.write(segnum+‘,‘+num+‘,‘+url+",No matching data\n") 226 return False #无号码 227 #mobile_area 228 if page_temp_dict.get(‘Province‘, False): 229 if page_temp_dict[‘Province‘] == u‘未知‘: 230 with open(‘./logs/errorlog.txt‘,‘a‘) as f: 231 f.write(segnum+‘,‘+num+‘,‘+url+",province is weizhi\n") 232 return False #无地区 233 if page_temp_dict.get(‘City‘, False): 234 insdata[‘mobile_area‘] = strtodecode(page_temp_dict[‘Province‘]+‘ ‘+page_temp_dict[‘City‘]) 235 else: 236 insdata[‘mobile_area‘] = strtodecode(page_temp_dict[‘Province‘]+‘ ‘+page_temp_dict[‘Province‘]) 237 else: 238 with open(‘./logs/errorlog.txt‘,‘a‘) as f: 239 f.write(segnum+‘,‘+num+‘,‘+url+",No matching province\n") 240 return False #无地区 241 #mobile_type 242 if page_temp_dict.get(‘Corp‘, False): 243 if page_temp_dict.get(‘Card‘, False): 244 insdata[‘mobile_type‘] = strtodecode(page_temp_dict[‘Corp‘]+‘ ‘+page_temp_dict[‘Card‘]) 245 else: 246 insdata[‘mobile_type‘] = strtodecode(page_temp_dict[‘Corp‘]) 247 #area_code 248 if page_temp_dict.get(‘AreaCode‘, False): 249 insdata[‘area_code‘] = strtodecode(page_temp_dict[‘AreaCode‘]) 250 #post_code 251 
if page_temp_dict.get(‘PostCode‘, False): 252 insdata[‘post_code‘] = strtodecode(page_temp_dict[‘PostCode‘]) 253 254 if insdata: 255 sql = "insert into mobile_area values (?,?,?,?,?,?)" 256 cu.execute(sql, (None,insdata[‘mobile_num‘],insdata[‘mobile_area‘],insdata[‘mobile_type‘],insdata[‘area_code‘],insdata[‘post_code‘])) 257 258 try: 259 cx.commit() #执行insert 260 except: 261 with open(‘./logs/errorlog.txt‘,‘a‘) as f: 262 f.write(segnum+‘,‘+num+‘,‘+url+",insert sqlitdb faild\n") 263 return False 264 else: 265 return True 266 267 def getneednum(url=‘‘, step=10): #获取所有未记录的号码信息数据 268 if not lacknumlist: 269 return False 270 if not url: 271 return False 272 if not step: 273 print "step can not be null" 274 return False 275 if not isinstance(step,int): 276 print "step should be numeric" 277 return False 278 if step < 0: 279 print "step should be > 0" 280 return False 281 282 offset = 0 283 limit = int(step) 284 len_max = len(lacknumlist) 285 breaktag = False 286 while 1: 287 if breaktag: 288 break 289 290 threads = [] 291 for i in xrange(offset,(limit+offset)): 292 try: 293 num = lacknumlist[i] 294 except: 295 breaktag = True 296 break 297 else: 298 furl = url() 299 threads.append( gevent.spawn(getNumPage, (num[0:3], num, furl+num)) ) #协程并发 300 301 try: 302 gevent.joinall(threads) 303 print "%d-%d is end\n" % (offset+1,limit+offset) 304 except Exception as e: 305 print "Gevent catch error\n" 306 307 offset = offset + limit 308 time.sleep(random.randint(5,80)*0.9) 309 310 i = 1 #处理网络异常号码数据10次 311 while i <= 10: 312 if not os.path.exists("./logs/errornum.txt"): 313 break 314 j = 1 315 threads = [] 316 with open("./logs/errornum.txt","r") as f: 317 while 1: 318 if (j >= step) and threads: 319 try: 320 gevent.joinall(threads) 321 except Exception as e: 322 print "turn%d-%d Gevent catch error\n" % (i,j) 323 time.sleep(random.randint(5,80)*0.9) 324 threads = [] 325 j = 0 326 line = f.readline() 327 if line: 328 errnum_str = line.strip() 329 errnum_truple = 
errnum_str.split(‘,‘) 330 threads.append(gevent.spawn(getNumPage, (errnum_truple[0], errnum_truple[1], errnum_truple[2]))) 331 else: 332 if threads: 333 try: 334 gevent.joinall(threads) 335 except Exception as e: 336 print "turn%d-%d Gevent catch error\n" % (i,j) 337 break 338 j += 1 339 340 if i < 10: 341 with open("./logs/errornum.txt","w") as f: #清除文件内容 342 pass 343 i = i + 1 344 345 346 def setneednum(num=‘‘, mobile_dict={}): #设置得到所有未补全的号码 347 if not num: 348 return False 349 350 if len(str(num))==3: 351 start_num = int(num+‘0000‘) 352 end_num = int(num+‘9999‘) 353 else: 354 num_list = num.split(‘-‘) 355 start_num = int(num_list[0]) 356 end_num = int(num_list[1]) 357 358 i = start_num 359 while i <= end_num: 360 if not mobile_dict.get(str(i),False): #查找没有的号码 361 lacknumlist.append(str(i)) 362 i += 1 363 # print "%s num works down\n" % num 364 365 366 def setsegnum(segnumlist=[], mobile_dict={}): #根据号段起并发进程 367 if not segnumlist: 368 return False 369 370 record = [] 371 for seg in xrange(0, len(segnumlist)): 372 segnum = segnumlist[seg].strip() 373 if len(str(segnum)) == 3: #指定的单个号段:137 374 try: 375 int(segnum) 376 except: 377 print "%s is illegal argument\n" % str(segnum) 378 continue 379 else: 380 process = multiprocessing.Process(target=setneednum, args=(str(segnum), mobile_dict)) 381 process.start() 382 record.append(process) 383 elif len(str(segnum)) == 7: #具体指定的单个号码:1391234 384 if not mobile_dict.get(str(segnum),False): 385 lacknumlist.append(str(segnum)) #sqlite没有的号码 386 else: 387 segparam_list = segnum.split(‘-‘) 388 try: 389 int(segparam_list[0]) 390 except: 391 print "%s is illegal argument\n" % str(segnum) 392 continue 393 else: 394 try: 395 segparam_list[1] 396 except: 397 print "%s is illegal argument\n" % str(segnum) 398 continue 399 else: 400 if segparam_list[0][:3] == segparam_list[1][:3] : #指定号码范围:1380000-1389999 401 process = multiprocessing.Process(target=setneednum, args=(str(segnum), mobile_dict)) 402 process.start() 403 
record.append(process) 404 else: 405 print "%s is illegal argument\n" % str(segnum) 406 continue 407 for process in record: 408 process.join() 409 410 print "all SegNum prepare works down!\n" 411 412 413 def callback_url_showji(): #返回showji网的api地址 414 showji = ‘http://api.showji.com/Locating/www.showji.c.o.m.aspx?output=json‘ 415 return "{0}×tamp={1}&m=".format(showji, int(time.time())) 416 417 418 def main(param=‘‘): #主方法 419 with open("./logs/errornum.txt","w") as f: #清除零时文件内容 420 pass 421 with open("./logs/outprint.txt","w") as f: 422 pass 423 424 if not param: 425 print "no argument!" 426 return False 427 428 # segnumlist = [\ 429 # # ‘134‘,‘135‘,‘136‘,‘137‘,‘138‘,‘139‘,‘147‘,‘150‘,‘151‘,‘152‘,‘157‘,‘158‘,‘159‘,‘182‘,‘183‘,‘187‘,‘188‘,\ 430 # # ‘130‘,‘131‘,‘132‘,‘136‘,‘145‘,‘185‘,‘186‘,\ 431 # # ‘133‘,‘153‘,‘180‘,‘189‘,\ 432 # # ‘147‘,‘155‘,‘156‘,‘170‘,‘176‘,‘177‘,‘178‘,‘181‘,‘184‘\ 433 # ] 434 435 segnumlist = str(param).split(‘,‘) 436 437 #从sqlite库查已有的 438 mobile_dict = opensqlitedb() 439 440 #算哪些是还没有的 441 setsegnum(segnumlist, mobile_dict) 442 if lacknumlist: 443 tempstr = ‘‘ 444 for i in xrange(0,len(lacknumlist)): 445 tempstr += str(lacknumlist[i])+"\n" 446 with open("./logs/needmobilelist.txt","w") as f: 447 f.write(tempstr) 448 449 #补没有的 450 getneednum(callback_url_showji) 451 452 print "all works end!" 453 454 455 if __name__ == "__main__": 456 from optparse import OptionParser 457 USAGE = "usage:python numspiderlist.py -s [String, e.g:138,137,1393134,1700001-1709999,1450000-1459999]" 458 parser = OptionParser(USAGE) 459 parser.add_option("-s", dest="s") 460 opt,args = parser.parse_args() 461 judopt = lambda x:x.s 462 463 if not opt.s: 464 print USAGE 465 sys.exit(1) 466 467 if not judopt(opt): 468 print USAGE 469 sys.exit(1) 470 471 if opt.s: 472 content = opt.s 473 474 main(content)
如果你看得仔细,一定会发现我在代码中加了这样两行:
import socket
socket.setdefaulttimeout(20)
这是为了兼容python2.6以下的版本:在这些版本中urllib2的timeout无法正常生效。而且在gevent异步非阻塞方式下,urllib2的阻塞式超时需要改用gevent.Timeout()替代。
时间: 2024-10-09 01:24:04