import requests
import json
from retrying import retry
from lxml import etree
from queue import Queue
import threading
class QiuShi:
    def __init__(self):
        # Three queues connect the pipeline stages:
        # urls -> parsed html -> extracted content
        self.url_queue = Queue()
        self.html_queue = Queue()
        self.content_list_queue = Queue()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36"
        }
    def get_url_list(self):
        url_list = ['https://www.qiushibaike.com/8hr/page/{}/'.format(i) for i in range(1, 14)]
        for url in url_list:
            # put() each url into the queue
            self.url_queue.put(url)
    @retry(stop_max_attempt_number=3)
    def _parse_url(self, url):
        response = requests.get(url, headers=self.headers, timeout=3)
        # a non-200 status raises AssertionError, which makes @retry
        # re-run the request (up to 3 attempts in total)
        assert response.status_code == 200
        return etree.HTML(response.content)
    def parse_url(self):
        # url_queue.get() only takes one url at a time, so we loop with
        # while True to keep consuming. get() blocks while the queue is
        # empty and returns as soon as an item is available. Taking an
        # item does not decrement the queue's unfinished-task count by
        # itself, which is why task_done() is called below.
        while True:
            url = self.url_queue.get()
            print(url)
            try:
                html = self._parse_url(url)
            except Exception:
                html = None
            # hand the parsed html to the next stage's queue
            self.html_queue.put(html)
            self.url_queue.task_done()
    def get_content_list(self):
        # same consumption pattern as parse_url above
        while True:
            html = self.html_queue.get()
            if html is not None:
                div_list = html.xpath('//div[@id="content-left"]/div')
                content_list = []
                for div in div_list:
                    item = {}
                    item['name'] = div.xpath('.//h2/text()')[0].replace("\n", "") if len(div.xpath('.//h2/text()')) > 0 else None
                    item['content'] = div.xpath('.//div[@class="content"]/span/text()')[0].replace("\n", "") if len(div.xpath('.//div[@class="content"]/span/text()')) > 0 else None
                    item['comment'] = div.xpath('.//i[@class="number"]/text()')[1] if len(div.xpath('.//i[@class="number"]/text()')) > 1 else None
                    item['img'] = div.xpath('.//img/@src') if len(div.xpath('.//img/@src')) > 0 else None
                    content_list.append(item)
                self.content_list_queue.put(content_list)
            self.html_queue.task_done()
    def save_content_list(self):
        while True:
            content_list = self.content_list_queue.get()
            with open("qiubai.json", "a", encoding="utf-8") as f:
                for content in content_list:
                    json.dump(content, f, ensure_ascii=False, indent=2)
                    # records are written as comma-separated objects,
                    # not as one valid JSON array
                    f.write(',\n')
            self.content_list_queue.task_done()
    def run(self):
        thread_list = []
        # one thread to produce the urls
        t_url = threading.Thread(target=self.get_url_list)
        thread_list.append(t_url)
        # sending requests is I/O-bound and slow, so use several threads
        for i in range(5):
            t_parse = threading.Thread(target=self.parse_url)
            thread_list.append(t_parse)
        # extracting data also takes time, so use several threads here too
        for i in range(3):
            t_get_content_list = threading.Thread(target=self.get_content_list)
            thread_list.append(t_get_content_list)
        # saving must stay on a single thread, otherwise concurrent writes
        # would interleave and corrupt the output
        t_save = threading.Thread(target=self.save_content_list)
        thread_list.append(t_save)
        for t in thread_list:
            t.daemon = True  # daemon threads exit with the main thread
            t.start()
        # join() blocks until a queue's unfinished-task count reaches zero;
        # once all three queues are drained, the main thread can exit
        for q in [self.content_list_queue, self.html_queue, self.url_queue]:
            q.join()
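# A minimal sketch (not part of the original scraper) of the retry pattern
# that _parse_url relies on: the assert raises AssertionError on a bad
# result, and the retrying library's @retry decorator re-runs the function
# up to stop_max_attempt_number attempts before letting the exception
# propagate to the caller in parse_url.
def _demo_retry():
    attempts = []

    @retry(stop_max_attempt_number=3)
    def flaky():
        attempts.append(1)
        # fail on the first two attempts, succeed on the third
        assert len(attempts) >= 3
        return "ok"

    print(flaky(), "after", len(attempts), "attempts")  # ok after 3 attempts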
if __name__ == '__main__':
    qiubai = QiuShi()
    qiubai.run()
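# A minimal standalone sketch of the Queue coordination used above: every
# get() must be paired with a task_done(), and q.join() returns only once
# the unfinished-task count drops back to zero. Because the worker is a
# daemon thread, the process can exit after join() even though the
# worker's while True loop never terminates on its own.
def _demo_queue_join():
    q = Queue()

    def worker():
        while True:
            item = q.get()    # blocks until an item is available
            print("processed", item)
            q.task_done()     # decrement the unfinished-task count

    threading.Thread(target=worker, daemon=True).start()
    for i in range(5):
        q.put(i)
    q.join()  # unblocks only after all five items are task_done()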