使用redis+flask维护动态代理池

在进行网络爬虫时,会经常有封ip的现象。可以使用代理池来进行代理ip的处理。

代理池的要求:多站抓取,异步检测。定时筛选,持续更新。提供接口,易于提取。

代理池架构:获取器,过滤器,代理队列,定时检测。

使用https://github.com/Germey/ProxyPool/tree/master/proxypool代码进行分析。

run.py里面的代码

from proxypool.api import app
from proxypool.schedule import Schedule

def main():

    s = Schedule()
    s.run()
    app.run()

if __name__ == ‘__main__‘:
    main()

首先运行了一个调度器,接着运行了一个接口。

调度器schedule.py代码

import time
from multiprocessing import Process
import asyncio
import aiohttp
try:
    from aiohttp.errors import ProxyConnectionError,ServerDisconnectedError,ClientResponseError,ClientConnectorError
except:
    from aiohttp import ClientProxyConnectionError as ProxyConnectionError,ServerDisconnectedError,ClientResponseError,ClientConnectorError
from proxypool.db import RedisClient
from proxypool.error import ResourceDepletionError
from proxypool.getter import FreeProxyGetter
from proxypool.setting import *
from asyncio import TimeoutError

class ValidityTester(object):
    test_api = TEST_API

    def __init__(self):
        self._raw_proxies = None
        self._usable_proxies = []

    def set_raw_proxies(self, proxies):
        self._raw_proxies = proxies
        self._conn = RedisClient()

    async def test_single_proxy(self, proxy):
        """
        text one proxy, if valid, put them to usable_proxies.
        """
        try:
            async with aiohttp.ClientSession() as session:
                try:
                    if isinstance(proxy, bytes):
                        proxy = proxy.decode(‘utf-8‘)
                    real_proxy = ‘http://‘ + proxy
                    print(‘Testing‘, proxy)
                    async with session.get(self.test_api, proxy=real_proxy, timeout=get_proxy_timeout) as response:
                        if response.status == 200:
                            self._conn.put(proxy)
                            print(‘Valid proxy‘, proxy)
                except (ProxyConnectionError, TimeoutError, ValueError):
                    print(‘Invalid proxy‘, proxy)
        except (ServerDisconnectedError, ClientResponseError,ClientConnectorError) as s:
            print(s)
            pass

    def test(self):
        """
        aio test all proxies.
        """
        print(‘ValidityTester is working‘)
        try:
            loop = asyncio.get_event_loop()
            tasks = [self.test_single_proxy(proxy) for proxy in self._raw_proxies]
            loop.run_until_complete(asyncio.wait(tasks))
        except ValueError:
            print(‘Async Error‘)

class PoolAdder(object):
    """
    add proxy to pool
    """

    def __init__(self, threshold):
        self._threshold = threshold
        self._conn = RedisClient()
        self._tester = ValidityTester()
        self._crawler = FreeProxyGetter()

    def is_over_threshold(self):
        """
        judge if count is overflow.
        """
        if self._conn.queue_len >= self._threshold:
            return True
        else:
            return False

    def add_to_queue(self):
        print(‘PoolAdder is working‘)
        proxy_count = 0
        while not self.is_over_threshold():
            for callback_label in range(self._crawler.__CrawlFuncCount__):
                callback = self._crawler.__CrawlFunc__[callback_label]
                raw_proxies = self._crawler.get_raw_proxies(callback)
                # test crawled proxies
                self._tester.set_raw_proxies(raw_proxies)
                self._tester.test()
                proxy_count += len(raw_proxies)
                if self.is_over_threshold():
                    print(‘IP is enough, waiting to be used‘)
                    break
            if proxy_count == 0:
                raise ResourceDepletionError

class Schedule(object):
    @staticmethod
    def valid_proxy(cycle=VALID_CHECK_CYCLE):
        """
        Get half of proxies which in redis
        """
        conn = RedisClient()
        tester = ValidityTester()
        while True:
            print(‘Refreshing ip‘)
            count = int(0.5 * conn.queue_len)
            if count == 0:
                print(‘Waiting for adding‘)
                time.sleep(cycle)
                continue
            raw_proxies = conn.get(count)
            tester.set_raw_proxies(raw_proxies)
            tester.test()
            time.sleep(cycle)

    @staticmethod
    def check_pool(lower_threshold=POOL_LOWER_THRESHOLD,
                   upper_threshold=POOL_UPPER_THRESHOLD,
                   cycle=POOL_LEN_CHECK_CYCLE):
        """
        If the number of proxies less than lower_threshold, add proxy
        """
        conn = RedisClient()
        adder = PoolAdder(upper_threshold)
        while True:
            if conn.queue_len < lower_threshold:
                adder.add_to_queue()
            time.sleep(cycle)

    def run(self):
        print(‘Ip processing running‘)
        valid_process = Process(target=Schedule.valid_proxy)
        check_process = Process(target=Schedule.check_pool)
        valid_process.start()
        check_process.start()

调度器里面有个run方法,里面运行了两个进程。一个进程valid_process是从网上获取代理放入数据库;另外一个进程check_process是定时的从数据库拿出代理进行检测。

valid_proxy是定时检测器,里面传入一个时间的参数cycle=VALID_CHECK_CYCLE,定义定时检测的时间。方法里首先定义一个RedisClient()进行数据库的连接,该方法定义在db.py中

import redis
from proxypool.error import PoolEmptyError
from proxypool.setting import HOST, PORT, PASSWORD

class RedisClient(object):
    def __init__(self, host=HOST, port=PORT):
        if PASSWORD:
            self._db = redis.Redis(host=host, port=port, password=PASSWORD)
        else:
            self._db = redis.Redis(host=host, port=port)

    def get(self, count=1):
        """
        get proxies from redis
        """        #从左侧批量获取的方法
        proxies = self._db.lrange("proxies", 0, count - 1)
        self._db.ltrim("proxies", count, -1)
        return proxies

    def put(self, proxy):
        """
        add proxy to right top
        """
        self._db.rpush("proxies", proxy)

    def pop(self):
        """
        get proxy from right.
        """
        try:
            return self._db.rpop("proxies").decode(‘utf-8‘)
        except:
            raise PoolEmptyError

    @property
    def queue_len(self):
        """
        get length from queue.
        """
        return self._db.llen("proxies")

    def flush(self):
        """
        flush db
        """
        self._db.flushall()

if __name__ == ‘__main__‘:
    conn = RedisClient()
    print(conn.pop())

接着还声明了ValidityTester(),用来检测代理是否可用,其中的test_single_proxy()方法是实现异步检测的关键。

check_pool()方法里面需要传入三个参数:两个代理池的上下界限,一个时间。里面有个PoolAdder的add_to_queue()方法。
add_to_queue()方法中使用了一个从网站抓取ip的类FreeProxyGetter(),在getter.py里面
from .utils import get_page
from pyquery import PyQuery as pq
import re

class ProxyMetaclass(type):
    """
        元类,在FreeProxyGetter类中加入
        __CrawlFunc__和__CrawlFuncCount__
        两个参数,分别表示爬虫函数,和爬虫函数的数量。
    """

    def __new__(cls, name, bases, attrs):
        count = 0
        attrs[‘__CrawlFunc__‘] = []
        for k, v in attrs.items():
            if ‘crawl_‘ in k:
                attrs[‘__CrawlFunc__‘].append(k)
                count += 1
        attrs[‘__CrawlFuncCount__‘] = count
        return type.__new__(cls, name, bases, attrs)

class FreeProxyGetter(object, metaclass=ProxyMetaclass):
    def get_raw_proxies(self, callback):
        proxies = []
        print(‘Callback‘, callback)
        for proxy in eval("self.{}()".format(callback)):
            print(‘Getting‘, proxy, ‘from‘, callback)
            proxies.append(proxy)
        return proxies

    def crawl_ip181(self):
        start_url = ‘http://www.ip181.com/‘
        html = get_page(start_url)
        ip_adress = re.compile(‘<tr.*?>\s*<td>(.*?)</td>\s*<td>(.*?)</td>‘)
        # \s* 匹配空格,起到换行作用
        re_ip_adress = ip_adress.findall(html)
        for adress, port in re_ip_adress:
            result = adress + ‘:‘ + port
            yield result.replace(‘ ‘, ‘‘)

    def crawl_kuaidaili(self):
        for page in range(1, 4):
            # 国内高匿代理
            start_url = ‘https://www.kuaidaili.com/free/inha/{}/‘.format(page)
            html = get_page(start_url)
            ip_adress = re.compile(
                ‘<td data-title="IP">(.*)</td>\s*<td data-title="PORT">(\w+)</td>‘
            )
            re_ip_adress = ip_adress.findall(html)
            for adress, port in re_ip_adress:
                result = adress + ‘:‘ + port
                yield result.replace(‘ ‘, ‘‘)

    def crawl_xicidaili(self):
        for page in range(1, 4):
            start_url = ‘http://www.xicidaili.com/wt/{}‘.format(page)
            html = get_page(start_url)
            ip_adress = re.compile(
                ‘<td class="country"><img src="http://fs.xicidaili.com/images/flag/cn.png" alt="Cn" /></td>\s*<td>(.*?)</td>\s*<td>(.*?)</td>‘
            )
            # \s* 匹配空格,起到换行作用
            re_ip_adress = ip_adress.findall(html)
            for adress, port in re_ip_adress:
                result = adress + ‘:‘ + port
                yield result.replace(‘ ‘, ‘‘)

    def crawl_daili66(self, page_count=4):
        start_url = ‘http://www.66ip.cn/{}.html‘
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        for url in urls:
            print(‘Crawling‘, url)
            html = get_page(url)
            if html:
                doc = pq(html)
                trs = doc(‘.containerbox table tr:gt(0)‘).items()
                for tr in trs:
                    ip = tr.find(‘td:nth-child(1)‘).text()
                    port = tr.find(‘td:nth-child(2)‘).text()
                    yield ‘:‘.join([ip, port])

    def crawl_data5u(self):
        for i in [‘gngn‘, ‘gnpt‘]:
            start_url = ‘http://www.data5u.com/free/{}/index.shtml‘.format(i)
            html = get_page(start_url)
            ip_adress = re.compile(
                ‘ <ul class="l2">\s*<span><li>(.*?)</li></span>\s*<span style="width: 100px;"><li class=".*">(.*?)</li></span>‘
            )
            # \s * 匹配空格,起到换行作用
            re_ip_adress = ip_adress.findall(html)
            for adress, port in re_ip_adress:
                result = adress + ‘:‘ + port
                yield result.replace(‘ ‘, ‘‘)

    def crawl_kxdaili(self):
        for i in range(1, 4):
            start_url = ‘http://www.kxdaili.com/ipList/{}.html#ip‘.format(i)
            html = get_page(start_url)
            ip_adress = re.compile(‘<tr.*?>\s*<td>(.*?)</td>\s*<td>(.*?)</td>‘)
            # \s* 匹配空格,起到换行作用
            re_ip_adress = ip_adress.findall(html)
            for adress, port in re_ip_adress:
                result = adress + ‘:‘ + port
                yield result.replace(‘ ‘, ‘‘)

    def crawl_premproxy(self):
        for i in [‘China-01‘, ‘China-02‘, ‘China-03‘, ‘China-04‘, ‘Taiwan-01‘]:
            start_url = ‘https://premproxy.com/proxy-by-country/{}.htm‘.format(
                i)
            html = get_page(start_url)
            if html:
                ip_adress = re.compile(‘<td data-label="IP:port ">(.*?)</td>‘)
                re_ip_adress = ip_adress.findall(html)
                for adress_port in re_ip_adress:
                    yield adress_port.replace(‘ ‘, ‘‘)

    def crawl_xroxy(self):
        for i in [‘CN‘, ‘TW‘]:
            start_url = ‘http://www.xroxy.com/proxylist.php?country={}‘.format(
                i)
            html = get_page(start_url)
            if html:
                ip_adress1 = re.compile(
                    "title=‘View this Proxy details‘>\s*(.*).*")
                re_ip_adress1 = ip_adress1.findall(html)
                ip_adress2 = re.compile(
                    "title=‘Select proxies with port number .*‘>(.*)</a>")
                re_ip_adress2 = ip_adress2.findall(html)
                for adress, port in zip(re_ip_adress1, re_ip_adress2):
                    adress_port = adress + ‘:‘ + port
                    yield adress_port.replace(‘ ‘, ‘‘)
具体使用方法可以看GitHub。。。。。。

原文地址:https://www.cnblogs.com/xxp17457741/p/9495180.html

时间: 2024-09-30 13:36:25

使用redis+flask维护动态代理池的相关文章

使用redis所维护的代理池抓取微信文章

搜狗搜索可以直接搜索微信文章,本次就是利用搜狗搜搜出微信文章,获得详细的文章url来得到文章的信息.并把我们感兴趣的内容存入到mongodb中. 因为搜狗搜索微信文章的反爬虫比较强,经常封IP,所以要在封了IP之后切换IP,这里用到github上的一个开源类,当运行这个类时,就会动态的在redis中维护一个ip池,并通过flask映射到网页中,可以通过访问 localhost:5000/get/ 来获取IP 这是搜狗微信搜索的页面, 构造搜索url .搜索时会传递的参数,通过firefox浏览器

爬虫搭建动态代理池

代理是什么? 代理实际上就是代理服务器, 代理服务器的工作机制很象我们生活中常常提及的代理商,假设你的机器为A机,你想获得的数据由B机提供,代理服务器为C机,那么具体的连接过程是这样的. 首先,A机需要B机的数据,它与C机建立连接,C机接收到A机的数据请求后,与B机建立连接,下载A机所请求的B机上的数据到本地,再将此数据发送至A机,完成代理任务.如图(图片有点丑): 为什么要使用代理? 我们在做爬虫的过程中经常会遇到这样的情况,最初爬虫正常运行,正常抓取数据,一切看起来都是那么美好,然而一杯茶的

代理池的维护(一)

介绍代理池的维护 一.准备工作 安装redis数据库并启动服务,另外还需安装atihttp,requests, redis-py,pyquery,flask 二.代理池的架构 分为4个模块:存储模块,获取模块,检测模块,借口模块. 1.存储模块:使用Redis的有序集合,用来做代理的去重和状态标识,同时也是中心模块和基础模块,将其他模块串联起来 2.获取模块:定时从代理网站获取代理,将获取的代理传递给存储模块,并保存到数据库 3.检测模块:定时通过存储模块获取所有代理,对代理进行检测,根据不同的

用Flask+Redis维护代理池

为什么要用代理池? 许多网站有专门的反爬虫措施,可能遇到封IP等问题. 互联网上公开了大量免费的代理,利用好资源. 通过定时的检测维护同样可以得到多个可用的代理. 代理池要求 多站抓取,异步检测 定时筛选,持续更新 提供接口,易于读取 代理池架构 原文地址:https://www.cnblogs.com/carious/p/10101270.html

Flask开发系列之Flask+redis实现IP代理池

Flask开发系列之Flask+redis实现IP代理池 6.11-6.15号完善... 简易实现版 import requests import re import time import redis from bloom_filter import BloomFilter import ast pool = redis.ConnectionPool(host='localhost',password='xxx', port=6379, decode_responses=True) r = r

python爬虫-代理池的维护

简介 我们可以从网上或者付费获取大量代理,但是这其中很多依然不可用,那么搭建高效的代理池,对代理ip进行筛选是十分必要的 准备工作: 安装Redis数据库,还需要安装aiohttp.requests.redis-py.pyquery.Flask库,安装流程请百度自行查询 由于文件内容较多,所以就不一一讲解了,直接创建一个Python Package模块包,下次直接调用 创建一个Python Package包,取名为proxypool 一.创建一个setting.py文件,用于存放配置信息.代码如

爬虫技术:代理池的维护

一:代理池维护的模块 1. 抓取模块Crawl,负责从代理网站上抓取代理 ---------------抓取模块 2. 获取代理Getter,负责获取抓取模块返回的值,并判断是否超过存储模块的最大容量.---------------获取模块 3.存储模块Redis,负责将抓取的每一条代理存放至有序集合中.---------------存储模块 4.测试模块Tester,负责异步测试每个代理是否可用.---------------测试模块 5.调度模块Schedule,负责测试,获取,和对外api

25、连接池(DBCP、C3P0)、动态代理与分页技术

连接池 思考: 程序中连接如何管理? 1. 连接资源宝贵:需要对连接管理 2. 连接: a) 操作数据库,创建连接 b) 操作结束, 关闭! 分析: 涉及频繁的连接的打开.关闭,影响程序的运行效率! 连接管理: 预先创建一组连接,有的时候每次取出一个: 用完后,放回: 学习连接池: a. 自定义一个连接池 b. 学习优秀的连接池组件 a) DBCP b) C3P0 动态代理 Java提供了一个Proxy类,调用它的newInstance方法可以生成某个对象的代理对象,使用该方法生成代理对象时,需

【Java EE 学习第15天】【自定义数据库连接池之动态代理的使用】

一.动态代理的作用 使用动态代理可以拦截一个对象某个方法的执行,并执行自定义的方法,其本质是反射 优点:灵活 缺点:由于其本质是反射,所以执行速度相对要慢一些 二.数据库连接池设计思想 1.为什么要使用数据库连接池:创建Connection对象的过程是非常耗时的,为了保证Connection可以重用,应该对Connection进行管理. 2.设计要求: (1)连接池能够实现维护多个连接,必须要保证每一个线程获取到的是不同的Connection对象. (2)提供一个方法能够回收连接. 3.最基本的