As everyone knows, Baidu Cloud (yun.baidu.com) hosts a huge number of publicly shared resources: software, video tutorials of every kind, e-books, even movies and BT torrents. Baidu Cloud itself, however, provides no way to search these shares. Hunting down software or American TV shows by hand is a real pain, so I decided to try building a search system for Baidu Cloud resources.
Crawler design:
What matters most for a search engine is a large corpus; once you have the resources, layering full-text retrieval on top of them gives you a simple search engine. So the first step is crawling Baidu Cloud's shared resources. The idea: open any sharer's home page, yun.baidu.com/share/home?uk=xxxxxx&view=share#category/type=0, and you will see that every sharer has subscriptions (follows) and fans. By recursively traversing follows and fans you can collect a large number of sharer uk values, and through those uk values a large number of shared resources.
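To make the traversal concrete, here is a minimal sketch of the uk-harvesting loop, assuming the same getfollowlist/getfanslist JSON endpoints the full crawler at the end of this post uses. seed_uk and the stopping condition are placeholders; paging, proxies, throttling, and persistence are all omitted:

    # Breadth-first harvest of sharer uk values via follows and fans.
    # Sketch only: real crawling needs paging (start=...), proxies, and
    # persistence, as in the full crawler below.
    import json
    import requests

    URL_FOLLOW = 'http://yun.baidu.com/pcloud/friend/getfollowlist?query_uk={uk}&limit=20&start=0'
    URL_FANS = 'http://yun.baidu.com/pcloud/friend/getfanslist?query_uk={uk}&limit=20&start=0'

    def harvest_uks(seed_uk, max_uks=1000):
        seen, queue = set([seed_uk]), [seed_uk]
        while queue and len(seen) < max_uks:
            uk = queue.pop(0)
            for tpl, key, field in ((URL_FOLLOW, 'follow_list', 'follow_uk'),
                                    (URL_FANS, 'fans_list', 'fans_uk')):
                data = json.loads(requests.get(tpl.format(uk=uk)).text)
                for item in data.get(key) or []:
                    new_uk = item[field]
                    if new_uk not in seen:
                        seen.add(new_uk)
                        queue.append(new_uk)
        return seen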
System environment:
Language: Python
OS: Linux
Other middleware: nginx, MySQL, Sphinx
The system consists of several independent parts:
1. A standalone resource crawler built on requests
2. A resource indexer built on Sphinx, the open-source full-text search engine
3. A simple website built with Django + Bootstrap 3, deployed with nginx 1.8 + FastCGI (flup) + Python (a sketch of the nginx wiring follows this list). Demo site: http://www.itjujiao.com
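For reference, a minimal sketch of what the nginx side of that stack can look like; the upstream address 127.0.0.1:8000 is an assumption, not taken from the original deployment:

    # Hypothetical nginx server block proxying to flup's FastCGI server.
    server {
        listen 80;
        server_name www.itjujiao.com;

        location / {
            fastcgi_pass 127.0.0.1:8000;
            fastcgi_param PATH_INFO $fastcgi_script_name;
            fastcgi_param REQUEST_METHOD $request_method;
            fastcgi_param QUERY_STRING $query_string;
            fastcgi_param CONTENT_TYPE $content_type;
            fastcgi_param CONTENT_LENGTH $content_length;
            fastcgi_param SERVER_NAME $server_name;
            fastcgi_param SERVER_PORT $server_port;
            fastcgi_param SERVER_PROTOCOL $server_protocol;
        }
    }

The Django app itself would be served by flup through Django's legacy FastCGI entry point, e.g. python manage.py runfcgi method=prefork host=127.0.0.1 port=8000 (removed in Django 1.9, but current for the nginx 1.8 era this post describes).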
PS:
The crawler has collected roughly 40 million records so far. Sphinx's memory requirements are enormous; that was a serious pitfall.
Baidu throttles crawlers by IP, so I wrote a simple collector that scrapes proxies from xicidaili (a free proxy list site); requests can be configured with an HTTP proxy, as shown below.
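Plugging a harvested proxy into requests is a one-liner. The address below is one of the sample PROXY_LIST entries from the crawler code at the end of this post and is unlikely to still be alive; the query_uk value is made up for illustration:

    import requests

    # Route the request through a harvested HTTP proxy; verify proxies
    # before use, since free ones die quickly.
    proxies = {'http': 'http://42.121.33.160:809'}
    r = requests.get('http://yun.baidu.com/pcloud/friend/getfanslist?query_uk=123456&limit=20&start=0',
                     proxies=proxies, timeout=10)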
Word segmentation uses Sphinx's built-in implementation. Chinese is supported, but only via unigram (one character per token) segmentation, which over-segments and gives mediocre results: searching for "叶问3" (Ip Man 3), for example, returns hits like "叶子的问题第3版" ("Leaf Problems, 3rd Edition"), because every query character appears somewhere in the title. That is not what anyone wants. English tokenization also has room to improve: searching "xart" does not surface "x-art" results, even though x-art is exactly the result set I want (you know why).
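For context, this is roughly what the unigram behaviour looks like in sphinx.conf. The index name, source name, and path are placeholders; ngram_len and ngram_chars are the actual Sphinx directives that switch CJK text to per-character indexing (charset_type applies to pre-2.2 Sphinx):

    index share_idx
    {
        source       = share_src        # placeholder source name
        path         = /var/sphinx/share_idx
        charset_type = utf-8

        # Treat every CJK character as its own token: this is the
        # one-character ("unigram") segmentation responsible for the
        # over-matching described above.
        ngram_len    = 1
        ngram_chars  = U+3000..U+2FA1F
    }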
The database is MySQL. To stay under a sensible single-table row count, the resource table is split into 10 tables. After the first full crawl, Sphinx builds one full index; subsequent crawls only trigger incremental (delta) indexing, as sketched below.
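The full-then-incremental indexing is Sphinx's classic main+delta pattern. A sketch under two simplifying assumptions: a single share table rather than the 10 shards, and a sph_counter bookkeeping table that records how far the main index got:

    source share_main
    {
        type     = mysql
        sql_host = 127.0.0.1
        sql_user = root
        sql_pass =
        sql_db   = baiduyun

        # Remember the highest row id the full index covers...
        sql_query_pre = REPLACE INTO sph_counter SELECT 1, MAX(id) FROM share
        sql_query     = SELECT id, userid, filename FROM share \
            WHERE id <= (SELECT max_id FROM sph_counter WHERE counter_id = 1)
    }

    source share_delta : share_main
    {
        # ...and index only the rows that arrived after that.
        sql_query_pre =
        sql_query     = SELECT id, userid, filename FROM share \
            WHERE id > (SELECT max_id FROM sph_counter WHERE counter_id = 1)
    }

The delta index can then be rebuilt on a short cycle and periodically folded back into the main index with indexer --merge.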
Planned improvements:
1. Segmentation. Current results are far from ideal, and pointers from anyone experienced here are welcome. For example, searching "功夫熊猫之卷轴的秘密" (Kung Fu Panda: Secrets of the Scroll) returns nothing at all, yet "功夫熊猫" alone has hits (功丶夫熊猫⒊英语中英字幕.mp4, 功丶夫熊猫2.Kung.Fu.Panda.2.2011.BDrip.720P.国粤英台四语.特效中英字幕.mp4, 功丶夫熊猫3(韩版)2016.高清中字.mkv, etc.), and so does "卷轴的秘密" ([美国]功夫潘达之卷轴的秘密.2016.1080p.mp4, g夫熊猫之卷轴的秘密.HD1280超清中英双字.mp4, etc.).
2. Deduplication. Much of what the crawler picks up is the same share posted by many users; the plan is to deduplicate on an MD5 fingerprint (see the sketch after this list).
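A sketch of the planned MD5-based dedup. The post does not say what exactly would be hashed, so hashing a normalised filename is an assumption here:

    import hashlib

    def share_fingerprint(filename):
        # filename is assumed to be a unicode string; normalise case and
        # whitespace first so trivial variants collapse to one fingerprint.
        normalised = filename.strip().lower()
        return hashlib.md5(normalised.encode('utf-8')).hexdigest()

    # A UNIQUE KEY on a fingerprint column in the share tables would then
    # make duplicate inserts fail instead of piling up.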
Crawler implementation (this only shows the approach; the code is a bit messy):
#coding: utf8
import re
import time
import json
import threading
from Queue import Queue

import requests
import MySQLdb as mdb

DB_HOST = '127.0.0.1'
DB_USER = 'root'
DB_PASS = ''

# Regexes to pull paging/bookkeeping parameters back out of a request URL.
re_start = re.compile(r'start=(\d+)')
re_uid = re.compile(r'query_uk=(\d+)')
re_pptt = re.compile(r'&pptt=(\d+)')
re_urlid = re.compile(r'&urlid=(\d+)')

ONEPAGE = 20        # page size for follow/fans listings
ONESHAREPAGE = 20   # page size for share listings

# The three Baidu Yun JSON endpoints: share list, follow list, fans list.
# urlid is not a Baidu parameter; it carries our own urlids row id through
# the request so the response handler can find the originating task.
URL_SHARE = 'http://yun.baidu.com/pcloud/feed/getsharelist?auth_type=1&start={start}&limit=20&query_uk={uk}&urlid={id}'
URL_FOLLOW = 'http://yun.baidu.com/pcloud/friend/getfollowlist?query_uk={uk}&limit=20&start={start}&urlid={id}'
URL_FANS = 'http://yun.baidu.com/pcloud/friend/getfanslist?query_uk={uk}&limit=20&start={start}&urlid={id}'

QNUM = 1000
hc_q = Queue(20)    # pending requests
hc_r = Queue(QNUM)  # fetched responses

success = 0
failed = 0

# Proxy records; the last field counts in-flight requests on that proxy
# and is decremented in response_worker when a tagged response comes back.
PROXY_LIST = [[0, 10, "42.121.33.160", 809, "", "", 0],
              [5, 0, "218.97.195.38", 81, "", "", 0],
              ]

def req_worker(inx):
    """Fetch URLs from hc_q and push (body, url) pairs onto hc_r."""
    s = requests.Session()
    while True:
        req_item = hc_q.get()
        req_type = req_item[0]
        url = req_item[1]
        r = s.get(url)
        hc_r.put((r.text, url))
        print "req_worker#", inx, url

def response_worker():
    """Parse fetched JSON and fan the results out into MySQL."""
    dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'baiduyun', charset='utf8')
    dbcurr = dbconn.cursor()
    dbcurr.execute('SET NAMES utf8')
    dbcurr.execute('set global wait_timeout=60000')
    while True:
        metadata, effective_url = hc_r.get()
        #print "response_worker:", effective_url
        try:
            tnow = int(time.time())
            id = re_urlid.findall(effective_url)[0]
            start = re_start.findall(effective_url)[0]
            if 'getfollowlist' in effective_url:  # type = 1
                follows = json.loads(metadata)
                uid = re_uid.findall(effective_url)[0]
                # On the first page, enqueue the remaining pages as new tasks.
                if "total_count" in follows.keys() and follows["total_count"] > 0 and str(start) == "0":
                    for i in range((follows["total_count"] - 1) / ONEPAGE):
                        try:
                            dbcurr.execute('INSERT INTO urlids(uk, start, limited, type, status) VALUES(%s, %s, %s, 1, 0)' % (uid, str(ONEPAGE * (i + 1)), str(ONEPAGE)))
                        except Exception as ex:
                            print "E1", str(ex)
                if "follow_list" in follows.keys():
                    for item in follows["follow_list"]:
                        try:
                            dbcurr.execute('INSERT INTO user(userid, username, files, status, downloaded, lastaccess) VALUES(%s, "%s", 0, 0, 0, %s)' % (item['follow_uk'], item['follow_uname'], str(tnow)))
                        except Exception as ex:
                            print "E13", str(ex)
                else:
                    # Past the end of the listing: drop the leftover page tasks.
                    print "delete1", uid, start
                    dbcurr.execute('delete from urlids where uk=%s and type=1 and start>%s' % (uid, start))
            elif 'getfanslist' in effective_url:  # type = 2
                fans = json.loads(metadata)
                uid = re_uid.findall(effective_url)[0]
                if "total_count" in fans.keys() and fans["total_count"] > 0 and str(start) == "0":
                    for i in range((fans["total_count"] - 1) / ONEPAGE):
                        try:
                            dbcurr.execute('INSERT INTO urlids(uk, start, limited, type, status) VALUES(%s, %s, %s, 2, 0)' % (uid, str(ONEPAGE * (i + 1)), str(ONEPAGE)))
                        except Exception as ex:
                            print "E2", str(ex)
                if "fans_list" in fans.keys():
                    for item in fans["fans_list"]:
                        try:
                            dbcurr.execute('INSERT INTO user(userid, username, files, status, downloaded, lastaccess) VALUES(%s, "%s", 0, 0, 0, %s)' % (item['fans_uk'], item['fans_uname'], str(tnow)))
                        except Exception as ex:
                            print "E23", str(ex)
                else:
                    print "delete2", uid, start
                    dbcurr.execute('delete from urlids where uk=%s and type=2 and start>%s' % (uid, start))
            else:  # share list, type = 0
                shares = json.loads(metadata)
                uid = re_uid.findall(effective_url)[0]
                if "total_count" in shares.keys() and shares["total_count"] > 0 and str(start) == "0":
                    for i in range((shares["total_count"] - 1) / ONESHAREPAGE):
                        try:
                            dbcurr.execute('INSERT INTO urlids(uk, start, limited, type, status) VALUES(%s, %s, %s, 0, 0)' % (uid, str(ONESHAREPAGE * (i + 1)), str(ONESHAREPAGE)))
                        except Exception as ex:
                            print "E3", str(ex)
                if "records" in shares.keys():
                    for item in shares["records"]:
                        try:
                            dbcurr.execute('INSERT INTO share(userid, filename, shareid, status) VALUES(%s, "%s", %s, 0)' % (uid, item['title'], item['shareid']))
                        except Exception as ex:
                            #print "E33", str(ex), item
                            pass
                else:
                    print "delete0", uid, start
                    dbcurr.execute('delete from urlids where uk=%s and type=0 and start>%s' % (uid, str(start)))
            # Task handled: remove it and commit everything in one batch.
            dbcurr.execute('delete from urlids where id=%s' % (id,))
            dbconn.commit()
        except Exception as ex:
            print "E5", str(ex), id
        # If the URL was tagged with a pptt proxy id, release one slot on it.
        pid = re_pptt.findall(effective_url)
        if pid:
            print "pid>>>", pid
            ppid = int(pid[0])
            PROXY_LIST[ppid][6] -= 1
    dbcurr.close()
    dbconn.close()

def worker():
    """Pull pending tasks from MySQL and turn them into URLs to fetch."""
    global success, failed
    dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'baiduyun', charset='utf8')
    dbcurr = dbconn.cursor()
    dbcurr.execute('SET NAMES utf8')
    dbcurr.execute('set global wait_timeout=60000')
    while True:
        # Note: as committed, only follow/fans tasks (type>0) are pulled here;
        # the commented-out query processes all task types in type order.
        #dbcurr.execute('select * from urlids where status=0 order by type limit 1')
        dbcurr.execute('select * from urlids where status=0 and type>0 limit 1')
        d = dbcurr.fetchall()
        #print d
        if d:
            id = d[0][0]
            uk = d[0][1]
            start = d[0][2]
            limit = d[0][3]
            type = d[0][4]
            dbcurr.execute('update urlids set status=1 where id=%s' % (str(id),))
            url = ""
            if type == 0:
                url = URL_SHARE.format(uk=uk, start=start, id=id).encode('utf-8')
            elif type == 1:
                url = URL_FOLLOW.format(uk=uk, start=start, id=id).encode('utf-8')
            elif type == 2:
                url = URL_FANS.format(uk=uk, start=start, id=id).encode('utf-8')
            if url:
                hc_q.put((type, url))
                #print "processed", url
        else:
            # No pending page tasks: seed three task rows (shares, follows,
            # fans) for each user we have not expanded yet.
            dbcurr.execute('select * from user where status=0 limit 1000')
            d = dbcurr.fetchall()
            if d:
                for item in d:
                    try:
                        dbcurr.execute('insert into urlids(uk, start, limited, type, status) values("%s", 0, %s, 0, 0)' % (item[1], str(ONESHAREPAGE)))
                        dbcurr.execute('insert into urlids(uk, start, limited, type, status) values("%s", 0, %s, 1, 0)' % (item[1], str(ONEPAGE)))
                        dbcurr.execute('insert into urlids(uk, start, limited, type, status) values("%s", 0, %s, 2, 0)' % (item[1], str(ONEPAGE)))
                        dbcurr.execute('update user set status=1 where userid=%s' % (item[1],))
                    except Exception as ex:
                        print "E6", str(ex)
            else:
                time.sleep(1)
        dbconn.commit()
    dbcurr.close()
    dbconn.close()

# 16 fetcher threads, one task-dispatch thread; the main thread parses responses.
for item in range(16):
    t = threading.Thread(target=req_worker, args=(item,))
    t.setDaemon(True)
    t.start()

s = threading.Thread(target=worker, args=())
s.setDaemon(True)
s.start()

response_worker()