爬虫程序定时执行和监控示例
简介
我们的爬虫程序在执行过程中,可能需要满足以下条件:
1、可以每天定时执行,爬取指定电商等网站内容。
2、可以对分布式爬虫进行监控,当爬虫程序挂掉之后,可以通知管理员。
下面我们来介绍如何实现这两个功能。
注意:
这里我们主要演示定时执行和监控功能,所以爬虫程序只是伪代码。如果想要详细了解如何实现网络爬虫,可以参考如下一些文章:
http://blog.csdn.net/u011204847/article/category/6210222
定时执行
Quartz调度框架
简介:
Quartz是一个完全由java编写的开源作业调度框架。尽管Quartz框架整合了许多额外功能, 但就其简易形式,你会发现它非常易用。简单地创建一个实现org.quartz.Job接口的java类。
Job接口包含唯一的方法:
public void execute(JobExecutionContext context) throws JobExecutionException;
在你的Job接口实现类里面,添加一些逻辑到execute()方法。一旦你配置好Job实现类并设定好调度时间表,Quartz将密切注意剩余时间。当调度程序确定该是通知你的作业的时候,Quartz框架将调用你Job实现类(作业类)上的execute()方法并允许做它该做的事情。无需报告任何东西给调度器或调用任何特定的东西。仅仅执行任务和结束任务即可。如果配置你的作业在随后再次被调用,Quartz框架将在恰当的时间再次调用它。
使用格式
首先Quartz Cron 表达式支持到七个域:
名称 是否必须 允许值 特殊字符
秒 是 0-59 , - * /
分 是 0-59 , - * /
时 是 0-23 , - * /
日 是 1-31 , - * ? / L W
月 是 1-12 或 JAN-DEC , - * /
周 是 1-7 或 SUN-SAT , - * ? / L #
年 否 空 或 1970-2099 , - * /
结构,以这个为例
0 11 11 11 11 ? 每年的11月11号 11点11分触发(光棍节)
可以看到基本结构是 秒_分_小时_日_月_[周]_[年] 后面的周和年是可选的
其次,通配符,主要的有星号(*);问号(?);减号(-);逗号(,);斜杠(/);L字母;W字母;井号(#).
通配符含义:
星号:表示任意时刻
问号:只能在日或周字段上使用,如官方文档解释的那样,问号(?)的作用是指明该字段‘没有特定的值
减号:范围,如 1-5秒
逗号:列表,如 1,5,10 秒
斜杠:等步长序列,如3/13秒 表示 3,16,29,42,55,3,16...
L:仅在日和周上支持,表示允许的最后一个值,注意不要让范围和列表与L连用
W:工作日
井号:为给定月份指定具体的工作日实例。把“MON#2”放在周内日期字段中,表示把任务安排在当月的第二个星期一。
使用示例:
代码示例
Pom依赖:
<dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>1.8.4</version> </dependency>
模拟的爬虫程序:
import java.text.SimpleDateFormat; import java.util.Date; //添加quartz依赖 import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; //通过quartz实现定时任务需要实现Job接口 public class SpiderTaobao implements Job{ //重写Job中的execute方法 public void execute(JobExecutionContext arg0) throws JobExecutionException { //设置时间输出格式 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒"); String time = simpleDateFormat.format(new Date()); //打印爬虫执行时间 System.out.println(time); //模拟爬虫程序 System.out.println("爬取淘宝数据。。。。。。。。"); } }
定时器实现爬虫程序定时:
//添加quartz依赖 import org.quartz.CronTrigger; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.impl.StdSchedulerFactory; public class SpiderTimer { public static void main(String[] args) { try { //1获取一个默认调度器 Scheduler defaultScheduler = StdSchedulerFactory.getDefaultScheduler(); //2开启调度器 defaultScheduler.start(); //封装要调度的任务 String simpleName = SpiderTaobao.class.getSimpleName(); JobDetail jobDetail = new JobDetail(simpleName,Scheduler.DEFAULT_GROUP, SpiderTaobao.class); //表示设置定时操作(每隔5秒执行一次) CronTrigger trigger = new CronTrigger(simpleName,Scheduler.DEFAULT_GROUP, "0/5 * * * * ?"); //3执行调度任务 defaultScheduler.scheduleJob(jobDetail, trigger); } catch (Exception e) { e.printStackTrace(); } } }
执行SpiderTimer 的打印结果:
程序监控
Zookeeper环境
我们使用Zookeeper对爬虫程序进行监控。ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
Zookeeper的分布式安装配置和一些命令可以参考:http://blog.csdn.net/u011204847/article/details/51327371
Zookeeper集群启动:
切换到集群所有主机Zookeeper的bin目录下,然后
./zkServer.sh start //每台主机都开启Zookeeper
开启Zookeeper客户端:
切换到一台主机Zookeeper的bin目录下,然后
./zkCli.sh //启动客户端
查看Zookeeper集群中节点信息:(创建临时节点后,在有效时间内,可以像以下这样查看临时节点)
监控流程
1、我们主要使用Zookeeper中临时节点的特性实现对爬虫程序的监控。临时节点默认生存期为40秒(也可以在代码中自定义)。
2、我们创建一个监视器程序,监视器永不停止并且一直监控Zookeeper中临时节点是否存在,如果爬虫程序挂了,那么临时节点会在指定时间消失,监视器发现没有临时节点或者有新增节点后,可以执行代码(例如发邮箱等)通知管理员。
程序示例
注意程序运行需要先开启Zookeeper集群
Pom依赖:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.8.0</version> </dependency>
爬虫程序SpiderTaobao:
import java.net.InetAddress; import java.text.SimpleDateFormat; import java.util.Date; //添加curator依赖 import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; public class SpiderTaobao { public SpiderTaobao(){ //指定zk集群的地址 String connectString = "192.168.33.130:2181,192.168.33.131:2181,192.168.33.132:2181"; //1000 :代表是重试时间间隔 3:表示是重试次数 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //使用curator创建一个zk链接 int sessionTimeoutMs = 2000;//这个值必须在4s--40s之间,表示是链接失效的时间 int connectionTimeoutMs = 1000;//链接超时时间 CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs , connectionTimeoutMs , retryPolicy); //启动链接 client.start(); try { InetAddress localHost = InetAddress.getLocalHost(); String ip = localHost.getHostAddress(); client.create() .creatingParentsIfNeeded()//如果父节点不存在,则创建 .withMode(CreateMode.EPHEMERAL)//指定节点类型 .withACL(Ids.OPEN_ACL_UNSAFE)//指定节点的权限信息 .forPath("/Spider/"+ip);//指定节点名称 } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { SpiderTaobao spiderTaobao = new SpiderTaobao(); //设置时间输出格式 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒"); String time = simpleDateFormat.format(new Date()); //打印爬虫执行时间 System.out.println(time); //模拟爬虫程序 System.out.println("爬取淘宝数据。。。。。。。。"); System.out.println("爬取结束"); } }
监视器SpiderWatcher:
import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; /** * 创建一个监视器,这个监视器需要实现watcher接口 * 接口中有一个process方法。 * 当监视器发现监视的节点发生变化的时候,这个process方法会被调用 * * * 所以这个监视器是一个守护进程,也就是说一个永远不会停止的进程,类似于死循环 * */ public class SpiderWatcher implements Watcher { CuratorFramework client; List<String> childrenList; public SpiderWatcher() { //在这需要指定监视的节点 //指定zk集群的地址 String connectString = "192.168.33.130:2181,192.168.33.131:2181,192.168.33.132:2181"; //1000 :代表是重试时间间隔 3:表示是重试次数 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //使用curator创建一个zk链接 int sessionTimeoutMs = 2000;//这个值必须在4s--40s之间,表示是链接失效的时间 int connectionTimeoutMs = 1000;//链接超时时间 client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs , connectionTimeoutMs , retryPolicy); //启动链接 client.start(); try { //使用spiderwatcher监视器监视spider节点下面的所有子节点的变化情况(向spider节点注册监视器,这个监视器需要重复重复注册) childrenList = client.getChildren().usingWatcher(this).forPath("/Spider"); } catch (Exception e) { e.printStackTrace(); } } public void process(WatchedEvent event) { try { //重复注册监视器 List<String> newChildrenList = client.getChildren().usingWatcher(this).forPath("/Spider"); for (String node : childrenList) { if(!newChildrenList.contains(node)){ //设置时间输出格式 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒"); String time = simpleDateFormat.format(new Date()); //打印爬虫执行时间 System.out.println(time); System.out.println("节点消失:"+node); //给管理员发送短信,或者邮件 //发短信的话可以使用一些第三方平台 云片网 //发邮件的话使用 javamail } } for (String node : newChildrenList) { if(!childrenList.contains(node)){ //设置时间输出格式 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒"); String time = simpleDateFormat.format(new Date()); //打印爬虫执行时间 System.out.println(time); System.out.println("节点新增:"+node); } } this.childrenList = newChildrenList; } catch (Exception e) { e.printStackTrace(); } } public void start(){ //为了保证让这个方法一直运行,因为这是一个监视器,不可以挂掉 while(true){ ; } } public static void main(String[] args) { SpiderWatcher spiderWatcher = new SpiderWatcher(); spiderWatcher.start(); } }
监视器启动后状态:
监视器捕获到的程序状态: