Crawling with a process pool and storing the results in MongoDB

Set up a process pool to crawl Lagou (拉勾网):

# coding: utf-8
import json
import pymongo
import pandas as pd
import requests
from lxml import etree
import time
from multiprocessing import Pool

# Set up MongoDB
client = pymongo.MongoClient('localhost')
db = client['lagou']
# Job title to search for
POSITION_NAME = '数据挖掘'
# Total number of pages to crawl
PAGE_SUM = 200
# Number of positions returned per page
PAGE_SIZE = 15
# Name of the collection the results are stored in
DATA_NAME = "DataMiningPosition"

base_url = ('https://m.lagou.com/search.json?city=%E5%85%A8%E5%9B%BD&positionName={positionName}'
            '&pageNo={pageNo}&pageSize={pageSize}')

def page_index(pageno):
    headers = {
        "Accept": "application/json",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9",
        # Avoid sending a cookie if you can; here the API happens to return data fine without one
        # "Cookie": "user_trace_token=20181119151914-03711263-38a2-4d81-bd81-5f480d930039; _ga=GA1.2.605262108.1542611954; _gid=GA1.2.249787972.1542611954; LGSID=20181119151916-6c3da9fa-ebcb-11e8-8958-5254005c3644; PRE_UTM=; PRE_HOST=www.baidu.com; PRE_SITE=https%3A%2F%2Fwww.baidu.com%2Flink%3Furl%3DOnHWjpEfiW4_pVm7hX8NYOFm0iJ7bz1ZJJlaKPPnmMzLE-6ypKNo0f19ABO5bjW4%26wd%3D%26eqid%3D8f61629100016e18000000065bf263e7; PRE_LAND=https%3A%2F%2Fwww.lagou.com%2Fgongsi%2F147.html; LGUID=20181119151916-6c3dabf3-ebcb-11e8-8958-5254005c3644; index_location_city=%E5%85%A8%E5%9B%BD; JSESSIONID=ABAAABAAAGCABCC2D851CA25D1CFCD2B28DCDD6E00A2C7E; _ga=GA1.3.605262108.1542611954; X_HTTP_TOKEN=a0cc1a4beb8a41f57f144bc0bfd77bd7; sajssdk_2015_cross_new_user=1; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%221672adb3834203-08b3706084b44a-3961430f-1327104-1672adb3835428%22%2C%22%24device_id%22%3A%221672adb3834203-08b3706084b44a-3961430f-1327104-1672adb3835428%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_referrer%22%3A%22%22%2C%22%24latest_referrer_host%22%3A%22%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%7D%7D; Hm_lvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1542611954,1542612053,1542612277,1542612493; _gat=1; Hm_lpvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1542613115; LGRID=20181119153837-20bafb1a-ebce-11e8-8958-5254005c3644",
        "Host": "m.lagou.com",
        "Proxy-Connection": "keep-alive",
        "Referer": "http://m.lagou.com/search.html",
        "X-Requested-With": "XMLHttpRequest",
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
    }
    url = base_url.format(positionName=POSITION_NAME, pageNo=pageno, pageSize=PAGE_SIZE)
    response = requests.get(url, headers=headers)
    html = response.text
    content = json.loads(html)
    print(content)
    if content.get("content"):
        return content
    else:
        # No data in the response (probably rate limited); wait and retry
        time.sleep(30)
        return page_index(pageno)

def parse_page_index(content):

    for i in range(PAGE_SIZE):
        try:
            item = content['content']['data']['page']['result'][i]
            # print(item)
            yield {
                'positionId': item.get('positionId'),
                'positionName': item.get('positionName'),
                'city': item.get('city'),
                'createTime': item.get('createTime'),
                'salary': item.get('salary'),
                'companyId': item.get('companyId'),
                'companyFullName': item.get('companyFullName')
            }
        except IndexError as e:
            print('This page may not have that many results', e)

def save_to_mongo(data):
    # Upsert on positionId so re-crawling the same job does not create duplicates
    if db[DATA_NAME].update_one({'positionId': data['positionId']}, {'$set': data}, upsert=True):
        print('Saved to Mongo', data['positionId'])
    else:
        print('Saved to Mongo Failed', data['positionId'])
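
# Optional sketch (an assumption, not in the original post): a unique index on
# positionId keeps the upsert lookup fast and enforces one document per posting:
# db[DATA_NAME].create_index('positionId', unique=True)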

def parse_detail(url):
    # url = "http://m.lagou.com/jobs/4593934.html"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Cache-Control": "max-age=0",
        "Connection": "keep-alive",
       # "Cookie": "_ga=GA1.2.474762156.1528795210; _gid=GA1.2.574638607.1528795210; user_trace_token=20180612172010-cdf76dc1-6e21-11e8-9af0-525400f775ce; LGUID=20180612172010-cdf772c0-6e21-11e8-9af0-525400f775ce; Hm_lvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1528795210,1528795215,1528795223; index_location_city=%E5%85%A8%E5%9B%BD; X_HTTP_TOKEN=f3ed266ddeee802fb7d402e4f6d4f4a3; JSESSIONID=ABAAABAAAFDABFG9F9C52FA9D8CAE24F139A0131C45E918; _ga=GA1.3.474762156.1528795210; _gat=1; LGSID=20180612184248-597a7795-6e2d-11e8-9479-5254005c3644; PRE_UTM=; PRE_HOST=; PRE_SITE=http%3A%2F%2Fm.lagou.com%2Fsearch.html; PRE_LAND=http%3A%2F%2Fm.lagou.com%2Fjobs%2F4079910.html; LGRID=20180612184505-ab051d02-6e2d-11e8-9479-5254005c3644; Hm_lpvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1528800306"

    }
    try:
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            print("Request succeeded")
            text = response.content.decode()
            # print(text)
            html = etree.HTML(text)
            workyear = html.xpath('//span[@class="item workyear"]/span/text()')
            if workyear:
                workyear = workyear[0]
            else:
                # The field was not rendered; wait briefly and retry
                time.sleep(5)
                return parse_detail(url)
            positiondesc = html.xpath('//div[@class="positiondesc"]//p/text()')
            # print(workyear, positiondesc)
            return workyear, positiondesc
    except Exception as e:
        print(e)

# Save the crawled data to MongoDB
def to_mongo(page_sum):
    # Lagou only exposes up to about 334 pages of results
    for page in range(page_sum):
        html = page_index(page)
        items = parse_page_index(html)
        # print(items)
        for item in items:
            print(item)
            save_to_mongo(item)

# Save the crawled data to MongoDB using a process pool (worker function: one page per call)
def to_mongo_pool(page):
    # Lagou only exposes up to about 334 pages of results
    content = page_index(page)
    items = parse_page_index(content)
    # print(items)
    for item in items:
        print(item)
        save_to_mongo(item)

# Parse the crawled items so the data can be converted into a DataFrame
def parse_items(page_sum):
    for page in range(page_sum):
        html = page_index(page)
        items = parse_page_index(html)
        for item in items:
            positionId = item["positionId"]
            detail_url = "http://m.lagou.com/jobs/{}.html".format(positionId)
            workyear, positiondesc = parse_detail(detail_url)
            print(positionId, positiondesc)
            yield [
                item["positionId"],
                item["positionName"],
                item["city"],
                item["createTime"],
                item["salary"],
                item["companyId"],
                item["companyFullName"],
                workyear,
                positiondesc
            ]

# Save the data to a CSV file
def to_csv(page_sum):
    item_lists = []
    # print(parse_items())
    for item in parse_items(page_sum):
        item_lists.append(item)
    #print(item_lists)
    data = pd.DataFrame(item_lists,
                        columns=["positionId", "positionName", "city", "createTime", "salary", "companyId",
                                 "companyFullName", "workyear", "positiondesc"])
    data.to_csv("python_position.csv")
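
# Optional (an assumption, not in the original post): passing encoding="utf_8_sig"
# to to_csv makes the Chinese fields display correctly when opened in Excel:
# data.to_csv("python_position.csv", encoding="utf_8_sig")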

if __name__ == '__main__':

    # to_csv(PAGE_SUM)
    # to_mongo(200)
    # Saving to MongoDB is recommended

    start_time = time.time()
    # Pool() takes the number of worker processes; it defaults to the number of
    # CPU cores, and if you set it yourself it should not exceed the core count
    pool = Pool()
    # pool.map takes a function (no parentheses, no arguments) and an iterable;
    # each element of the iterable is passed to the function in a worker process
    pool.map(to_mongo_pool, [i for i in range(PAGE_SUM)])
    # close() just closes the pool so no new tasks are accepted
    pool.close()
    # join() blocks the main process until all worker processes have finished
    pool.join()
    end_time = time.time()
    print("Total time: %.2f seconds" % (end_time - start_time))

Source: https://www.cnblogs.com/Dark-fire-liehuo/p/9998594.html
