python asyncio异步代理池

使用python asyncio实现了一个异步代理池,根据规则爬取代理网站上的免费代理,在验证其有效后存入redis中,定期扩展代理的数量并检验池中代理的有效性,移除失效的代理。同时用aiohttp实现了一个server,其他的程序可以通过访问相应的url来从代理池中获取代理。

源码

Github

环境

  • Python 3.5+
  • Redis
  • PhantomJS(可选)
  • Supervisord(可选)

因为代码中大量使用了asyncioasyncawait语法,它们是在Python3.5中才提供的,所以最好使用Python3.5及以上的版本,我使用的是Python3.6。

依赖

  • redis
  • aiohttp
  • bs4
  • lxml
  • requests
  • selenium

selenium包主要是用来操作PhantomJS的。

下面来对代码进行说明。

1. 爬虫部分

核心代码

async def start(self):
    for rule in self._rules:
        parser = asyncio.ensure_future(self._parse_page(rule)) # 根据规则解析页面来获取代理
        logger.debug(‘{0} crawler started‘.format(rule.__rule_name__))

        if not rule.use_phantomjs:
            await page_download(ProxyCrawler._url_generator(rule), self._pages, self._stop_flag) # 爬取代理网站的页面
        else:
            await page_download_phantomjs(ProxyCrawler._url_generator(rule), self._pages,
                                          rule.phantomjs_load_flag, self._stop_flag) # 使用PhantomJS爬取

        await self._pages.join()

        parser.cancel()

        logger.debug(‘{0} crawler finished‘.format(rule.__rule_name__))

上面的核心代码实际上是一个用asyncio.Queue实现的生产-消费者模型,下面是该模型的一个简单实现:

import asyncio
from random import random

async def produce(queue, n):
    for x in range(1, n + 1):
        print(‘produce ‘, x)
        await asyncio.sleep(random())
        await queue.put(x) # 向queue中放入item

async def consume(queue):
   while 1:
       item = await queue.get() # 等待从queue中获取item
       print(‘consume ‘, item)
       await asyncio.sleep(random())
       queue.task_done() # 通知queue当前item处理完毕 

async def run(n):
    queue = asyncio.Queue()
    consumer = asyncio.ensure_future(consume(queue))
    await produce(queue, n) # 等待生产者结束
    await queue.join()  # 阻塞直到queue不为空
    consumer.cancel() # 取消消费者任务,否则它会一直阻塞在get方法处

def aio_queue_run(n):
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(run(n)) # 持续运行event loop直到任务run(n)结束
    finally:
        loop.close()

if __name__ == ‘__main__‘:
    aio_queue_run(5)

运行上面的代码,一种可能的输出如下:

produce  1
produce  2
consume  1
produce  3
produce  4
consume  2
produce  5
consume  3
consume  4
consume  5

爬取页面

async def page_download(urls, pages, flag):
    url_generator = urls
    async with aiohttp.ClientSession() as session:
        for url in url_generator:
            if flag.is_set():
                break

            await asyncio.sleep(uniform(delay - 0.5, delay + 1))
            logger.debug(‘crawling proxy web page {0}‘.format(url))
            try:
                async with session.get(url, headers=headers, timeout=10) as response:
                    page = await response.text()
                    parsed = html.fromstring(decode_html(page)) # 使用bs4来辅助lxml解码网页:http://lxml.de/elementsoup.html#Using only the encoding detection
                    await pages.put(parsed)
                    url_generator.send(parsed) # 根据当前页面来获取下一页的地址
            except StopIteration:
                break
            except asyncio.TimeoutError:
                logger.error(‘crawling {0} timeout‘.format(url))
                continue # TODO: use a proxy
            except Exception as e:
                logger.error(e)

使用aiohttp实现的网页爬取函数,大部分代理网站都可以使用上面的方法来爬取,对于使用js动态生成页面的网站可以使用selenium控制PhantomJS来爬取——本项目对爬虫的效率要求不高,代理网站的更新频率是有限的,不需要频繁的爬取,完全可以使用PhantomJS。

解析代理

最简单的莫过于用xpath来解析代理了,使用Chrome浏览器的话,直接通过右键就能获得选中的页面元素的xpath:

安装Chrome的扩展“XPath Helper”就可以直接在页面上运行和调试xpath,十分方便:

BeautifulSoup不支持xpath,使用lxml来解析页面,代码如下:

async def _parse_proxy(self, rule, page):
    ips = page.xpath(rule.ip_xpath) # 根据xpath解析得到list类型的ip地址集合
    ports = page.xpath(rule.port_xpath) # 根据xpath解析得到list类型的ip地址集合

    if not ips or not ports:
        logger.warning(‘{2} crawler could not get ip(len={0}) or port(len={1}), please check the xpaths or network‘.
              format(len(ips), len(ports), rule.__rule_name__))
        return

    proxies = map(lambda x, y: ‘{0}:{1}‘.format(x.text.strip(), y.text.strip()), ips, ports)

    if rule.filters: # 根据过滤字段来过滤代理,如“高匿”、“透明”等
        filters = []
        for i, ft in enumerate(rule.filters_xpath):
            field = page.xpath(ft)
            if not field:
                logger.warning(‘{1} crawler could not get {0} field, please check the filter xpath‘.
                      format(rule.filters[i], rule.__rule_name__))
                continue
            filters.append(map(lambda x: x.text.strip(), field))

        filters = zip(*filters)
        selector = map(lambda x: x == rule.filters, filters)
        proxies = compress(proxies, selector)

    for proxy in proxies:
        await self._proxies.put(proxy) # 解析后的代理放入asyncio.Queue中

爬虫规则

网站爬取、代理解析、滤等等操作的规则都是由各个代理网站的规则类定义的,使用元类和基类来管理规则类。基类定义如下:

class CrawlerRuleBase(object, metaclass=CrawlerRuleMeta):

    start_url = None
    page_count = 0
    urls_format = None
    next_page_xpath = None
    next_page_host = ‘‘

    use_phantomjs = False
    phantomjs_load_flag = None

    filters = ()

    ip_xpath = None
    port_xpath = None
    filters_xpath = ()

各个参数的含义如下:

  • start_url(必需)

    爬虫的起始页面。

  • ip_xpath(必需)

    爬取IP的xpath规则。

  • port_xpath(必需)

    爬取端口号的xpath规则。

  • page_count

    爬取的页面数量。

  • urls_format

    页面地址的格式字符串,通过urls_format.format(start_url, n)来生成第n页的地址,这是比较常见的页面地址格式。

  • next_page_xpathnext_page_host

    由xpath规则来获取下一页的url(常见的是相对路径),结合host得到下一页的地址:next_page_host + url

  • use_phantomjs, phantomjs_load_flag

    use_phantomjs用于标识爬取该网站是否需要使用PhantomJS,若使用,需定义phantomjs_load_flag(网页上的某个元素,str类型)作为PhantomJS页面加载完毕的标志。

  • filters

    过滤字段集合,可迭代类型。用于过滤代理。

    爬取各个过滤字段的xpath规则,与过滤字段按顺序一一对应。

元类CrawlerRuleMeta用于管理规则类的定义,如:如果定义use_phantomjs=True,则必须定义phantomjs_load_flag,否则会抛出异常,不在此赘述。

目前已经实现的规则有西刺代理快代理360代理66代理秘密代理。新增规则类也很简单,通过继承CrawlerRuleBase来定义新的规则类YourRuleClass,放在proxypool/rules目录下,并在该目录下的__init__.py中添加from . import YourRuleClass(这样通过CrawlerRuleBase.__subclasses__()就可以获取全部的规则类了),重启正在运行的proxy pool即可应用新的规则。

2. 检验部分

免费的代理虽然多,但是可用的却不多,所以爬取到代理后需要对其进行检验,有效的代理才能放入代理池中,而代理也是有时效性的,还要定期对池中的代理进行检验,及时移除失效的代理。

这部分就很简单了,使用aiohttp通过代理来访问某个网站,若超时,则说明代理无效。

async def validate(self, proxies):
    logger.debug(‘validator started‘)
    while 1:
        proxy = await proxies.get()
        async with aiohttp.ClientSession() as session:
            try:
                real_proxy = ‘http://‘ + proxy
                async with session.get(self.validate_url, proxy=real_proxy, timeout=validate_timeout) as resp:
                    self._conn.put(proxy)
            except Exception as e:
                logger.error(e)

        proxies.task_done()

3. server部分

使用aiohttp实现了一个web server,启动后,访问http://host:port即可显示主页:

  • 访问http://host:port/get来从代理池获取1个代理,如:‘127.0.0.1:1080‘
  • 访问http://host:port/get/n来从代理池获取n个代理,如:"[‘127.0.0.1:1080‘, ‘127.0.0.1:443‘, ‘127.0.0.1:80‘]"
  • 访问http://host:port/count来获取代理池的容量,如:‘42‘

因为主页是一个静态的html页面,为避免每来一个访问主页的请求都要打开、读取以及关闭该html文件的开销,将其缓存到了redis中,通过html文件的修改时间来判断其是否被修改过,如果修改时间与redis缓存的修改时间不同,则认为html文件被修改了,则重新读取文件,并更新缓存,否则从redis中获取主页的内容。

返回代理是通过aiohttp.web.Response(text=ip.decode(‘utf-8‘))实现的,text要求str类型,而从redis中获取到的是bytes类型,需要进行转换。返回的多个代理,使用eval即可转换为list类型。

返回主页则不同,是通过aiohttp.web.Response(body=main_page_cache, content_type=‘text/html‘),这里body要求的是bytes类型,直接将从redis获取的缓存返回即可,conten_type=‘text/html‘必不可少,否则无法通过浏览器加载主页,而是会将主页下载下来——在运行官方文档中的示例代码的时候也要注意这点,那些示例代码基本上都没有设置content_type

这部分不复杂,注意上面提到的几点,而关于主页使用的静态资源文件的路径,可以参考之前的博客《aiohttp之添加静态资源路径》。

4. 运行

将整个代理池的功能分成了3个独立的部分:

  • proxypool

    定期检查代理池容量,若低于下限则启动代理爬虫并对代理检验,通过检验的爬虫放入代理池,达到规定的数量则停止爬虫。

  • proxyvalidator

    用于定期检验代理池中的代理,移除失效代理。

  • proxyserver

    启动server。

这3个独立的任务通过3个进程来运行,在Linux下可以使用supervisod来=管理这些进程,下面是supervisord的配置文件示例:

; supervisord.conf
[unix_http_server]
file=/tmp/supervisor.sock   

[inet_http_server]
port=127.0.0.1:9001       

[supervisord]
logfile=/tmp/supervisord.log
logfile_maxbytes=5MB
logfile_backups=10
loglevel=debug
pidfile=/tmp/supervisord.pid
nodaemon=false
minfds=1024
minprocs=200                 

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock

[program:proxyPool]
command=python /path/to/ProxyPool/run_proxypool.py
redirect_stderr=true
stdout_logfile=NONE

[program:proxyValidator]
command=python /path/to/ProxyPool/run_proxyvalidator.py
redirect_stderr=true
stdout_logfile=NONE

[program:proxyServer]
command=python /path/to/ProxyPool/run_proxyserver.py
autostart=false
redirect_stderr=true
stdout_logfile=NONE

因为项目自身已经配置了日志,所以这里就不需要再用supervisord捕获stdout和stderr了。通过supervisord -c supervisord.conf启动supervisord,proxyPoolproxyServer则会随之自动启动,proxyServer需要手动启动,访问http://127.0.0.1:9001即可通过网页来管理这3个进程了:

supervisod的官方文档说目前(版本3.3.1)不支持python3,但是我在使用过程中没有发现什么问题,可能也是由于我并没有使用supervisord的复杂功能,只是把它当作了一个简单的进程状态监控和启停工具了。

原文地址:https://www.cnblogs.com/trunkslisa/p/9830609.html

时间: 2024-10-10 03:07:29

python asyncio异步代理池的相关文章

python开源IP代理池--IPProxys

今天博客开始继续更新,谢谢大家对我的关注和支持.这几天一直是在写一个ip代理池的开源项目.通过前几篇的博客,我们可以了解到突破反爬虫机制的一个重要举措就是代理ip.拥有庞大稳定的ip代理,在爬虫工作中将起到重要的作用,但是从成本的角度来说,一般稳定的ip池都很贵,因此我这个开源项目的意义就诞生了,爬取一些代理网站提供的免费ip(虽然70%都是不好使的,但是扛不住量大,网站多),检测有效性后存储到数据库中,同时搭建一个http服务器,提供一个api接口,供大家的爬虫程序调用. 好了,废话不多说,咱

快速构建Python爬虫IP代理池服务

在公司做分布式深网爬虫,搭建了一套稳定的代理池服务,为上千个爬虫提供有效的代理,保证各个爬虫拿到的都是对应网站有效的代理IP,从而保证爬虫快速稳定的运行,当然在公司做的东西不能开源出来.不过呢,闲暇时间手痒,所以就想利用一些免费的资源搞一个简单的代理池服务. 1.问题 代理IP从何而来? 刚自学爬虫的时候没有代理IP就去西刺.快代理之类有免费代理的网站去爬,还是有个别代理能用.当然,如果你有更好的代理接口也可以自己接入. 免费代理的采集也很简单,无非就是:访问页面页面 —> 正则/xpath提取

python爬虫-代理池的维护

简介 我们可以从网上或者付费获取大量代理,但是这其中很多依然不可用,那么搭建高效的代理池,对代理ip进行筛选是十分必要的 准备工作: 安装Redis数据库,还需要安装aiohttp.requests.redis-py.pyquery.Flask库,安装流程请百度自行查询 由于文件内容较多,所以就不一一讲解了,直接创建一个Python Package模块包,下次直接调用 创建一个Python Package包,取名为proxypool 一.创建一个setting.py文件,用于存放配置信息.代码如

打造IP代理池,Python爬取Boss直聘,帮你获取全国各类职业薪酬榜

爬虫面临的问题 不再是单纯的数据一把抓 多数的网站还是请求来了,一把将所有数据塞进去返回,但现在更多的网站使用数据的异步加载,爬虫不再像之前那么方便 很多人说js异步加载与数据解析,爬虫可以做到啊,恩是的,无非增加些工作量,那是你没遇到牛逼的前端,多数的解决办法只能靠渲染浏览器抓取,效率低下,接着往下走 ? 千姿百态的登陆验证 从12306的说说下面哪个糖是奶糖,到现在各大网站的滑动拼图.汉子点击解锁,这些操作都是在为了阻止爬虫的自动化运行. 你说可以先登录了复制cookie,但cookie也有

Python黑魔法 --- 异步IO( asyncio) 协程

https://www.jianshu.com/p/b5e347b3a17c python asyncio 网络模型有很多中,为了实现高并发也有很多方案,多线程,多进程.无论多线程和多进程,IO的调度更多取决于系统,而协程的方式,调度来自用户,用户可以在函数中yield一个状态.使用协程可以实现高效的并发任务.Python的在3.4中引入了协程的概念,可是这个还是以生成器对象为基础,3.5则确定了协程的语法.下面将简单介绍asyncio的使用.实现协程的不仅仅是asyncio,tornado和g

python爬虫+多线程+多进程+构建IP代理池

目标网站:静听网 网站url:http://www.audio699.com/ 目标文件:所有在线听的音频文件 附:我有个喜好就是晚上睡觉听有声书,然而很多软件都是付费才能听,免费在线网站虽然能听,但是禁ip很严重,就拿静听网来说,你听一个在线音频,不能一个没听完就点击下一集,甚至不能快进太快,否则直接禁你5分钟才能再听,真的是太太讨厌了... 于是我就想用爬虫给它爬下来存储本地就nice了. 我把我的大概分析步骤分享出来. 步骤1: 我查看静听网网页url有一个规律,基网址是http://ww

python代理池的构建2——代理ip是否可用的处理

上一篇博客地址:python代理池的构建1——代理IP类的构建,以及配置文件.日志文件.requests请求头 一.代理ip是否可用的处理 #-*-coding:utf-8-*- #check ip ''' 目标:检查代理IP速度,匿名程度以及支持的协议类型. 步骤: 检查代理IP速度和匿名程度; 代理IP速度:就是从发送请求到获取响应的时间间隔 匿名程度检查: 对http://httpbin.org/get 或https://httpbin.org/get 发送请求 如果响应的origin 中

python代理池的构建3——爬取代理ip

上篇博客地址:python代理池的构建2——代理ip是否可用的处理和检查 一.基础爬虫模块(Base_spider.py) #-*-coding:utf-8-*- ''' 目标: 实现可以指定不同URL列表,分组的XPATH和详情的XPATH,从不同页面上提取代理的IP,端口号和区域的通用爬虫; 步骤: 1.在base_ spider.py文件中,定义 一个BaseSpider类, 继承object 2.提供三个类成员变量: urls:代理IP网址的URL的列表 group_ xpath:分组X

python学习 —— 建立IP代理池

代码: from bs4 import BeautifulSoup from requests import Session, get, post from time import sleep import random import re, os class ProxyIpPool(object): def __init__(self,page): object.__init__(self) self.page = page def init_proxy_ip_pool(self): url