Python高级应用程序设计任务要求(主题链家二手车)

  • 内容简介

    • 链家二手房成交信息(福州)
    • 本文主要使用了multiprocessing模块创建多个进程对象,使用Queue将多个进程联系在一起,也就是线程之间的通信多个对链家的二手房进行数据的爬取,处理,存储等操作。
    • 结构:主从模式:
      • 主控制节点
      • 从爬虫节点
  • 分析与设计
    • 系统主要核心有两大调度器

      • 1、控制调度器

        • 主要负责管理三个进程:一:负责将地址传递给爬虫节点,二:负责读取爬虫节点返回的数据,三:负责将数据提取进程中提交的数据进行数据持久化
      • 2、爬虫调度器
        • 爬虫节点主要是包括两个功能,下载html内容和解析html内容并跟控制节点进行交互
    • 数据库的主要数据库表的实体属性
  • 代码实现
    • 代码的目录结构如下

  1. 控制节点

nodeManager.py

#coding:utf-8
from multiprocessing.managers import BaseManager

import time
import sys
from multiprocessing import Process, Queue

from DataOutput import DataOutput
from UrlManager import UrlManager
‘‘‘
分布式爬虫
‘‘‘
class NodeManager(object):
    def __init__(self):
        sys.setrecursionlimit(100000000)  # 设置递归分界
        self.countPage = 0

    def start_Manager(self, url_q, result_q):
        ‘‘‘
        创建一个分布式管理器
        :param url_q: url队列
        :param result_q: 结果队列
        :return:
        ‘‘‘
        # 把创建的两个队列注册在网络上,利用register方法,callable参数关联了Queue对象,
        # 将Queue对象在网络中暴露
        BaseManager.register(‘get_task_queue‘, callable=lambda: url_q)
        BaseManager.register(‘get_result_queue‘, callable=lambda: result_q)
        # 绑定端口8001,设置验证口令‘baike’。这个相当于对象的初始化
        manager = BaseManager(address=(‘localhost‘, 8001), authkey=‘baike‘.encode(‘utf-8‘))
        # 返回manager对象
        self.countPage = int(input("请爬取您想要爬取的歌手的个数(记得要在爬虫节点没开启之前输入):"))
        return manager

    def url_manager_proc(self, url_q, conn_q, root_url):
        # url管理进程
        url_manager = UrlManager()
        for i in range(1,self.countPage+1):#写死表示要爬取几个列表
            url = ‘https://fz.lianjia.com/chengjiao/pg‘+str(i)+"/"
            url_manager.add_new_url(url)
        while True:
            while (url_manager.has_new_url()):
                # 从URL管理器获取新的url
                new_url = url_manager.get_new_url()
                # 将新的URL发给工作节点
                url_q.put(new_url)
                print(‘old_url=‘, url_manager.old_url_size())

    def result_solve_proc(self, result_q, conn_q, store_q):
        # 数据提取进程
        while (True):
            try:
                if not result_q.empty():
                    # Queue.get(block=True, timeout=None)
                    content = result_q.get(block=True, timeout=None)
                    if content[‘new_urls‘] == ‘end‘:
                        # 结果分析进程接受通知然后结束
                        print(‘结果分析进程接受通知然后结束!‘)
                        store_q.put(‘end‘)
                        return
                    store_q.put(content[‘data‘])  # 解析出来的数据为dict类型
                else:
                    time.sleep(0.1)  # 延时休息
            except BaseException as e:
                time.sleep(0.1)  # 延时休息

    def store_proc(self, store_q):
        # 数据存储进程
        output = DataOutput()
        while True:
            if not store_q.empty():
                data = store_q.get()
                if data == ‘end‘:
                    print(‘存储进程接受通知然后结束!‘)
                    output.add_mysql()
                    df = output.get_house()
                    print(">>>>>>>>>>>>>>>>>>>>二手成交房基本信息表")
                    print(df[[‘id‘, ‘addr‘, ‘house_class‘, ‘size‘, ‘closing_time‘, ‘price‘]])
                    output.show(df)
                    return
                output.store_data(data)
            else:
                time.sleep(0.1)
        pass

if __name__==‘__main__‘:
    #初始化4个队列
    url_q = Queue()
    result_q = Queue()
    store_q = Queue()
    # 数据提取进程存储url的队列
    conn_q = Queue()
    # 数据提取进程存储data的队列
    # 创建分布式管理器
    node = NodeManager()
    manager = node.start_Manager(url_q,result_q)
    #创建URL管理进程、 数据提取进程和数据存储进程
    root_url = ‘https://fz.lianjia.com/chengjiao/‘
    url_manager_proc = Process(target=node.url_manager_proc, args=(url_q,conn_q,root_url,))
    result_solve_proc = Process(target=node.result_solve_proc, args=(result_q,conn_q,store_q,))
    store_proc = Process(target=node.store_proc, args=(store_q,))
    #启动3个进程和分布式管理器
    url_manager_proc.start()
    result_solve_proc.start()
    store_proc.start()
    manager.get_server().serve_forever()#永远服务
#coding:utf-8
import pickle
import hashlib
class UrlManager(object):
    def __init__(self):
        self.new_urls = set()  # 未爬取的URL集合
        self.old_urls = set()  # 已爬取的URL集合
    def has_new_url(self):
        ‘‘‘
        判断是否有未爬取的URL集合
        :return:
        ‘‘‘
        return self.new_url_zize() != 0
    def has_old_url(self):
        ‘‘‘
        判断是否有以爬取的URL集合
        :return:
        ‘‘‘
        return self.old_url_size() != 0
    def get_new_url(self):
        ‘‘‘

        :return:
        ‘‘‘
        new_url = self.new_urls.pop()
        self.add_old_url(new_url)
        return new_url
    def add_new_url(self,url):
        ‘‘‘
        将新的URL添加到未爬取的URL集合中
        :param url:单个URL
        :return:
        ‘‘‘
        if url is None:
            return None
        m = hashlib.md5()
        m.update(url.encode("utf-8"))
        url_md5 = m.hexdigest()[8:-8]
        if url not in self.new_urls and url not in self.old_urls:
            self.new_urls.add(url)
    def add_new_urls(self,urls):
        if urls is None and len(urls) != 0:
            return None
        for url in urls:
            self.add_new_url(url)
    def add_old_url(self,url):
        if url is None:
            return None
        m = hashlib.md5()
        m.update(url.encode("utf-8"))
        # m.hexdigest() 32的长度去中间的16位
        self.old_urls.add(m.hexdigest()[8:-8])
        return True
    def new_url_zize(self):
        ‘‘‘
        获取未爬取URL集合的大小
        :return:
        ‘‘‘
        return len(self.new_urls)
    def old_url_size(self):
        ‘‘‘
        获取已爬取URL集合的大小
        :return:
        ‘‘‘
        return len(self.old_urls)

if __name__ == "__main__":
    urlManager = UrlManager()
    urlManager.get_new_url()

DataOutput.py

#coding:utf-8
import codecs
import time
import pymysql as ps
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

class DataOutput(object):
    def __init__(self):
        self.datas = []
        self.host = "localhost"
        self.user = "root"
        self.password = ""
        self.database = "lianjia"
        self.charset = "utf-8"
        self.db = None
        self.curs = None

    def store_data(self, data):
        if data is None:
            return
        self.datas.append(data)
    def add_mysql(self):
        return self.output_mysql()
    def output_mysql(self):
        sql = "insert into chenjiao (addr, house_class, size, closing_time,price) values(%s,%s,%s,%s,%s)"
        num = 0
        self.open()
        for data in self.datas:
            try:
                params = (data[‘addr‘], data[‘house_class‘], data[‘size‘], data[‘closing_time‘],data[‘price‘])
                num = num + self.curs.execute(sql, params)
                self.db.commit()
            except:
                print(‘存取%s失败‘%data)
                self.db.rollback()
        self.close()
        return num
    def open(self):
        self.db = ps.connect(host=self.host, user=self.user, password=self.password, database=self.database)
        self.curs = self.db.cursor()
    def close(self):
        self.curs.close()
        self.db.close()

    def get_house(self):
        self.open()
        try:
            sql = sql = "select * from chenjiao order by id asc"
            datas = pd.read_sql(sql=sql, con=self.db)
            return  datas
            self.close()
        except:
            print("显示失败!")
            self.close()
    def show(self,data):
        print(data.describe())
        dataHouseClass = data[‘house_class‘]
        dataDict = {}
        for value in dataHouseClass.values:
            if value in dataDict.keys():
                dataDict[value] = dataDict[value]+1
            else:
                dataDict[value] = 1
        plt.figure()
        plt.rcParams[‘font.sans-serif‘] = [‘SimHei‘]
        zone1 = plt.subplot(1,2,1)
        plt.bar([‘平均值‘,‘最小值‘,‘最大值‘,‘25%‘,‘50%‘,‘75%‘],[data.describe().loc[‘mean‘,‘price‘],data.describe().loc[‘min‘,‘price‘],data.describe().loc[‘max‘,‘price‘],data.describe().loc[‘25%‘,‘price‘],data.describe().loc[‘50%‘,‘price‘],data.describe().loc[‘75%‘,‘price‘]])
        plt.ylabel(‘价格‘)
        plt.title(‘基本信息表‘)
        zone2 = plt.subplot(1, 2, 2)
        plt.pie(dataDict.values(),labels=dataDict.keys(),autopct=‘%1.1f%%‘)
        plt.title(‘比例图‘)
        plt.show()
  1. 爬虫节点

SpiderWord.py

#coding:utf-8
from multiprocessing.managers import BaseManager
import time
import sys

from HtmlDownloader import HtmlDownloader
from HtmlParser import HtmlParser

class SpiderWork(object):
    def __init__(self):
        sys.setrecursionlimit(1000000)  # 例如这里设置为一百万
        #初始化分布式进程中的工作节点的连接工作
        # 实现第一步:使用BaseManager注册获取Queue的方法名称
        BaseManager.register(‘get_task_queue‘)
        BaseManager.register(‘get_result_queue‘)
        # 实现第二步:连接到服务器:
        server_addr = ‘127.0.0.1‘
        print((‘Connect to server %s...‘ % server_addr))
        # 端口和验证口令注意保持与服务进程设置的完全一致:
        self.m = BaseManager(address=(server_addr, 8001), authkey=‘baike‘.encode(‘utf-8‘))
        # 从网络连接:
        self.m.connect()
        # 实现第三步:获取Queue的对象:
        self.task = self.m.get_task_queue()
        self.result = self.m.get_result_queue()

        #初始化网页下载器和解析器
        self.downloader = HtmlDownloader()
        self.parser = HtmlParser()
        print(‘init finish‘)

    def crawl(self):
        while(True):
            try:
                if not self.task.empty():
                    url = self.task.get()
                    print(‘爬虫节点正在解析:%s‘%url.encode(‘utf-8‘))
                    print(self.task.qsize())
                    content = self.downloader.download(url)
                    new_urls,datas = self.parser.parser(url,content)
                    for data in datas:
                        print(data)
                        self.result.put({"new_urls":new_urls,"data":data})
                    if self.task.qsize() <= 0:
                        print(‘爬虫节点通知控制节点停止工作...‘)
                        #接着通知其它节点停止工作
                        self.result.put({‘new_urls‘:‘end‘,‘data‘:‘end‘})
                        return
            except EOFError as e:
                print("连接工作节点失败")

                return
            except Exception as e:
                print(e)
                print(‘Crawl  faild ‘)

if __name__=="__main__":
    spider = SpiderWork()
    spider.crawl()

HtmlDownloader.py

#coding:utf-8
import requests
import chardet
from selenium import webdriver

class HtmlDownloader(object):
    def __init__(self):
        opt = webdriver.chrome.options.Options()
        opt.set_headless()
        self.browser = webdriver.Chrome(chrome_options=opt)

    def download(self,url):
        if url is None:
            return None
        self.browser.get(url)
        # self.browser.switch_to.frame(‘g_iframe‘)
        html = self.browser.page_source
        return html

注意:静态内容跟动态内容的爬取

HtmlParser.py

#coding:utf-8
import re
import urllib.parse
from bs4 import BeautifulSoup

class HtmlParser(object):

    def parser(self,page_url,html_cont):
        ‘‘‘
        用于解析网页内容抽取URL和数据
        :param page_url: 下载页面的URL
        :param html_cont: 下载的网页内容
        :return:返回URL和数据
        ‘‘‘
        if page_url is None or html_cont is None:
            return
        soup = BeautifulSoup(html_cont,‘html.parser‘)
        new_urls = self._get_new_urls(page_url,soup)
        new_datas = self._get_new_data(page_url,soup)
        return new_urls,new_datas

    def _get_new_urls(self,page_url,soup):
        new_urls = set()
        return new_urls

    def _get_new_data(self,page_url,soup):
        ‘‘‘
        抽取有效数据
        :param page_url:下载页面的URL
        :param soup:
        :return:返回有效数据
        ‘‘‘
        dataList = []
        liList = soup.select(‘ul.listContent>li‘)
        for li in liList:
            title = li.select(‘div > div.title > a‘)
            result = re.split(r‘[\s]+‘, title[0].string) #使用正则表达式分割
            addr = result[0]
            house_class = result[1]
            size = result[2]
            # 定位 eg:高楼层(共26层) 塔楼
            # position = str(li.select(‘div > div.flood > div.positionInfo‘)[0].string)
            closing_time = str(li.select(‘div > div.address > div.dealDate‘)[0].string) #加str() 防止报:RecursionError: maximum recursion depth exceeded while pickling an object
            price = int(re.compile(r‘[\d]+‘).findall(li.select(‘div > div.address > div.totalPrice > span‘)[0].string)[0])
            data = {‘addr‘:addr,‘house_class‘:house_class,‘size‘:size,‘closing_time‘:closing_time,‘price‘:price}
            dataList.append(data)
        return dataList
  • 操作与效果
  1. 注意导入运行过程中需要的一些模块包
from multiprocessing.managers import BaseManagerimport timeimport sysfrom multiprocessing import Process, Queueimport hashlibimport pymysql as psimport pandas as pdimport matplotlib.pyplot as pltimport numpy as npimport requestsimport chardetfrom selenium import webdriverfrom bs4 import BeautifulSoupimport re

  2、运行NodeManager(控制节点)---》》输入爬取的范围---》》最后运行SpiderWord(爬虫节点)

  3、效果图

提前输入爬取的成交安分页个数来算,会显示出爬取地址的个数

启动爬虫节点,链接控制节点与之通信

数据进行存储

数据库内容

最终效果图

原文地址:https://www.cnblogs.com/cfz666/p/12008342.html

时间: 2024-08-03 14:09:07

Python高级应用程序设计任务要求(主题链家二手车)的相关文章

Python高级应用程序设计

Python高级应用程序设计任务要求 用Python实现一个面向主题的网络爬虫程序,并完成以下内容:(注:每人一题,主题内容自选,所有设计内容与源代码需提交到博客园平台) 一.主题式网络爬虫设计方案(15分) 1.主题式网络爬虫名称 名称:爬取视频网站中的电影排名信息2.主题式网络爬虫爬取的内容与数据特征分析 本次爬虫主要爬取各个视频网站中的电影排名以及评分3.主题式网络爬虫设计方案概述(包括实现思路与技术难点) 本次设计方案主要依靠request库对目标页面进行信息的爬取采集,再用Beauti

Python高级应用程序设计任务

Python高级应用程序设计任务要求 用Python实现一个面向主题的网络爬虫程序,并完成以下内容:(注:每人一题,主题内容自选,所有设计内容与源代码需提交到博客园平台) 一.主题式网络爬虫设计方案(15分) 1.主题式网络爬虫名称 新浪微博热点话题爬虫.2.主题式网络爬虫爬取的内容与数据特征分析 爬取新浪热点话题跟阅读量.3.主题式网络爬虫设计方案概述(包括实现思路与技术难点) 先爬取页面的HTML,然后使用正则表达式爬取话题跟阅读量,再存在文件中. 二.主题页面的结构特征分析(15分)1.主

Python 高级应用程序设计任务

一.主题式网络爬虫设计方案 1.主题式网络爬虫的名称 1.1链家网站的爬取 2,主题式网络爬虫的内容与数据特征分析 2.1爬虫的内容 房源信息的名称,小区名称,价格,楼层,代理人,单价,发布时间. 2.2 数据特征分析 2.2.1对楼层做一个词云并可视化 2.2.2对发布时间做一个折线图 3,主题式网络爬虫设计方案概述(包括实现思路和技术难点) 3.1实现思路 创建一个get的类,定义get_alldata()方法用来获取网页上的全部信息,get_detail()方法用来对整数数据的进一步加工和

Python高级应用程序设计任务要求

1.案例内容简介 易车网新车信息的爬取 内容步骤: 爬取,解析(动态requests+ajax/selenium),清洗,持久化(mysql),可视化(seaborn) 2.案例分析与设计 (1) 系统框架 整个框架分为六个模块:爬虫调度器.URL管理器.HTML下载器.HTML解析器.数据存储器.数据可视化 (2) 数据库设计 用于记录奥迪汽车信息 表ad_data id Int 自增主键 name Varchar(255) 汽车名称 time_to_market Varchar(255) 上

python 学习 - 爬虫入门练习 爬取链家网二手房信息

import requests from bs4 import BeautifulSoup import sqlite3 conn = sqlite3.connect("test.db") c = conn.cursor() for num in range(1,101): url = "https://cs.lianjia.com/ershoufang/pg%s/"%num headers = { 'User-Agent': 'Mozilla/5.0 (Windo

python高级编程:有用的设计模式2

# -*- coding: utf-8 -*- __author__ = 'Administrator' #python高级编程:有用的设计模式 #代理 """ 代理对一个代价昂贵或者远程的资源提供了一个非直接访问的机制 在客户和主意之间,如图.它用来优化对高代价主题的访问,比如,在前一章中描述的memoize装饰器可以被认为是一个代理 ,它还可以用提供到一个主题智能访问,例如,大的视频文件可以封闭在代理中,以避免在用户仅仅请教其标题时就将文件载入到内存中 urllib2出给

Python高级编程pdf

下载地址:网盘下载 内容简介  · · · · · · <Python高级编程>通过大量的实例,介绍了Python语言的最佳实践和敏捷开发方法,并涉及整个软件生命周期的高级主题,诸如持续集成.版本控制系统.包的发行和分发.开发模式.文档编写等.<Python高级编程>首先介绍如何设置最优的开发环境,然后以Python敏捷开发方法为线索,阐述如何将已被验证的面向对象原则应用到设计中.这些内容为开发人员和项目管理人员提供了整个软件工程中的许多高级概念以及专家级的建议,其中有些内容的意义

三.python高级

三.python高级 1.元类 1.1 Python 中类方法.类实例方法.静态方法有何区别? 类方法:是类对象的方法,在定义时需要在上方使用@classmethod进行装饰,形参为cls,表示类对象,类对象和实例对象都可调用: 类实例方法:是类实例化对象的方法,只有实例对象可以调用,形参为 self,指代对象本身: 静态方法:是一个任意函数,在其上方使用@staticmethod进行装饰,可以用对象直接调用, 静态方法实际上跟该类没有太大关系 2.内存管理与垃圾回收机制 2.1 Python

面试题--python高级

第三章Python 高级 一.元类 1.Python 中类方法.类实例方法.静态方法有何区别?(2018-3-30-lxy) 类方法:是类对象的方法,在定义时需要在上方使用"@classmethod"进行装饰,形参为cls, 表示类对象,类对象和实例对象都可调用: 类实例方法:是类实例化对象的方法,只有实例对象可以调用,形参为self,指代对象本身: 静态方法:是一个任意函数,在其上方使用"@staticmethod"进行装饰,可以用对象直接调用, 静态方法实际上跟