import time
import json
import datetime
import threading
import requests
from lxml import etree
from queue import Queue


# Scrape free proxy IPs from xicidaili.com (and several similar sites),
# then use multiple threads to verify which proxies actually work.
class ProxyTest:
    def __init__(self):
        self.test_url = "http://pv.sohu.com/cityjson?ie=utf-8"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36",
        }
        self.request_queue = Queue()
        self.file_lock = threading.Lock()  # keep concurrent file writes from interleaving

    def parse_url(self, url, proxies, timeout=3):
        return requests.get(url, headers=self.headers, proxies=proxies, timeout=timeout).content.decode()

    # Worker: take proxies off the queue and test them
    def request(self):
        while True:
            # get a proxy address, e.g. "http://1.2.3.4:8080"
            ip = self.request_queue.get()
            # send a request through the proxy and time it
            try:
                starttime = datetime.datetime.now()
                html_str = self.parse_url(self.test_url, proxies={"http": ip}, timeout=5)
                endtime = datetime.datetime.now()
                use_time = endtime - starttime
            except Exception:
                # request failed or timed out
                print("timeout %s" % ip)
                self.request_queue.task_done()
                continue
            # check the returned html: strip the "var returnCitySN = " prefix
            # and the trailing ";" to get the JSON payload
            try:
                json_dict = json.loads(html_str[19:-1])
            except ValueError:
                print("fail %s, use time %d" % (ip, use_time.seconds))
                self.request_queue.task_done()
                continue
            if ip.startswith("http://" + json_dict["cip"]):
                # the client IP seen by the server matches the proxy: usable
                print("success %s, use time %d, %s" % (ip, use_time.seconds, html_str))
                self.request_queue.task_done()
                # append it to the result file
                with self.file_lock:
                    with open("proxy_ok_ip.json", "a", encoding="utf-8") as f:
                        f.write(ip)
                        f.write("\n")
            else:
                # the reported client IP differs from the proxy's:
                # not a high-anonymity proxy
                print("%s invalid, use time %d" % (ip, use_time.seconds))
                self.request_queue.task_done()

    def run(self):
        # read the proxy list file and push every address onto the queue
        with open("proxy.json", "r", encoding="utf-8") as f:
            for line in f:
                self.request_queue.put(line.strip())
        # start 30 worker threads
        for i in range(30):
            # daemon=True makes the workers daemon threads: they are killed
            # automatically when the main thread exits
            threading.Thread(target=self.request, daemon=True).start()
        self.request_queue.join()  # block the main thread until every queued task is marked done
        print("main thread finished")
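For context, the sohu endpoint returns a JavaScript assignment rather than bare JSON, which is why the code above slices off the first 19 characters and the trailing semicolon. A standalone illustration, using an assumed sample payload in the shape that endpoint historically returned:

import json

# Assumed sample response from http://pv.sohu.com/cityjson?ie=utf-8
sample = 'var returnCitySN = {"cip": "1.2.3.4", "cid": "110000", "cname": "beijing"};'
prefix = "var returnCitySN = "
assert len(prefix) == 19             # the 19 characters stripped by html_str[19:-1]
payload = json.loads(sample[19:-1])  # drop the prefix and the trailing ";"
print(payload["cip"])                # -> 1.2.3.4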
class Proxy:
    def __init__(self):
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36"
        }

    def start_urls_superfastip(self):
        return ["http://www.superfastip.com/welcome/freeip/%d" % i for i in range(1, 11)]

    def get_content_list_superfastip(self, html_str):
        content_list = []
        html = etree.HTML(html_str)
        tr_list = html.xpath('/html/body/div[3]/div/div/div[2]/div/table/tbody/tr')
        for tr in tr_list:
            if tr.xpath('./td[4]/text()')[0].strip() == 'HTTP':
                item = {}
                item["ip"] = tr.xpath('./td[1]/text()')[0].strip()
                item["port"] = tr.xpath('./td[2]/text()')[0].strip()
                content_list.append(item)
        return content_list

    def start_urls_xici(self):
        return ["http://www.xicidaili.com/nn/%d" % i for i in range(1, 6)]

    def get_content_list_xici(self, html_str):
        content_list = []
        html = etree.HTML(html_str)
        tr_list = html.xpath('//table[@id="ip_list"]/tr')[1:]  # skip the header row
        for tr in tr_list:
            item = {}
            item["ip"] = tr.xpath('./td[2]/text()')[0].strip()
            item["port"] = tr.xpath('./td[3]/text()')[0].strip()
            content_list.append(item)
        return content_list

    def start_urls_kuaidaili(self):
        return ["https://www.kuaidaili.com/free/inha/%d/" % i for i in range(1, 11)]

    def get_content_list_kuaidaili(self, html_str):
        content_list = []
        html = etree.HTML(html_str)
        tr_list = html.xpath('//div[@id="list"]/table/tbody/tr')
        for tr in tr_list:
            item = {}
            item["ip"] = tr.xpath('./td[1]/text()')[0].strip()
            item["port"] = tr.xpath('./td[2]/text()')[0].strip()
            content_list.append(item)
        return content_list

    def start_urls_89ip(self):
        return ["http://www.89ip.cn/index_%d.html" % i for i in range(1, 11)]

    def get_content_list_89ip(self, html_str):
        content_list = []
        html = etree.HTML(html_str)
        tr_list = html.xpath('//div[@class="layui-form"]/table/tbody/tr')
        for tr in tr_list:
            item = {}
            item["ip"] = tr.xpath('./td[1]/text()')[0].strip()
            item["port"] = tr.xpath('./td[2]/text()')[0].strip()
            content_list.append(item)
        return content_list

    def parse_url(self, url):
        return requests.get(url, headers=self.headers).content.decode()

    def save_content_list(self, content_list):
        # despite the .json extension, the file holds one proxy URL per line
        with open("proxy.json", "a", encoding="utf-8") as f:
            for ip in content_list:
                f.write("http://%s:%s" % (ip["ip"], ip["port"]))
                f.write("\n")

    def run(self):
        # build the lists of page URLs to crawl
        start_urls_xici = self.start_urls_xici()
        start_urls_89ip = self.start_urls_89ip()
        start_urls_kuaidaili = self.start_urls_kuaidaili()
        start_urls_superfastip = self.start_urls_superfastip()
        all_content_list = []  # collects every scraped proxy
        for url in start_urls_superfastip:
            html_str = self.parse_url(url)  # fetch the page
            content_list = self.get_content_list_superfastip(html_str)  # parse it
            all_content_list.extend(content_list)  # accumulate the results
            time.sleep(0.2)
        for url in start_urls_xici:
            html_str = self.parse_url(url)
            content_list = self.get_content_list_xici(html_str)
            all_content_list.extend(content_list)
            time.sleep(0.2)
        for url in start_urls_kuaidaili:
            html_str = self.parse_url(url)
            content_list = self.get_content_list_kuaidaili(html_str)
            all_content_list.extend(content_list)
            time.sleep(0.2)
        for url in start_urls_89ip:
            html_str = self.parse_url(url)
            content_list = self.get_content_list_89ip(html_str)
            all_content_list.extend(content_list)
            time.sleep(0.2)
        print("scraping finished")
        self.save_content_list(all_content_list)


if __name__ == '__main__':
    # scrape the proxy lists
    spider = Proxy()
    spider.run()
    # test which proxies are usable
    proxy = ProxyTest()
    proxy.run()
    print("the usable proxy IPs are saved in proxy_ok_ip.json")
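As a follow-up, here is a minimal sketch (not part of the original script) of how the validated list might be consumed downstream: pick a random proxy from proxy_ok_ip.json and route a request through it. The target URL is a placeholder.

import random
import requests

def load_ok_proxies(path="proxy_ok_ip.json"):
    # one "http://ip:port" entry per line, as written by ProxyTest above
    with open(path, "r", encoding="utf-8") as f:
        return [line.strip() for line in f if line.strip()]

proxies = load_ok_proxies()
proxy = random.choice(proxies)  # e.g. "http://1.2.3.4:8080"
# http://example.com stands in for whatever site you actually want to crawl
resp = requests.get("http://example.com", proxies={"http": proxy}, timeout=5)
print(resp.status_code)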
Original post: https://www.cnblogs.com/blog-rui/p/11031144.html