爬虫技术:代理池的维护

一:代理池维护的模块

  1. 抓取模块Crawl,负责从代理网站上抓取代理 ---------------抓取模块

  2. 获取代理Getter,负责获取抓取模块返回的值,并判断是否超过存储模块的最大容量。---------------获取模块

  3.存储模块Redis,负责将抓取的每一条代理存放至有序集合中。---------------存储模块

  4.测试模块Tester,负责异步测试每个代理是否可用。---------------测试模块

  5.调度模块Schedule,负责测试,获取,和对外api的接口运转。---------------调度模块

  6.Flask对外接口,通过视图函数,获取jison值。---------------接口模块

  7.utilis工具类,提供了每个网站的页面解析。---------------工具类模块

二:抓取模块

import re
from ulits import get_66ip_content,get_xc_content,get_89_content
from bs4 import BeautifulSoup

class Crawl(object):

    def get_proxy(self):
        proxies = list()
        for value in self.crawl_proxy_66(5):
            proxies.append(value)
        for value in self.crawl_get_89proxy(5):
            proxies.append(value)
        for value in self.crawl_xc_proxy(5):
            proxies.append(value)
        return proxies

    def crawl_proxy_66(self, total_page):
        """
        :return:代理
        """
        start_url = "http://www.66ip.cn/{}.html"
        url_list = [start_url.format(i) for i in range(1,total_page + 1)]
        for url in url_list:
            content = get_66ip_content(url)
            soup = BeautifulSoup(content,"lxml")
            div = soup.find("div", id="main")
            table = div.table
            tr_list = table.find_all("tr")
            for tr in tr_list[1:]:
                tr = str(tr) # 把soup.element强行变成str字符串
                pattern = re.compile(r"<td>(.*?)</td><td>(.*?)</td>",re.S)
                result = re.search(pattern,tr)
                ip = result.group(1)
                port = result.group(2)
                yield ":".join([ip,port])

    def crawl_xc_proxy(self,total_page):
        start_url = "https://www.xicidaili.com/wt/{}"
        url_list = [start_url.format(i) for i in range(1,total_page + 1)]
        for url in url_list:
            html = get_xc_content(url)
            soup = BeautifulSoup(html, "lxml")
            div = soup.find("div",id="body")
            table = div.table
            tr_list = table.find_all("tr")
            for tr in tr_list[1:]:
                tr = str(tr)
                pattern = re.compile("<td>(.*?)</td>", re.S)
                result = re.findall(pattern, tr)
                if len(result) > 2:
                    ip = result[0]
                    port = result[1]
                    yield ":".join([ip, port])

    def crawl_get_89proxy(self,total_page):
        start_url = "http://www.89ip.cn/index_{}.html"
        url_list = [start_url.format(i) for i in range(1,total_page+1)]
        for url in url_list:
            html = get_89_content(url)
            soup = BeautifulSoup(html, "lxml")
            # 获取class属性的标准写法
            div = soup.find(name="div", attrs={"class": "layui-form"})
            table = div.table
            tr_list = table.find_all("tr")
            for tr in tr_list[1:]:
                tr = str(tr)
                pattern = re.compile("<td>(.*?)</td>", re.S)
                result = re.findall(pattern, tr)
                if len(result) > 2:
                    ip = (result[0]).replace("\n", "")
                    ip = ip.replace("\t", "")
                    port = (result[1]).replace("\t", "")
                    port = port.replace("\n", "")
                    yield ":".join([ip, port])

三:获取模块

from Redis import ReidsClient
from Crawl import Crawl

POOL_MAX_COUNT = 10000
class Getter(object):

    def __init__(self):
       self.redis = ReidsClient()
       self.crwal = Crawl()

    def is_exceed_poolcount(self):
        """
        判读是否超过代理池的最大容量
        :return: True超过,False没有超过
        """
        ret = self.redis.get_count() > POOL_MAX_COUNT
        return ret

    def run(self):
        print("计数器开始计时")
        if not self.is_exceed_poolcount():
            proxies = self.crwal.get_proxy()
            for proxy in proxies:
                self.redis.add(proxy)

if __name__ == ‘__main__‘:
    g = Getter()
    g.run()

四:存储模块

MAX_SOCRE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = "proxies"
import redis
from random import choice

class ReidsClient(object):

    def __init__(self,host=REDIS_HOST,port=REDIS_PORT, password=REDIS_PASSWORD):
        self.db = redis.StrictRedis(host=host,port=port,password=password,decode_responses=True) # 这样写存的数据是字符串格式

    def add(self, proxy, score=INITIAL_SCORE):
        """
        添加代理,设置分数最高
        :param proxy:代理
        :param score:分数
        :return:添加结果
        """
        # 从有序集合REDIS_KEY中获取proxy的权重分,没有添加
        if not self.db.zscore(REDIS_KEY,proxy):
            # 将新的proxy放入redis的REDIS_KEY集合中,
            return self.db.zadd(REDIS_KEY,score,proxy)   # 这里容易报一个错是:AttributionErro:int object have no arrtibution items,原因是redis这个包的版本问题
       # 解决包的版本问题:pip install redis==2.10.6
    def get_random(self):
        """
        随机获取代理,获取顺序是最高分的代理
        :return:随机代理
        """
        # 获取分数最高分数
        result = self.db.zrangebyscore(REDIS_KEY,MAX_SOCRE,MAX_SOCRE)
        if len(result):
            return choice(result) # 随机抽取,保证每个代理都可能被获取到。
        else:
            result = self.db.zrevrange(REDIS_KEY,MIN_SCORE,MAX_SOCRE) # 拿不到最高分的代理,那么就拿次高分的代理
            if len(result) > 1:
                return result[0]
            else:
                return "代理池为空"

    def decrease(self,proxy):
        """
        代理不能用的话,分数就减1,分数为0,从代理池中删除
        :param proxy:代理
        :return:修改后的分数
        """
        score = self.db.zscore(REDIS_KEY,proxy)
        if score and score > MIN_SCORE:
            print("代理",proxy,"当前的分数",score,"减1")
            return self.db.zincrby(REDIS_KEY,proxy,-1)

        else:
            print("代理", proxy, "当前的分数", score, "移除")
            return self.db.zrem(REDIS_KEY,proxy)  # 删除无效代理

    def exist(self,proxy):
        """
        判断代理是否存在
        :param proxy:代理
        :return:True or False
        """
        result = self.db.zscore(REDIS_KEY,proxy)
        if result != None:
            return True
        else:
            return False

    def set_max_value(self,proxy):
        """
         将代理设置为MAX_SCORE值
        :param proxy: 代理
        :return:设置结束
        """
        print("代理",proxy,"可用","设置值为",MAX_SOCRE)
        return self.db.zadd(REDIS_KEY,MAX_SOCRE,proxy)

    def get_count(self):
        """
        :return: 返回有序集合中的数量
        """
        return self.db.zcard(REDIS_KEY)

    def get_all(self):
        """
        :return: 获取所有元素,返回列表
        """
        return self.db.zrangebyscore(REDIS_KEY,MIN_SCORE,MAX_SOCRE)

五:测试模块

import time

VALID_STATUSC_CODES = [200]
TEST_URL = "http://www.baidu.com"
BATCH_TEST_SIZE = 100
from Redis import ReidsClient
import aiohttp
import asyncio

class Test(object):
    def __init__(self):
        self.redis = ReidsClient()

    async def test_single_proxy(self,proxy):
        """
        测试单个代理
        :param proxy: 代理
        :return: None
        """
        coon = aiohttp.TCPConnector(verify_ssl=False)  # 区别于requests库;是一个异步非阻塞的库
        async with aiohttp.ClientSession(connector=coon) as session:
            try:
                if isinstance(proxy,bytes):
                    proxy = proxy.decode("utf-8")
                real_proxy = "http://" + proxy
                print("正在测试",proxy)
                async with session.get(TEST_URL,proxy=real_proxy,timeout=15) as response:
                    if response.status in VALID_STATUSC_CODES:
                        self.redis.set_max_value(proxy) # 能用,给代理设置最高的权重分,存入数据库
                        print("代理可以用",proxy)
                    else:
                        self.redis.decrease(proxy) # 超时未请求到,给此代理减分
                        print("响应码不合格",proxy)

            except (ConnectionError,TimeoutError,AttributeError):
                self.redis.decrease(proxy)  # 发生故障给此代理减分
                print("代理请求失败",proxy)

    def run(self):
        """
        测试主函数
        :return:
        """
        print("测试函数开始运行")
        try:
            proxies = self.redis.get_all() # 从数据库中获取全部的代理
            loop = asyncio.get_event_loop()
            for i in range(0,len(proxies),BATCH_TEST_SIZE):
                test_proxies = proxies[i:i + BATCH_TEST_SIZE]
                tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
                loop.run_until_complete(asyncio.wait(tasks))
                time.sleep(5)
        except Exception as e:
            print("测试器发生错误",e.args)

六:调度模块

import time

TESTER_CYCLE = 20
GETTER_CYCLE = 20
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True
API_HOST="localhost"
API_PORT="8888"

from multiprocessing import Process
from Flask import app
from Getter import Getter
from Tester import Test

class Scheduler(object):
    def schedule_tester(self,cycle=TESTER_CYCLE):
        """
        定时测试代理
        :param cycle:
        :return:
        """
        tester = Test()
        while True:
            print("测试器开始运行")
            tester.run()
            time.sleep(cycle)

    def scheduler_getter(self,cycle=GETTER_CYCLE):
        """
        定时获取代理
        :param cycle:
        :return:
        """
        getter = Getter()
        while True:
            print("开始抓取代理")
            getter.run()
            time.sleep(cycle)

    def scheduler_api(self):
        """
        开启api
        :return:
        """
        app.run(API_HOST,API_PORT)

    def run(self):
        print("代理池开始运行")
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
        if GETTER_ENABLED:
            getter_process = Process(target=self.scheduler_getter)
            getter_process.start()
        if TESTER_ENABLED:
            api_process = Process(target=self.scheduler_api)
            api_process.start()

if __name__ == ‘__main__‘:

    s = Scheduler()
    s.run()

七:接口

from flask import Flask,g
from Redis import ReidsClient

__all__ = ["app"]
app = Flask(__name__)

def get_coon():
    if not hasattr(g,"redis"):
        g.redis = ReidsClient()
    return g.redis

@app.route("/")
def index():
    return "<h2>Welcome to Proxy Pool System</h2>"

@app.route("/random")
def get_proxy():
    """
    获得随机代理
    :return: 随机代理
    """
    coon = get_coon()
    return coon.get_random()

@app.route("/count")
def get_count():
    """
    获取代理池的总量
    :return:
    """
    coon = get_coon()
    return str(coon.get_count())

if __name__ == ‘__main__‘:
    app.run()

八:运行结果截图

程序运行情况

数据库情况

接口访问情况

九:本地使用

import requests
PROXY_URL = "http://localhost:8888/random"

def get_proxy():
    try:
        response = requests.get(PROXY_URL)
        if response.status_code == 200:
            return response.content.decode("utf-8")

    except ConnectionError:
        return None

get_proxy()
print(get_proxy())

十:总结

为什么要使用Flask接口呢:

  1.使用本地的redis链接,直接调取里面的random方法也可以拿到可用的代理,但是这样就暴露了reids的用户名和密码,对数据不安全。

  2.代理池要部署到远端服务器,而远端的reids只允许本地连接,那么我们就不能获取代理。

  3.爬虫主机,不是用python语言编写,就无法使用RedisClient这个类了,这样没有扩展性。

  4.如果reids数据库有更新,那么我们就得修改爬虫端的程序,比较麻烦。

  

原文地址:https://www.cnblogs.com/meloncodezhang/p/11592325.html

时间: 2024-07-30 07:02:30

爬虫技术:代理池的维护的相关文章

python3编写网络爬虫18-代理池的维护

一.代理池的维护 上面我们利用代理可以解决目标网站封IP的问题 在网上有大量公开的免费代理 或者我们也可以购买付费的代理IP但是无论是免费的还是付费的,都不能保证都是可用的 因为可能此IP被其他人使用来爬取同样的目标站点而被封禁或者代理服务器突然故障或者网络繁忙 一旦选用了一个不可用的代理,这势必会影响爬虫的工作效率 1.准备工作 需要安装Redis数据库并启动服务 另外还需要安装aiohttp.requests.redis-py.pyquery.flask库 redis数据库安装 下载地址 h

代理池的维护(一)

介绍代理池的维护 一.准备工作 安装redis数据库并启动服务,另外还需安装atihttp,requests, redis-py,pyquery,flask 二.代理池的架构 分为4个模块:存储模块,获取模块,检测模块,借口模块. 1.存储模块:使用Redis的有序集合,用来做代理的去重和状态标识,同时也是中心模块和基础模块,将其他模块串联起来 2.获取模块:定时从代理网站获取代理,将获取的代理传递给存储模块,并保存到数据库 3.检测模块:定时通过存储模块获取所有代理,对代理进行检测,根据不同的

快速构建Python爬虫IP代理池服务

在公司做分布式深网爬虫,搭建了一套稳定的代理池服务,为上千个爬虫提供有效的代理,保证各个爬虫拿到的都是对应网站有效的代理IP,从而保证爬虫快速稳定的运行,当然在公司做的东西不能开源出来.不过呢,闲暇时间手痒,所以就想利用一些免费的资源搞一个简单的代理池服务. 1.问题 代理IP从何而来? 刚自学爬虫的时候没有代理IP就去西刺.快代理之类有免费代理的网站去爬,还是有个别代理能用.当然,如果你有更好的代理接口也可以自己接入. 免费代理的采集也很简单,无非就是:访问页面页面 —> 正则/xpath提取

python爬虫-代理池的维护

简介 我们可以从网上或者付费获取大量代理,但是这其中很多依然不可用,那么搭建高效的代理池,对代理ip进行筛选是十分必要的 准备工作: 安装Redis数据库,还需要安装aiohttp.requests.redis-py.pyquery.Flask库,安装流程请百度自行查询 由于文件内容较多,所以就不一一讲解了,直接创建一个Python Package模块包,下次直接调用 创建一个Python Package包,取名为proxypool 一.创建一个setting.py文件,用于存放配置信息.代码如

爬虫IP代理池

下载安装 下载源码: git clone [email protected]:jhao104/proxy_pool.git 或者直接到https://github.com/jhao104/proxy_pool/releases 下载zip文件 安装依赖: pip install -r requirements.txt 配置Config/setting.py: # Config/setting.py 为项目配置文件 # 配置DB DATABASES = { "default": { &q

打造IP代理池,Python爬取Boss直聘,帮你获取全国各类职业薪酬榜

爬虫面临的问题 不再是单纯的数据一把抓 多数的网站还是请求来了,一把将所有数据塞进去返回,但现在更多的网站使用数据的异步加载,爬虫不再像之前那么方便 很多人说js异步加载与数据解析,爬虫可以做到啊,恩是的,无非增加些工作量,那是你没遇到牛逼的前端,多数的解决办法只能靠渲染浏览器抓取,效率低下,接着往下走 ? 千姿百态的登陆验证 从12306的说说下面哪个糖是奶糖,到现在各大网站的滑动拼图.汉子点击解锁,这些操作都是在为了阻止爬虫的自动化运行. 你说可以先登录了复制cookie,但cookie也有

爬虫搭建动态代理池

代理是什么? 代理实际上就是代理服务器, 代理服务器的工作机制很象我们生活中常常提及的代理商,假设你的机器为A机,你想获得的数据由B机提供,代理服务器为C机,那么具体的连接过程是这样的. 首先,A机需要B机的数据,它与C机建立连接,C机接收到A机的数据请求后,与B机建立连接,下载A机所请求的B机上的数据到本地,再将此数据发送至A机,完成代理任务.如图(图片有点丑): 为什么要使用代理? 我们在做爬虫的过程中经常会遇到这样的情况,最初爬虫正常运行,正常抓取数据,一切看起来都是那么美好,然而一杯茶的

使用redis所维护的代理池抓取微信文章

搜狗搜索可以直接搜索微信文章,本次就是利用搜狗搜搜出微信文章,获得详细的文章url来得到文章的信息.并把我们感兴趣的内容存入到mongodb中. 因为搜狗搜索微信文章的反爬虫比较强,经常封IP,所以要在封了IP之后切换IP,这里用到github上的一个开源类,当运行这个类时,就会动态的在redis中维护一个ip池,并通过flask映射到网页中,可以通过访问 localhost:5000/get/ 来获取IP 这是搜狗微信搜索的页面, 构造搜索url .搜索时会传递的参数,通过firefox浏览器

Python爬虫之ip代理池

可能在学习爬虫的时候,遇到很多的反爬的手段,封ip 就是其中之一. 对于封IP的网站.需要很多的代理IP,去买代理IP,对于初学者觉得没有必要,每个卖代理IP的网站有的提供了免费IP,可是又很少,写了个IP代理池 .学习应该就够了 ip代理池: 1,在各大网站爬去免费代理ip2,检查ip可用 可用存入数据库1和23,在数据库1中拿出少量代理ip存入数据库2(方便维护)4,定时检查数据库1和数据库2的代理数量,以及是否可用5,调用端口 1,在各大网站爬去免费代理ip 1 def IPList_61