scrapy爬取知乎问答

登陆

参考 https://github.com/zkqiang/Zhihu-Login

# -*- coding: utf-8 -*-
import scrapy

import time
import re
import base64
import hmac
import hashlib
import json
import matplotlib.pyplot as plt
from PIL import Image

class ZhihuSpider(scrapy.Spider):
    name = ‘zhihu‘
    allowed_domains = [‘www.zhihu.com‘]
    start_urls = [‘http://www.zhihu.com/‘]

    login_url = ‘https://www.zhihu.com/signup‘
    login_api = ‘https://www.zhihu.com/api/v3/oauth/sign_in‘
    login_data = {
        ‘client_id‘: ‘c3cef7c66a1843f8b3a9e6a1e3160e20‘,
        ‘grant_type‘: ‘password‘,
        ‘source‘: ‘com.zhihu.web‘,
        ‘username‘: "+86xxxxxx",
        ‘password‘: "xxxxxxxx",
        # 传入‘cn‘是倒立汉字验证码,
        ‘lang‘: ‘en‘,
        ‘ref_source‘: ‘homepage‘
    }
    headers = {
        ‘Connection‘: ‘keep-alive‘,
        ‘Host‘: ‘www.zhihu.com‘,
        ‘Referer‘: ‘https://www.zhihu.com/‘,
        ‘User-Agent‘: ‘Mozilla/5.0 (Windows NT 10.0; Win64; x64) ‘
                        ‘AppleWebKit/537.36 (KHTML, like Gecko) ‘
                        ‘Chrome/69.0.3497.100 Safari/537.36‘
    }

    def start_requests(self):
        if self.login_data["lang"] == ‘cn‘:
            api = ‘https://www.zhihu.com/api/v3/oauth/captcha?lang=cn‘
        else:
            api = ‘https://www.zhihu.com/api/v3/oauth/captcha?lang=en‘
        yield scrapy.Request(url=api, headers=self.headers, callback=self._is_need_captcha)

    def _is_need_captcha(self, response):
        show_captcha = re.search(r‘true‘, response.text)

        if show_captcha:
            yield scrapy.Request(url=response.url,
                                 headers=self.headers,
                                 method="PUT",
                                 callback=self._get_captcha)
        else:
            timestamp = str(int(time.time() * 1000))
            self.login_data.update({
                ‘captcha‘: "",
                ‘timestamp‘: timestamp,
                ‘signature‘: self._get_signature(timestamp)
            })
            yield scrapy.FormRequest(
                url=self.login_api,
                formdata=self.login_data,
                headers=self.headers,
                callback=self.check_login
            )

    def _get_captcha(self, response):
        json_data = json.loads(response.text)
        img_base64 = json_data[‘img_base64‘].replace(r‘\n‘, ‘‘)
        with open(‘./captcha.jpg‘, ‘wb‘) as f:
            f.write(base64.b64decode(img_base64))
        img = Image.open(‘./captcha.jpg‘)
        if self.login_data["lang"] == ‘cn‘:
            plt.imshow(img)
            print(‘点击所有倒立的汉字,按回车提交‘)
            points = plt.ginput(7)
            capt = json.dumps({‘img_size‘: [200, 44],
                               ‘input_points‘: [[i[0] / 2, i[1] / 2] for i in points]})
        else:
            img.show()
            capt = input(‘请输入图片里的验证码:‘)
        # 这里必须先把参数 POST 验证码接口
        yield scrapy.FormRequest(url=response.url,
                           formdata={‘input_text‘: capt},
                           headers=self.headers,
                           callback=self.captcha_login,
                           meta={"captcha":capt}
                           )

    def captcha_login(self, response):
        timestamp = str(int(time.time() * 1000))
        self.login_data.update({
            ‘captcha‘: response.meta[‘captcha‘],
            ‘timestamp‘: timestamp,
            ‘signature‘: self._get_signature(timestamp)
        })

        yield scrapy.FormRequest(
            url=self.login_api,
            formdata=self.login_data,
            headers=self.headers,
            callback=self.check_login
        )

    def check_login(self, response):
        yield scrapy.Request(
            url=self.login_url,
            headers=self.headers,
            callback=self.parse
        )

    def _get_signature(self, timestamp):
        """
        通过 Hmac 算法计算返回签名
        实际是几个固定字符串加时间戳
        :param timestamp: 时间戳
        :return: 签名
        """
        ha = hmac.new(b‘d1b964811afb40118a12068ff74a12f4‘, digestmod=hashlib.sha1)
        grant_type = self.login_data[‘grant_type‘]
        client_id = self.login_data[‘client_id‘]
        source = self.login_data[‘source‘]
        ha.update(bytes((grant_type + client_id + source + timestamp), ‘utf-8‘))
        return ha.hexdigest()

    def parse(self, response):
        print(response.text)

数据库设计

DROP TABLE IF EXISTS `zhihu_question`;
CREATE TABLE `zhihu_question` (
  `zhuhu_id` bigint(20) NOT NULL,
  `topics` varchar(255) DEFAULT NULL,
  `url` varchar(300) NOT NULL,
  `title` varchar(255) NOT NULL,
  `content` longtext NOT NULL,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `answer_num` int(11) NOT NULL DEFAULT ‘0‘,
  `comments_num` int(11) NOT NULL DEFAULT ‘0‘,
  `watch_user_num` int(11) NOT NULL DEFAULT ‘0‘,
  `click_num` int(11) NOT NULL DEFAULT ‘0‘,
  `crawl_time` datetime NOT NULL,
  `crawl_update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`zhuhu_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `zhihu_answer`;
CREATE TABLE `zhihu_answer` (
  `zhihu_id` bigint(20) NOT NULL,
  `url` varchar(255) NOT NULL,
  `question_id` bigint(20) NOT NULL,
  `author_id` varchar(100) DEFAULT NULL,
  `content` longtext NOT NULL,
  `praise_num` int(11) NOT NULL DEFAULT ‘0‘,
  `comments_num` int(11) NOT NULL DEFAULT ‘0‘,
  `create_time` datetime NOT NULL,
  `update_time` datetime NOT NULL,
  `crawl_time` datetime NOT NULL,
  `crawl_update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`zhihu_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

页面解析

    def parse(self, response):
        """
        提取出html页面中的所有url 并跟踪这些url进行一步爬取
        如果提取的url中格式为 /question/xxx 就下载之后直接进入解析函数
        """
        all_urls = response.css("a::attr(href)").extract()
        all_urls = [urljoin(response.url, url) for url in all_urls]
        all_urls = filter(lambda x: True if x.startswith("https") else False, all_urls)
        for url in all_urls:
            match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
            if match_obj:
                # 如果提取到question相关的页面则下载后交由提取函数进行提取
                request_url = match_obj.group(1)
                yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
            else:
                # 如果不是question页面则直接进一步跟踪
                yield scrapy.Request(url, headers=self.headers, callback=self.parse)

    def parse_question(self, response):
        match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
        if match_obj:
            question_id = int(match_obj.group(2))

        item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
        item_loader.add_css("title", "h1.QuestionHeader-title::text")
        item_loader.add_css("content", ".QuestionHeader-detail")
        item_loader.add_value("url", response.url)
        item_loader.add_value("zhihu_id", question_id)
        item_loader.add_css("answer_num", ".List-headerText span::text")
        item_loader.add_css("comments_num", ".QuestionHeader-actions button::text")
        item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
        item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")

        question_item = item_loader.load_item()

        yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0), headers=self.headers, callback=self.parse_answer)
        yield question_item

    def parse_answer(self, response):
        #处理question的answer
        ans_json = json.loads(response.text)
        is_end = ans_json["paging"]["is_end"]
        next_url = ans_json["paging"]["next"]

        #提取answer的具体字段
        for answer in ans_json["data"]:
            answer_item = ZhihuAnswerItem()
            answer_item["zhihu_id"] = answer["id"]
            answer_item["url"] = answer["url"]
            answer_item["question_id"] = answer["question"]["id"]
            answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
            answer_item["content"] = answer["content"] if "content" in answer else None
            answer_item["parise_num"] = answer["voteup_count"]
            answer_item["comments_num"] = answer["comment_count"]
            answer_item["create_time"] = answer["created_time"]
            answer_item["update_time"] = answer["updated_time"]
            answer_item["crawl_time"] = datetime.datetime.now()

            yield answer_item

        if not is_end:
            yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)

items

class ZhihuQuestionItem(scrapy.Item):
    #知乎的问题 item
    zhihu_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    title = scrapy.Field()
    content = scrapy.Field()
    answer_num = scrapy.Field()
    comments_num = scrapy.Field()
    watch_user_num = scrapy.Field()
    click_num = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        #插入知乎question表的sql语句
        insert_sql = """
            insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
              watch_user_num, click_num, crawl_time
              )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
              watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
        """
        zhihu_id = self["zhihu_id"][0]
        topics = ",".join(self["topics"])
        url = self["url"][0]
        title = "".join(self["title"])
        content = "".join(self["content"])
        answer_num = extract_num("".join(self["answer_num"]))
        comments_num = extract_num("".join(self["comments_num"]))

        if len(self["watch_user_num"]) == 2:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = int(self["watch_user_num"][1])
        else:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = 0

        crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)

        params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
                  watch_user_num, click_num, crawl_time)

        return insert_sql, params

class ZhihuAnswerItem(scrapy.Item):
    #知乎的问题回答item
    zhihu_id = scrapy.Field()
    url = scrapy.Field()
    question_id = scrapy.Field()
    author_id = scrapy.Field()
    content = scrapy.Field()
    parise_num = scrapy.Field()
    comments_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        #插入知乎question表的sql语句
        insert_sql = """
            insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, parise_num, comments_num,
              create_time, update_time, crawl_time
              ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
              ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num), parise_num=VALUES(parise_num),
              update_time=VALUES(update_time)
        """

        create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
        update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
        params = (
            self["zhihu_id"], self["url"], self["question_id"],
            self["author_id"], self["content"], self["parise_num"],
            self["comments_num"], create_time, update_time,
            self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
        )

        return insert_sql, params

pipelines

    def do_insert(self, cursor, item):
        # 执行具体的插入
        # 根据不同的item 构建不同的sql语句并插入到mysql中
        insert_sql, params = item.get_insert_sql()
        cursor.execute(insert_sql, params)

原文地址:https://www.cnblogs.com/gaoyongjian/p/9905367.html

时间: 2024-10-11 15:53:58

scrapy爬取知乎问答的相关文章

scrapy 爬取知乎问题、答案 ,并异步写入数据库(mysql)

  python版本  python2.7 爬取知乎流程: 一 .分析 在访问知乎首页的时候(https://www.zhihu.com),在没有登录的情况下,会进行重定向到(https://www.zhihu.com/signup?next=%2F)这个页面, 爬取知乎,首先要完成登录操作,登陆的时候观察往那个页面发送了post或者get请求.可以利用抓包工具来获取登录时密码表单等数据的提交地址. 1.利用抓包工具,查看用户名密码数据的提交地址页就是post请求,将表单数据提交的网址,经过查看

利用 Scrapy 爬取知乎用户信息

思路:通过获取知乎某个大V的关注列表和被关注列表,查看该大V和其关注用户和被关注用户的详细信息,然后通过层层递归调用,实现获取关注用户和被关注用户的关注列表和被关注列表,最终实现获取大量用户信息. 一.新建一个scrapy项目 scrapy startproject zhihuuser 移动到新建目录下: cd zhihuuser 新建spider项目: scrapy genspider zhihu 二.这里以爬取知乎大V轮子哥的用户信息来实现爬取知乎大量用户信息. a) 定义 spdier.p

运维学python之爬虫高级篇(七)scrapy爬取知乎关注用户存入mongodb

首先,祝大家开工大吉!本篇将要介绍的是从一个用户开始,通过抓关注列表和粉丝列表,实现用户的详细信息抓取并将抓取到的结果存储到 MongoDB. 1 环境需求 基础环境沿用之前的环境,只是增加了MongoDB(非关系型数据库)和PyMongo(Python 的 MongoDB 连接库),默认我认为大家都已经安装好并启动 了MongoDB 服务. 项目创建.爬虫创建.禁用ROBOTSTXT_OBEY设置略(可以参考上一篇) 2 测试爬虫效果 我这里先写一个简单的爬虫,爬取用户的关注人数和粉丝数,代码

使用scrapy爬取知乎图片

settings.py # -*- coding: utf-8 -*- # Scrapy settings for zhihutupian project # # For simplicity, this file contains only settings considered important or # commonly used. You can find more settings consulting the documentation: # # https://doc.scrap

scrapy-redis实现分布式爬取知乎问答

原文地址:https://www.cnblogs.com/byadmin/p/12215657.html

Python爬虫爬取知乎小结

博客首发至Marcovaldo's blog (http://marcovaldong.github.io/) 最近学习了一点网络爬虫,并实现了使用python来爬取知乎的一些功能,这里做一个小的总结.网络爬虫是指通过一定的规则自动的从网上抓取一些信息的程序或脚本.我们知道机器学习和数据挖掘等都是从大量的数据出发,找到一些有价值有规律的东西,而爬虫则可以帮助我们解决获取数据难的问题,因此网络爬虫是我们应该掌握的一个技巧. python有很多开源工具包供我们使用,我这里使用了requests.Be

Scrapy爬取美女图片 (原创)

有半个月没有更新了,最近确实有点忙.先是华为的比赛,接着实验室又有项目,然后又学习了一些新的知识,所以没有更新文章.为了表达我的歉意,我给大家来一波福利... 今天咱们说的是爬虫框架.之前我使用python爬取慕课网的视频,是根据爬虫的机制,自己手工定制的,感觉没有那么高大上,所以我最近玩了玩 python中强大的爬虫框架Scrapy. Scrapy是一个用 Python 写的 Crawler Framework ,简单轻巧,并且非常方便.Scrapy 使用 Twisted 这个异步网络库来处理

Scrapy爬取美女图片续集 (原创)

上一篇咱们讲解了Scrapy的工作机制和如何使用Scrapy爬取美女图片,而今天接着讲解Scrapy爬取美女图片,不过采取了不同的方式和代码实现,对Scrapy的功能进行更深入的运用. 在学习Scrapy官方文档的过程中,发现Scrapy自身实现了图片和文件的下载功能,不需要咱们之前自己实现图片的下载(不过原理都一样). 在官方文档中,我们可以看到下面一些话:Scrapy为下载item中包含的文件(比如在爬取到产品时,同时也想保存对应的图片)提供了一个可重用的 item pipelines .

第三百三十四节,web爬虫讲解2—Scrapy框架爬虫—Scrapy爬取百度新闻,爬取Ajax动态生成的信息

第三百三十四节,web爬虫讲解2-Scrapy框架爬虫-Scrapy爬取百度新闻,爬取Ajax动态生成的信息 crapy爬取百度新闻,爬取Ajax动态生成的信息,抓取百度新闻首页的新闻标题和rul地址 有多网站,当你浏览器访问时看到的信息,在html源文件里却找不到,由得信息还是滚动条滚动到对应的位置后才显示信息,那么这种一般都是 js 的 Ajax 动态请求生成的信息 我们以百度新闻为列: 1.分析网站 首先我们浏览器打开百度新闻,在网页中间部分找一条新闻信息 然后查看源码,看看在源码里是否有