最近总是要爬取一些东西,索性就把Python爬虫的相关内容都总结起来了,自己多动手还是好。
(1)普通的内容爬取
(2)保存爬取的图片/视频和文件和网页
(3)普通模拟登录
(4)处理验证码登录
(5)爬取js网站
(6)全网爬虫
(7)某个网站的站内所有目录爬虫
(8)多线程
(9)爬虫框架Scrapy
一,普通的内容爬取
#coding=utf-8 import urllib import urllib2 url = 'http://www.dataanswer.top' headers = { 'Host':'www.dataanswer.top', 'User-Agent':'Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:31.0) Gecko/20100101 Firefox/31.0', #'Accept':'application/json, text/javascript, */*; q=0.01', #'Accept-Language':'zh-cn,zh;q=0.8,en-us;q=0.5,en;q=0.3', #'Accept-Encoding':'gzip,deflate', #'Referer':'http://www.dataanswer.top' } request = urllib2.Request(url,headers=headers) response = urllib2.urlopen(request) page = response.read() print page
二,保存爬取的图片/视频和文件和网页
#图片/视频和文件和网页的地址抓取下来后,利用模块urllib里的urlretrieve()方法下载下来:
#coding=utf-8 import urllib import urllib2 import os def getPage(url): request = urllib2.Request(url) response = urllib2.urlopen(request) return response.read() url='http://www.dataanswer.top/' result=getPage(url) file_name='test.doc' file_path='doc' if os.path.exists(file_path) == False: os.makedirs(file_path) local=os.path.join(file_path,file_name) f = open(local,"w+") f.write(result) f.close() #coding=utf-8 import urllib import urllib2 import os def getPage(url): request = urllib2.Request(url) response = urllib2.urlopen(request) return response.read() url='http://www.dataanswer.top/' #把该地址改成图片/文件/视频/网页的地址即可 result=getPage(url) file_name='test.doc' file_path='doc' if os.path.exists(file_path) == False: os.makedirs(file_path) local=os.path.join(file_path,file_name) urllib.urlretrieve(local)
三,普通模拟登录
import urllib import urllib2 import cookielib filename = 'cookie.txt' #声明一个MozillaCookieJar对象实例来保存cookie,之后写入文件 cookie = cookielib.MozillaCookieJar(filename) opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie)) postdata = urllib.urlencode({ 'name':'春天里', 'pwd':'1222222' }) #登录的URL loginUrl = 'http://www.dataanswer.top/LoginService?action=tologin' #模拟登录,并把cookie保存到变量 result = opener.open(loginUrl,postdata) #保存cookie到cookie.txt中 cookie.save(ignore_discard=True, ignore_expires=True) #利用cookie请求访问另一个网址 gradeUrl = 'http://www.dataanswer.top/LoginService?action=myHome' #请求访问 result = opener.open(gradeUrl) print result.read()
四,处理验证码登录
#先把验证码图片下载下来保存,再人工读入
#coding=utf-8 import sys, time, os, re import urllib, urllib2, cookielib loginurl = 'https://www.douban.com/accounts/login' cookie = cookielib.CookieJar() opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie)) params = { "form_email":"13161055481", "form_password":"wwwwwww", "source":"index_nav" #没有的话登录不成功 } #从首页提交登录 response=opener.open(loginurl) #验证成功跳转至登录页 print(response.geturl()) if response.geturl() == "https://www.douban.com/accounts/login": html=response.read() print(html) #验证码图片地址--图片地址加密怎么办??? imgurl=re.search('<img id="captcha_image" src="(.+?)" alt="captcha" class="captcha_image"/>', html) print(imgurl) if imgurl: url=imgurl.group(1) #将图片保存至同目录下 res=urllib.urlretrieve(url,'v.jpg') #获取captcha-id参数 captcha=re.search('<input type="hidden" name="captcha-id" value="(.+?)"/>',html) if captcha: vcode=raw_input('请输入图片上的验证码:') params["captcha-solution"]=vcode params["captcha-id"]=captcha.group(1) params["user_login"]="登录" #提交验证码验证 response=opener.open(loginurl, urllib.urlencode(params)) ''' 登录成功跳转至首页 ''' if response.geturl() == "https://www.douban.com/": print 'login success ! ' print '准备进行发帖' addtopicurl="http://www.douban.com/group/python/new_topic" res=opener.open(addtopicurl) html=res.read() else: print("Fail3") else: print("Fail2") else: print("Fail1") else: print("Fail0")
五,爬取js网站
#利用selenium模拟浏览器,结合html的解析
#coding=utf-8 #1、安装 python-pip #sudo apt-get install python-pip #2、安装selenium #sudo pip install -U selenium from selenium import webdriver driver = webdriver.Firefox() driver.get('http://www.newsmth.net/nForum/#!article/Intern/206790') html=driver.page_source.encode('utf-8','ignore') #这个函数获取页面的html print(html) driver.close()
六,全网爬虫
#广度优先,模拟爬取队列
#coding=utf-8 """ 全网爬取所有链接,包括外链--广度优先 """ import urllib2 import re from bs4 import BeautifulSoup import time #爬虫开始的时间 t=time.time() #设置的暂停爬取条数 N_STOP=10 #存放已经爬取过的url CHECKED_URL=[] #存放待爬取的url CHECKING_URL=[] #存放连接失败的url FAIL_URL=[] #存放不能连接的url ERROR_URL=[] #失败后允许连接的次数 RETRY=3 #连接超时时间 TIMEOUT=20 class url_node: def __init__(self,url): """ url节点初始化 :param url:String 当前url """ self.url=url self.content='' def __is_connectable(self): """ 检验url是否可以连接 """ #在允许连接次数下连接 for i in range(RETRY): try: #打开url没有报错,则表示可连接 response=urllib2.urlopen(self.url,timeout=TIMEOUT) return True except: #如果在尝试允许连接次数下报错,则不可连接 if i==RETRY-1: return False def get_next(self): """ 获取爬取该页中包含的其他所有的url """ soup=BeautifulSoup(self.content) #******************在此处可以从网页中解析你想要的内容************************************ next_urls=soup.findAll('a') if len(next_urls)!=0: for link in next_urls: tmp_url=link.get('href') #如果url不在爬取过的列表中也不在待爬取列表中则把其放到待爬列表中(没有确保该url有效) if tmp_url not in CHECKED_URL and tmp_url not in CHECKING_URL: CHECKING_URL.append(tmp_url) def run(self): if self.url: if self.__is_connectable(): try: #获取爬取页面的所有内容 self.content=urllib2.urlopen(self.url,timeout=TIMEOUT).read() #从该页面中获取url self.get_next() except: #把连接失败的存放起来 FAIL_URL.append(self.url) print('[!]Connect Failed') else: #把不能连接的存放起来 ERROR_URL.append(self.url) else: print("所给的初始url有问题!") if __name__=='__main__': #把初始的url放到待爬的列表中 CHECKING_URL.append('http://www.36dsj.com/') #不断的从待爬的列表中获取url进行爬取 ff=open("Mytest.txt",'w') i=0 for url in CHECKING_URL: #对该url进行爬取 url_node(url).run() #存放已经爬取过的url CHECKED_URL.append(url) #删除CHECKING_URL中已经爬取过的url CHECKING_URL.remove(url) i+=1 if i==N_STOP: #打出停止时的url,下次可以把该url作为初始继续 print url print("爬取过的列表长度:%d") % len(CHECKED_URL) print("待爬取的列表长度:%d") % len(CHECKING_URL) print("连接失败的列表长度:%d") % len(FAIL_URL) print("不能连接的列表长度:%d") % len(ERROR_URL) break ff.close() print("time:%d s") % (time.time()-t)
七,某个网站的站内所有目录爬虫
#把缩写的站内网址还原
#coding=utf-8 """ 爬取同一个网站所有的url,不包括外链 """ import urllib2 import re from bs4 import BeautifulSoup import time t=time.time() HOST='' CHECKED_URL=[] CHECKING_URL=[] RESULT=[] RETRY=3 TIMEOUT=20 class url_node: def __init__(self,url): """ url节点初始化 :param url:String 当前url """ self.url=self.handle_url(url,is_next_url=False) self.next_url=[] self.content='' def handle_url(self,url,is_next_url=True): """ 将所有的url处理成标准形式 """ global CHECKED_URL global CHECKING_URL #去掉尾部的‘/’ url=url[0:len(url)-1] if url.endswith('/') else url if url.find(HOST)==-1: if not url.startswith('http'): url='http://'+HOST+url if url.startswith('/') else 'http://'+HOST+'/'+url else: #如果含有http说明是外链,url的host不是当前的host,返回空 return else: if not url.startswith('http'): url='http://'+url if is_next_url: #下一层url放入待检测列表 if url not in CHECKING_URL: CHECKING_URL.append(url) else: #对于当前需要检测的url将参数都替换为1,然后加入规则表 #参数相同类型不同的url只检测一次 rule=re.compile(r'=.*?\&|=.*?$') result=re.sub(rule,'=1&',url) if result in CHECKED_URL: return '[!] Url has checked!' else: CHECKED_URL.append(result) RESULT.append(url) return url def __is_connectable(self): print("进入__is_connectable()函数") #检验是否可以连接 retry=3 timeout=2 for i in range(RETRY): try: #print("进入_..............函数") response=urllib2.urlopen(self.url,timeout=TIMEOUT) return True except: if i==retry-1: return False def get_next(self): #获取当前所有的url #print("进入get_next()函数") soup=BeautifulSoup(self.content) next_urls=soup.findAll('a') if len(next_urls)!=0: for link in next_urls: self.handle_url(link.get('href')) #print(link.text) def run(self): #print("进入run()函数") if self.url: #print self.url if self.__is_connectable(): try: self.content=urllib2.urlopen(self.url,timeout=TIMEOUT).read() self.get_next() except: print('[!]Connect Failed') #处理https开头的url的类和方法 class Poc: def run(self,url): global HOST global CHECKING_URL url=check_url(url) if not url.find('https'): HOST=url[:8] else: HOST=url[7:] for url in CHECKING_URL: print(url) url_node(url).run() def check_url(url): url='http://'+url if not url.startswith('http') else url url=url[0:len(url)-1] if url.endswith('/') else url for i in range(RETRY): try: response=urllib2.urlopen(url,timeout=TIMEOUT) return url except: raise Exception("Connect error") if __name__=='__main__': HOST='www.dataanswer.com' CHECKING_URL.append('http://www.dataanswer.com/') f=open('36大数据','w') for url in CHECKING_URL: f.write(url+'\n') print(url) url_node(url).run() print RESULT print "URL num:"+str(len(RESULT)) print("time:%d s") % (time.time()-t)
八,多线程
#对列和线程的结合
#!/usr/bin/env python # -*- coding:utf-8 -*- """ 一个简单的Python爬虫, 使用了多线程, 爬取豆瓣Top前250的所有电影 """ import urllib2, re, string import threading, Queue, time import sys reload(sys) sys.setdefaultencoding('utf8') _DATA = [] FILE_LOCK = threading.Lock() SHARE_Q = Queue.Queue() #构造一个不限制大小的的队列 _WORKER_THREAD_NUM = 3 #设置线程的个数 class MyThread(threading.Thread) : def __init__(self, func) : super(MyThread, self).__init__() #调用父类的构造函数 self.func = func #传入线程函数逻辑 def run(self) : self.func() def worker() : global SHARE_Q while not SHARE_Q.empty(): url = SHARE_Q.get() #获得任务 my_page = get_page(url) find_title(my_page) #获得当前页面的电影名 #write_into_file(temp_data) time.sleep(1) SHARE_Q.task_done() def get_page(url) : """ 根据所给的url爬取网页HTML Args: url: 表示当前要爬取页面的url Returns: 返回抓取到整个页面的HTML(unicode编码) Raises: URLError:url引发的异常 """ try : my_page = urllib2.urlopen(url).read().decode("utf-8") except urllib2.URLError, e : if hasattr(e, "code"): print "The server couldn't fulfill the request." print "Error code: %s" % e.code elif hasattr(e, "reason"): print "We failed to reach a server. Please check your url and read the Reason" print "Reason: %s" % e.reason return my_page def find_title(my_page) : """ 通过返回的整个网页HTML, 正则匹配前100的电影名称 Args: my_page: 传入页面的HTML文本用于正则匹配 """ temp_data = [] movie_items = re.findall(r'<span.*?class="title">(.*?)</span>', my_page, re.S) for index, item in enumerate(movie_items) : if item.find(" ") == -1 : #print item, temp_data.append(item) _DATA.append(temp_data) def main() : global SHARE_Q threads = [] douban_url = "http://movie.douban.com/top250?start={page}&filter=&type=" #向队列中放入任务, 真正使用时, 应该设置为可持续的放入任务 for index in xrange(10) : SHARE_Q.put(douban_url.format(page = index * 25)) for i in xrange(_WORKER_THREAD_NUM) : thread = MyThread(worker) thread.start() #线程开始处理任务 print("第%s个线程开始工作") % i threads.append(thread) for thread in threads : thread.join() SHARE_Q.join() with open("movie.txt", "w+") as my_file : for page in _DATA : for movie_name in page: my_file.write(movie_name + "\n") print "Spider Successful!!!" if __name__ == '__main__': main()
九,爬虫框架Scrapy
items.py:用来定义需要保存的变量,其中的变量用Field来定义,有点像python的字典
pipelines.py:用来将提取出来的Item进行处理,处理过程按自己需要进行定义
spiders:定义自己的爬虫
爬虫的类型也有好几种:
1)spider:最基本的爬虫,其他的爬虫一般是继承了该最基本的爬虫类,提供访问url,返回response的功能,会默认调用parse方法
2)CrawlSpider:继承spider的爬虫,实际使用比较多,设定rule规则进行网页的跟进与处理, 注意点:编写爬虫的规则的时候避免使用parse名,因为这会覆盖继承的spider的的方法parse造成错误。 其中比较重要的是对Rule的规则的编写,要对具体的网页的情况进行分析。
3)XMLFeedSpider 与 CSVFeedSpider
(1)打开命令行,执行:scrapy startproject tutorial(项目名称)
(2)scrapy.cfg是项目的配置文件,用户自己写的spider要放在spiders目录下面
(3)解析:name属性很重要,不同spider不能使用相同的name
start_urls是spider抓取网页的起始点,可以包括多个url
parse方法是spider抓到一个网页以后默认调用的callback,避免使用这个名字来定义自己的方法。
当spider拿到url的内容以后,会调用parse方法,并且传递一个response参数给它,response包含了抓到的网页的内容,在parse方法里,你可以从抓到的网页里面解析数据。
(3)开始抓取,进入生成的项目根目录tutorial/,执行 scrapy crawl dmoz, dmoz是spider的name。
(4)保存对象:在items.py中添加一些类,这些类用来描述我们要保存的数据
from scrapy.item import Item, Field
class DmozItem(Item):
title = Field()
link = Field()
desc = Field()
(5)执行scrapy crawl dmoz --set FEED_URI=items.json --set FEED_FORMAT=json后得到保存的文件
(6)让scrapy自动抓取网页上的所有链接
在parse方法里面提取我们需要的链接,然后构造一些Request对象,并且把他们返回,scrapy会自动的去抓取这些链接