实现多线程爬取数据并保存到mongodb

多线程爬取二手房网页并将数据保存到mongodb的代码:

import pymongo
import threading
import time

from lxml import etree
import requests
from queue import Queue

index_url=‘https://m.lianjia.com/gz/ershoufang/pg{}/‘
detail_url=‘https://m.lianjia.com{}‘

# 设置爬取主页的页数
INDEX_PAGE_NUM=200

# 定义一个类

# 0定义主页url队列、主页html队列、详情页url队列、html队列、内容队列
# 1获取首页url并解析详情页url
# 2获取详情页的内容
# 3保存内容
# 4设置多线程调用方法

# 设置mongodb
client = pymongo.MongoClient(‘localhost‘)
# 设置数据库名
db = client[‘ershoufang‘]
# 指定集合名
index = ‘index_info‘
detail = ‘detail_info‘

class lianJia():
    def __init__(self):
        self.index_url_queue=Queue()

        self.html_index_queue=Queue()

        self.index_content_queue=Queue()
        self.detail_content_queue = Queue()

    #     获取主页的url和html内容并解析出index页内容和详情页url
    def get_index(self):
        for i in range(INDEX_PAGE_NUM):
            # print(index_url.format(i+1))
            url=index_url.format(i+1)
            self.index_url_queue.put(url)
            # index=requests.get(index_url.format(i+1)).content.decode()
            # self.html_index_queue.put(index)
    # 获取主页html
    def get_index_html(self):
        while True:
            url=self.index_url_queue.get()
            index = requests.get(url).content.decode()
            self.html_index_queue.put(index)
            self.index_url_queue.task_done()
    def parse_index(self):
        while True:
            # 获取队列里得内容
            html1=self.html_index_queue.get()
            xml=etree.HTML(html1)
            pingjie_list=xml.xpath(‘‘‘//ul[@class=‘lists‘]/li[position()>1]‘‘‘)
            # 将 pingjie_list拼接在xpath前,少写xpath语句
            index_content_list=[]
            for pj in pingjie_list:
                index_infor={}
                # #判空炒作,如果为空则显示none if len(index_infor[‘title‘]) > 0 else None
                index_infor[‘title‘]=pj.xpath(‘‘‘./div/div[@class=‘item_list‘]/div[1]/text()‘‘‘)

                index_infor[‘title‘]=index_infor[‘title‘][0] if len(index_infor[‘title‘]) > 0 else None
                index_infor[‘detail_url‘] = pj.xpath(‘‘‘./a/@href‘‘‘)[0]
                index_infor[‘index_detail‘]=pj.xpath(‘‘‘./div/div[2]/div[2]/text()‘‘‘)
                index_infor[‘index_detail‘]=index_infor[‘index_detail‘][0] if len(index_infor[‘index_detail‘])>0 else None
                index_infor[‘total_price‘]=pj.xpath(‘‘‘./div/div[2]/div[position()>2]/span[1]/em/text()‘‘‘)
                index_infor[‘total_price‘]= index_infor[‘total_price‘][0] if len( index_infor[‘total_price‘])>0 else None
                index_infor[‘average_price‘]=pj.xpath(‘‘‘./div/div[@class=‘item_list‘]/div[3]/span[2]/text()‘‘‘)
                index_infor[‘average_price‘]=index_infor[‘average_price‘][0]if len(index_infor[‘average_price‘])>0 else None
                index_content_list.append(index_infor)
                #  队列保存时不能在循环里 否之回保存很多个队列
                # self.index_content_queue.put(index_content_list)
                # 把content_list放进content_queue里面

            self.index_content_queue.put(index_content_list)
            # print(index_content_list)

            # 每从队列中获取一个数,队列则减少一个数,所以此代码必须写
            self.html_index_queue.task_done()

    # 获取详情页内容
    def get_detail(self):
        pass

    # 保存内容
    def save_content(self):
        while True:
            index_conten_list=self.index_content_queue.get()

            for i in index_conten_list:
                # print(i[‘title‘])
                if i[‘title‘]==None or i[‘total_price‘]==None or i[‘average_price‘]==None:
                    print(‘该数据为空,不进行保存‘)

                else:
                    db[‘index_info‘].insert(i)
                    # db[‘detailDta‘].insert(detail_datas)
                    print(‘保存数据成功‘)
            self.index_content_queue.task_done()

    # 主线程:分配各种子线程去执行class里得每一个函数
    # 使用队列的方式得设置多线程进行调用函数,才能让程序执行速度更快
    def run(self):
        # 设置线程列表
        thread_list=[]
        # start_time=time.time()
        # 1.url_list
        # threading.Thread不需要传参数,参数都是从队列里面取得
        # for i in range(20):
        t_index_u=threading.Thread(target=self.get_index)
        thread_list.append(t_index_u)

        # 2.遍历,发送请求,获取响应
        for i in range(20):
            t_index_html=threading.Thread(target=self.get_index_html)
            thread_list.append(t_index_html)

        # 3.提取数据
        for i in range(2):
            t_parse_index=threading.Thread(target=self.parse_index)
            thread_list.append(t_parse_index)

        # 4.保存数据
        t_save=threading.Thread(target=self.save_content)
        thread_list.append(t_save)
        #     循环开启各子线程
        for t in thread_list:
            # 表示主线程结束,子线程(设置为true无限循环)也跟着结束(用主线程控制子线程)
            t.setDaemon(True)
            # 启动线程
            t.start()
        for q in [self.index_url_queue,self.html_index_queue,self.index_content_queue]:
            # 让主线程等待阻塞,等待队列的任务完成(即队列为空时 )之后再进行主线程
            q.join()
            # end_time=time.time()
            # print(‘总耗时%.2f秒‘%(end_time-start_time))

if __name__==‘__main__‘:
    sk = time.clock()
    func=lianJia()
    func.run()
    ek = time.clock()
    print(‘程序总耗时:‘,ek-sk)

多线程爬取糗事百科:

# coding=utf-8
import requests
from lxml import etree
import threading
from queue import Queue

# https://docs.python.org/3/library/queue.html#module-queue
# 队列使用方法简介
# q.qsize() 返回队列的大小
# q.empty() 如果队列为空,返回True,反之False
# q.full() 如果队列满了,返回True,反之False
# q.full 与 maxsize 大小对应
# q.get([block[, timeout]]) 获取队列,timeout等待时间
# q.get_nowait() 相当q.get(False)
# q.put(item) 写入队列,timeout等待时间
# q.put_nowait(item) 相当q.put(item, False)
# q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
# q.join() 实际上意味着等到队列为空,再执行别的操作

class QiubaiSpdier:
    def __init__(self):
        self.url_temp = "https://www.qiushibaike.com/8hr/page/{}/"
        self.headers = {"User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"}
        self.url_queue = Queue()
        self.html_queue  = Queue()
        self.content_queue = Queue()
    def get_url_list(self):
        # return [self.url_temp.format(i) for i in range(1,14)]
        for i in range(1,14):
            # 把13个索引页面的Url放进url_queue队列里
            self.url_queue.put(self.url_temp.format(i))

    def parse_url(self):
        while True:
            # get方法和task_done搭配使用
            # 在put是队列+1,get和task_done一起使用时队列才会-1
            url = self.url_queue.get()
            print(url)
            response = requests.get(url,headers=self.headers)
            # 然后把索引页的响应页面放进html_queue队列里
            self.html_queue.put(response.content.decode())
            self.url_queue.task_done()

    def get_content_list(self): #提取数据
        while True:
            # 先从索引页响应页面html_queue队列里面取出索引页面
            html_str = self.html_queue.get()

            html = etree.HTML(html_str)
            div_list = html.xpath("//div[@id=‘content-left‘]/div")  #分组
            content_list = []
            for div in div_list:
                item= {}
                item["content"] = div.xpath(".//div[@class=‘content‘]/span/text()")
                item["content"] = [i.replace("\n","") for i in item["content"]]
                item["author_gender"] = div.xpath(".//div[contains(@class,‘articleGender‘)]/@class")
                item["author_gender"] = item["author_gender"][0].split(" ")[-1].replace("Icon","") if len(item["author_gender"])>0 else None
                item["auhtor_age"] = div.xpath(".//div[contains(@class,‘articleGender‘)]/text()")
                item["auhtor_age"] = item["auhtor_age"][0] if len(item["auhtor_age"])>0 else None
                item["content_img"] = div.xpath(".//div[@class=‘thumb‘]/a/img/@src")
                item["content_img"] = "https:"+item["content_img"][0] if len(item["content_img"])>0 else None
                item["author_img"] = div.xpath(".//div[@class=‘author clearfix‘]//img/@src")
                item["author_img"] = "https:"+item["author_img"][0] if len(item["author_img"])>0 else None
                item["stats_vote"] = div.xpath(".//span[@class=‘stats-vote‘]/i/text()")
                item["stats_vote"] = item["stats_vote"][0] if len(item["stats_vote"])>0 else None
                content_list.append(item)
            # 把content_list放进content_queue里面
            self.content_queue.put(content_list)
            self.html_queue.task_done()

    def save_content_list(self): #保存
        while True:
            content_list = self.content_queue.get()
            for i in content_list:
                print(i)
                pass
            self.content_queue.task_done()

    def run(self): #实现主要逻辑
        thread_list = []
        #1.url_list
        # threading.Thread不需要传参数,参数都是从队列里面取得
        t_url = threading.Thread(target=self.get_url_list)
        thread_list.append(t_url)
        #2.遍历,发送请求,获取响应
        for i in range(20): # 添加20个线程
            t_parse = threading.Thread(target=self.parse_url)
            thread_list.append(t_parse)
        #3.提取数据
        for i in range(2): # 添加2个线程
            t_html = threading.Thread(target=self.get_content_list)
            thread_list.append(t_html)
        #4.保存
        t_save = threading.Thread(target=self.save_content_list)
        thread_list.append(t_save)
        for t in thread_list:
            t.setDaemon(True) #把子线程设置为守护线程,该线程不重要,主线程结束,子线程结束(子线程是while true不会自己结束)
            t.start()

        for q in [self.url_queue,self.html_queue,self.content_queue]:
            q.join() #让主线程等待阻塞,等待队列的任务完成(即队列为空时 )之后再进行主线程

        print("主线程结束")

if __name__ == ‘__main__‘:
    qiubai = QiubaiSpdier()
    qiubai.run()

# 所没有tast_done方法,程序最终会卡着不动,无法终止

# 线程的设计注意:耗时的操作要分配一些线程

原文地址:https://www.cnblogs.com/Dark-fire-liehuo/p/9998573.html

时间: 2024-10-10 12:41:54

实现多线程爬取数据并保存到mongodb的相关文章

多线程爬取小说时如何保证章节的顺序

前言 爬取小说时,以每一个章节为一个线程进行爬取,如果不加以控制的话,保存的时候各个章节之间的顺序会乱掉. 当然,这里说的是一本小说保存为单个txt文件,如果以每个章节为一个txt文件,自然不会存在这种情况. 不仅仅是小说,一些其他的数据在多线程爬取时也有类似情况,比如: 漫画:漫画其实是由大量图片组成,一般一本漫画会保存为一个pdf文件,在此过程要保证图片的顺序. 视频:现在网络上的视频大部分是由多个ts文件拼合,最后保存为一个mp4文件,要保证ts文件的顺序. 它们都有一个共同的特点,那就是

多线程爬取都挺好链接并保存到mongodb

一个比较简单,python3多线程使用requests库爬取都挺好,并使用正则提取下载链接,保存到mongodb #!/usr/bin/env python # -*- coding:utf-8 -*- """ @author:Aiker Zhao @file:doutinghao.py @time:下午8:18 """ import requests import re import pymongo from multiprocessing i

借助Chrome和插件爬取数据

工具 Chrome浏览器 TamperMonkey ReRes Chrome浏览器 chrome浏览器是目前最受欢迎的浏览器,没有之一,它兼容大部分的w3c标准和ecma标准,对于前端工程师在开发过程中提供了devtools和插件等工具,非常方便使用.在爬取数据的过程中,最常用的应该是开发工具中的Element.Source和Network功能,分别查看DOM结构,源码和网络请求.同时,有很多基于Chrome浏览器的插件又给我们赋予了浏览器级别的能力,来处理数据. TamperMonkey Ta

Python爬虫入门教程: All IT eBooks多线程爬取

All IT eBooks多线程爬取-写在前面 对一个爬虫爱好者来说,或多或少都有这么一点点的收集癖 ~ 发现好的图片,发现好的书籍,发现各种能存放在电脑上的东西,都喜欢把它批量的爬取下来. 然后放着,是的,就这么放着.......然后慢慢的遗忘掉..... All IT eBooks多线程爬取-爬虫分析 打开网址 http://www.allitebooks.com/ 发现特别清晰的小页面,一看就好爬 在点击一本图书进入,发现下载的小链接也很明显的展示在了我们面前,小激动一把,这么清晰无广告的

多线程爬取糗事百科热门段子 (改写前天的博客)

利用多线程爬取,除了先前用到的几个模块之外,还需用到threading模块和queue模块: 为每一件事情开启一个线程:构造url_list.发送请求.提取数据.保存数据 __init__方法添加三个实例属性队列分别存放:url.响应内容.处理后的数据 改写原先每一个方法里的代码,需要的东西直接从队列中取出,此时方法都无需多余参数了 每当从一个队列取出数据,记得执行task_done()方法,使计数减一 run()方法里把yaozhixing的事情都开启一个线程,比较慢的事情,比如网络请求,可以

shell脚本每隔2s获取某个进程的cpu和mem数据并保存到csv文件

shell脚本每隔2s获取某个进程的cpu和mem数据并保存到csv文件 shell脚本如下echo "%CPU,%MEM" > cpu_test.csvpid=1 #Can be change by yourselfwhile true do top -bn1 -n 1 -p $pid | tail -1 | awk '{ print $9,$10 }' | sed 's/ /,/' >> cpu_test.csv sleep 2 #delay timedone 脚

多线程爬取百度百科

前言:EVERNOTE里的一篇笔记,我用了三个博客才学完...真的很菜...百度百科和故事网并没有太过不一样,修改下编码,debug下,就可以爬下来了,不过应该是我爬的东西太初级了,而且我爬到3000多条链接时,好像被拒绝了...爬取速度也很慢,估计之后要接触一些优化或者多进程,毕竟python是假的多线程.本博客参照代码及PROJECT来源:http://kexue.fm/archives/4385/ 源代码: 1 #! -*- coding:utf-8 -*- 2 import reques

【个人】爬虫实践,利用xpath方式爬取数据之爬取虾米音乐排行榜

实验网站:虾米音乐排行榜 网站地址:http://www.xiami.com/chart 难度系数:★☆☆☆☆ 依赖库:request.lxml的etree (安装lxml:pip install lxml) IDEA开发工具:PyCharm_2017.3 Python版本:Python3 期望结果:爬取出排行版歌名以及对应歌手 运行效果图: 音乐排行榜: 爬取数据结果图: 像这种简单的爬取就没必要使用Scrapy框架进行处理,是在有点大材小用,不过如果你刚开始学Scrapy的话,拿这些简单的练

python多线程爬取网页

#-*- encoding:utf8 -*- ''' Created on 2018年12月25日 @author: Administrator ''' from multiprocessing.dummy import Pool as pl import csv import requests from lxml import etree def spider(url): header = {"User-Agent":"Mozilla/5.0 (Windows NT 6.1