webmagic 增量爬取

 webmagic  是一个很好并且很简单的爬虫框架,其教程网址:http://my.oschina.net/flashsword/blog/180623

  webmagic参考了scrapy的模块划分,分为Spider(整个爬虫的调度框架)、Downloader(页面下载)、PageProcessor(链接提取和页面分析)、Scheduler(URL管理)、Pipeline(离线分析和持久化)几部分。只不过scrapy通过middleware实现扩展,而webmagic则通过定义这几个接口,并将其不同的实现注入主框架类Spider来实现扩展。

关于Scheduler(URL管理) 最基本的功能是实现对已经爬取的URL进行标示。

目前scheduler有三种实现方式:

  1)内存队列

  2)文件队列

  3)redis队列

文件队列保存URL,能实现中断后,继续爬取时,实现增量爬取。

  如果我只有一个主页的URL,比如:http://www.cndzys.com/yundong/。如果直接引用webmagic的FileCacheQueueScheduler的话,你会发现第二次启动的时候,什么也爬不到。可以说第二次启动基本不爬取数据了。因为FileCacheQueueScheduler 把http://www.cndzys.com/yundong/ 记录了,然后不再进行新的爬取。虽然是第二次增量爬取,但还是需要保留某些URL重新爬取,以保证爬取结果是我们想要的。我们可以重写FileCacheQueueScheduler里的比较方法。

package com.fortunedr.crawler.expertadvice;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;

import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.scheduler.DuplicateRemovedScheduler;
import us.codecraft.webmagic.scheduler.MonitorableScheduler;
import us.codecraft.webmagic.scheduler.component.DuplicateRemover;

/**
 * Store urls and cursor in files so that a Spider can resume the status when shutdown.<br>
 *增加去重的校验,对需要重复爬取的网址进行正则过滤
 * @author [email protected] <br>
 * @since 0.2.0
 */
public class SpikeFileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler,Closeable {

    private String filePath = System.getProperty("java.io.tmpdir");

    private String fileUrlAllName = ".urls.txt";

    private Task task;

    private String fileCursor = ".cursor.txt";

    private PrintWriter fileUrlWriter;

    private PrintWriter fileCursorWriter;

    private AtomicInteger cursor = new AtomicInteger();

    private AtomicBoolean inited = new AtomicBoolean(false);

    private BlockingQueue<Request> queue;

    private Set<String> urls;

    private ScheduledExecutorService flushThreadPool;

    private String regx;

    public SpikeFileCacheQueueScheduler(String filePath) {
        if (!filePath.endsWith("/") && !filePath.endsWith("\\")) {
            filePath += "/";
        }
        this.filePath = filePath;
        initDuplicateRemover();
    }

    private void flush() {
        fileUrlWriter.flush();
        fileCursorWriter.flush();
    }

    private void init(Task task) {
        this.task = task;
        File file = new File(filePath);
        if (!file.exists()) {
            file.mkdirs();
        }
        readFile();
        initWriter();
        initFlushThread();
        inited.set(true);
        logger.info("init cache scheduler success");
    }

    private void initDuplicateRemover() {
        setDuplicateRemover(
                new DuplicateRemover() {
                    @Override
                    public boolean isDuplicate(Request request, Task task) {
                        if (!inited.get()) {
                            init(task);
                        }
                        boolean temp=false;
                        String url=request.getUrl();
                        temp=!urls.add(url);//原来验证URL是否存在
                        //正则匹配
                        if(url.matches(regx)){//二次校验,如果符合我们需要重新爬取的,返回false。可以重新爬取
                            temp=false;
                        }
                        return temp;
                    }

                    @Override
                    public void resetDuplicateCheck(Task task) {
                        urls.clear();
                    }

                    @Override
                    public int getTotalRequestsCount(Task task) {
                        return urls.size();
                    }
                });
    }

    private void initFlushThread() {
        flushThreadPool = Executors.newScheduledThreadPool(1);
        flushThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                flush();
            }
        }, 10, 10, TimeUnit.SECONDS);
    }

    private void initWriter() {
        try {
            fileUrlWriter = new PrintWriter(new FileWriter(getFileName(fileUrlAllName), true));
            fileCursorWriter = new PrintWriter(new FileWriter(getFileName(fileCursor), false));
        } catch (IOException e) {
            throw new RuntimeException("init cache scheduler error", e);
        }
    }

    private void readFile() {
        try {
            queue = new LinkedBlockingQueue<Request>();
            urls = new LinkedHashSet<String>();
            readCursorFile();
            readUrlFile();
            // initDuplicateRemover();
        } catch (FileNotFoundException e) {
            //init
            logger.info("init cache file " + getFileName(fileUrlAllName));
        } catch (IOException e) {
            logger.error("init file error", e);
        }
    }

    private void readUrlFile() throws IOException {
        String line;
        BufferedReader fileUrlReader = null;
        try {
            fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)));
            int lineReaded = 0;
            while ((line = fileUrlReader.readLine()) != null) {
                urls.add(line.trim());
                lineReaded++;
                if (lineReaded > cursor.get()) {
                    queue.add(new Request(line));
                }
            }
        } finally {
            if (fileUrlReader != null) {
                IOUtils.closeQuietly(fileUrlReader);
            }
        }
    }

    private void readCursorFile() throws IOException {
        BufferedReader fileCursorReader = null;
        try {
            fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor)));
            String line;
            //read the last number
            while ((line = fileCursorReader.readLine()) != null) {
                cursor = new AtomicInteger(NumberUtils.toInt(line));
            }
        } finally {
            if (fileCursorReader != null) {
                IOUtils.closeQuietly(fileCursorReader);
            }
        }
    }

    public void close() throws IOException {
        flushThreadPool.shutdown();
        fileUrlWriter.close();
        fileCursorWriter.close();
    }

    private String getFileName(String filename) {
        return filePath + task.getUUID() + filename;
    }

    @Override
    protected void pushWhenNoDuplicate(Request request, Task task) {
        queue.add(request);
        fileUrlWriter.println(request.getUrl());
    }

    @Override
    public synchronized Request poll(Task task) {
        if (!inited.get()) {
            init(task);
        }
        fileCursorWriter.println(cursor.incrementAndGet());
        return queue.poll();
    }

    @Override
    public int getLeftRequestsCount(Task task) {
        return queue.size();
    }

    @Override
    public int getTotalRequestsCount(Task task) {
        return getDuplicateRemover().getTotalRequestsCount(task);
    }

    public String getRegx() {
        return regx;
    }
    /**
     * 设置保留需要重复爬取url的正则表达式
     * @param regx
     */
    public void setRegx(String regx) {
        this.regx = regx;
    }

}

那么在爬虫时就引用自己特定的FileCacheQueueScheduler就可以

spider.addRequest(requests);
        SpikeFileCacheQueueScheduler file=new SpikeFileCacheQueueScheduler(filePath);
        file.setRegx(regx);//http://www.cndzys.com/yundong/(index)?[0-9]*(.html)?
        spider.setScheduler(file );

  这样就实现了增量爬取。

优化的想法:一般某个网站的内容列表都是首页是最新内容。上面的方式是可以实现增量爬取,但是还是需要爬取很多“无用的”列表页面。

能不能实现,当爬取到上次"最新"URL之后就不再爬取。就是不用爬取其他多余的leib

时间: 2024-10-02 14:21:35

webmagic 增量爬取的相关文章

scrapy-deltafetch实现增量爬取

详情:https://blog.csdn.net/zsl10/article/details/52885597 安装:Berkeley DB # cd /usr/local/src # wget http://download.oracle.com/berkeley-db/db-4.7.25.NC.tar.gz # tar zxvf db-4.7.25.NC.tar.gz # cd build_unix # ../dist/configure # make&&make install 安装

爬取电影天堂最新电影的名称和下载链接(增量爬取mysql存储版)

这次的程序是在上次的基础上进行修改,把持久化储存方式改成mysql,并增加了断点续爬功能. import requests import re from fake_useragent import UserAgent import random import time import pymysql from hashlib import md5 from lxml import etree class DianyingtiantangSpider(object): def __init__(se

为什么列表增量爬取要单线程?

适用于对数据同步要求高,每天的增量数据不太大的情况. 1.防止漏爬,单线程时刚爬完第1页有新数据产生,在爬第2页的时候首条数据已经爬过,出现重复而已,不会丢失:多线程时比如3个线程,每页10条分别为1-10,11-20,21-30,第3页被先爬取了,此时服务器新增了一条,另一个线程刚好抓取第2页,原来的第20条数据被顶到第3页了,可第3页已经爬过了,这样就漏掉了一条数据. 2.增量停止,既然是增量爬,就得知道什么时候停止,比如根据时间.记录数等,就是得知道上次爬到哪了,如果列表没有规律那就瞎了,

nutch的定时增量爬取

译文来着: http://wiki.apache.org/nutch/Crawl 介绍(Introduction) 注意:脚本中没有直接使用Nutch的爬去命令(bin/nutch crawl或者是"Crawl"类),所以url过滤的实现并不依赖"conf/crawl-urlfilter.txt",而是应该在"regex-urlfilter.txt"中设定实现. 爬取步骤(Steps) 脚本大致分为8部: Inject URLs(注入urls)

webmagic之爬取数据存储为TXT

1.获取标题建立文件TXT 创建以标题命名的TXT public static void create(String title) throws IOException { Configuration config = new Configuration(); config.set("fs.default.name", "hdfs://192.168.146.110:9000"); DistributedFileSystem fs = (DistributedFil

爬虫 + 数据分析 - 7 CrawlSpider(全站爬取), 分布式, 增量式爬虫

一.全站爬取(CrawlSpider) 1.基本概念 作用:就是用于进行全站数据的爬取 - CrawlSpider就是Spider的一个子类 - 如何新建一个基于CrawlSpider的爬虫文件 - scrapy genspider -t crawl xxx www.xxx.com - LinkExtractor连接提取器:根据指定规则(正则)进行连接的提取 - Rule规则解析器:将链接提取器提取到的链接进行请求发送,然后对获取的页面数据进行 指定规则(callback)的解析 - 一个链接提

垂直爬虫爬取分页数据

为了爬取全部详情页,一般从列表页开始多线程并发爬取,并发线程数受网络环境(一般表现为超时)和服务器性能影响(一般表现为http响应500). 1.第一页作为抓取入口url,解析出详情页url及其他分页url,详情页优先爬,避免缓存的url过多: 2.查看总共多少页(如果分页中没有总共多少页,通过总记录数/每页记录数计算出多少页),爬取过程不解析分页url,一次性添加全部分页url, 当然也可以在爬第一页的时候添加全部分页,每爬完一页解析出详情页url,详情页优先爬: 3.有些网站在详情页提供上一

爬取Ajax动态加载网页

常见的反爬机制及处理方式 1.Headers反爬虫 :Cookie.Referer.User-Agent 解决方案: 通过F12获取headers,传给requests.get()方法 2.IP限制 :网站根据IP地址访问频率进行反爬,短时间内进制IP访问 解决方案: 1.构造自己IP代理池,每次访问随机选择代理,经常更新代理池 2.购买开放代理或私密代理IP 3.降低爬取的速度 3.User-Agent限制 :类似于IP限制 解决方案: 构造自己的User-Agent池,每次访问随机选择 5.

爬虫5 scrapy框架2 全站爬取cnblogs, scarpy请求传参, 提高爬取效率, 下载中间件, 集成selenium, fake-useragent, 去重源码分析, 布隆过滤器, 分布式爬虫, java等语言概念补充, bilibili爬视频参考

1 全站爬取cnblogs # 1 scrapy startproject cnblogs_crawl # 2 scrapy genspider cnblogs www.cnblogs.com 示例: # cnblogs_crawl/cnblogs_crawl/spiders/cnblogs.py import scrapy from cnblogs_crawl.items import CnblogsCrawlItem from scrapy.http import Request class