本文介绍一个简单的多线程并发爬虫,这里说的简单是指爬取的数据规模不大,单机运行,并且不使用数据库,但保证多线程下的数据的一致性,并且能让爬得正起劲的爬虫停下来,而且能保存爬取状态以备下次继续。
爬虫实现的步骤基本如下:
- 分析网页结构,选取自己感兴趣的部分;
- 建立两个Buffer,一个用于保存已经访问的URL,一个用户保存带访问的URL;
- 从待访问的Buffer中取出一个URL来爬取,保存这个URL中感兴趣的信息;并将这个URL加入已经访问的Buffer中,然后将这个URL中的所有外链URLs中没有被访问过的URL加到待访问Buffer;
- 只要待访问的Buffer不为空,重复上一步。
这次是为了给博客园的用户进行一次pagerank排名,爬取了博客园各个用户的粉丝与关注者。博客园的用户用17万多个。爬取的页面是http://home.cnblogs.com/u/+userId,每个用户的url只需要用用户的id表示就可以了,用户id按平均10B来计算,保存所有用户也只需1.7Mb内存。因此我把两个Buffer都放在内存中。
这个项目的就四个java文件,结构如下:
下面对整个爬虫的实现过程进行详细的介绍。
一、登录
要获取用户的粉丝与关注,必须先登录,博客园的模拟登陆算是比较简单,找到登录时要上传的参数,然后Pos发送即登录成功,可以使用Chrome的工具,打开登录页面,调好账号和密码后,按F12弹出工具,按登录就能看到要传的参数了,再POST一个这些参数就好了。
我之前是使用这样的方式,后来用使用Jsoup解析的参数,代码实现如下:
1 /**
2 * 使用Joup解析登录参数,然后POST发送参数实现登录
3 *
4 * @throws UnsupportedEncodingException
5 * @throws IOException
6 */
7 private static void login() throws UnsupportedEncodingException,
8 IOException {
9 CookieHandler.setDefault(new CookieManager());
10 // 获取登录页面
11 String page = getPage(LOGIN_URL);
12 // 从登录去取出参数,并填充账号和密码
13 Document doc = Jsoup.parse(page);
14 // 取登录表格
15 Element loginform = doc.getElementById("frmLogin");
16 Elements inputElements = loginform.getElementsByTag("input");
17 List<String> paramList = new ArrayList<String>();
18 for (Element inputElement : inputElements) {
19 String key = inputElement.attr("name");
20 String value = inputElement.attr("value");
21 if (key.equals("tbUserName"))
22 value = Test.Name;
23 else if (key.equals("tbPassword"))
24 value = Test.passwd;
25 paramList.add(key + "=" + URLEncoder.encode(value, "UTF-8"));
26 }
27 // 封装请求参数
28 StringBuilder para = new StringBuilder();
29 for (String param : paramList) {
30 if (para.length() == 0) {
31 para.append(param);
32 } else {
33 para.append("&" + param);
34 }
35 }
36 // POST发送登录
37 String result = sendPost(LOGIN_URL, para.toString());
38 if (!result.contains("followees")) {
39 cookies = null;
40 System.out.println("登录失败");
41 } else
42 System.out.println("登录成功");
43 }
二、获取粉丝与关注
登录成功就可以爬取粉丝和关注了,关注在http://home.cnblogs.com/u/userid/followees/链接中,而粉丝在http://home.cnblogs.com/u/userid/followers/,两个网页结构基本相同,只需要把选择一下followees(被关注者)和(followers)关注者,用Jsoup解析avatar_list中avatar_name就好了,代码如下:
1 /**
2 * 获取一页中的关注or粉丝
3 *
4 * @param pageHtml
5 * @return
6 */
7
8 private List<String> getOnePageFriends(Document doc) {
9 List<String> firends = new ArrayList<String>();
10 Elements inputElements = doc.getElementsByClass("avatar_name");
11 for (Element inputElement : inputElements) {
12 Elements links = inputElement.getElementsByTag("a");
13 for (Element link : links) {
14 //从href中解析出用户id
15 String href = link.attr("href");
16 firends.add(href.substring(3, href.length() - 1));
17 }
18 }
19 return firends;
20 }
每一页显示50个粉丝or关注者,需要分页爬取,获取下一页跟获取用户粉丝差不多,找到元素就好。
三、爬取单个用户
爬取的一个用户的过程就是先分页爬取粉丝,再爬取关注者,然后把爬过的用户放入访问Buffer中,再把爬到的用户放到未访问队列中。
1 @Override
2 public void run() {
3 while (stop.get() == false) {
4 // 取出一个待访问
5 String userId = mUserBuffer.pickOne();
6 try {
7 // 爬取粉丝
8 List<String> fans = crawUser(userId, "/followers");
9 // 爬取关注者
10 List<String> heros = crawUser(userId, "/followees");
11 // 只需要保持粉丝关系即可
12 StringBuilder sb = new StringBuilder(userId).append("\t");
13 for (String friend : fans) {
14 sb.append(friend).append("\t");
15 }
16 sb.deleteCharAt(sb.length() - 1).append("\n");
17 saver.save(sb.toString());
18 // 被关注者应该放进队列里面,以供下次爬取他的粉丝
19 fans.addAll(heros);
20 mUserBuffer.addUnCrawedUsers(fans);
21 } catch (Exception e) {
22 saver.log(e.getMessage());
23 // 访问错误时,放入访问出错的队列中,以备以后重新访问。
24 mUserBuffer.addErrorUser(userId);
25 }
26 }
一页一页爬取单个用户如下:
1 /**
2 * 爬取用户,根据tag来决定是爬该用户关注的人,还是该用户的粉丝
3 *
4 * @param userId
5 * @return
6 * @throws IOException
7 */
8 private List<String> crawUser(String userId, String tag) throws IOException {
9 //构造URL
10 StringBuilder urlBuilder = new StringBuilder(USER_HOME);
11 urlBuilder.append("/u/").append(userId).append(tag);
12 //请求页面
13 String page = getPage(urlBuilder.toString());
14 Document doc = Jsoup.parse(page);
15 List<String> friends = new ArrayList<String>();
16 //爬取第一页
17 friends.addAll(getOnePageFriends(doc));
18 String nextUrl = null;
19 //不断地爬取下一页
20 while ((nextUrl = getNextUrl(doc)) != null) {
21 page = getPage(nextUrl);
22 doc = Jsoup.parse(page);
23 friends.addAll(getOnePageFriends(doc));
24 }
25 return friends;
26 }
整个爬虫结构就是这样了:
1 public class UserCrawler implements Runnable {
2 // 停止任务标志
3 private static AtomicBoolean stop;
4 // 当前爬虫的id
5 private int id;
6 // 用户缓存
7 private UserBuffer mUserBuffer;
8 // 日志与粉丝保存工具
9 private Saver saver;
10
11 static {
12 stop = new AtomicBoolean(false);
13 try {
14 // 登录一次即可
15 login();
16 // 保存数据线程先启动
17 Saver.getInstance().start();
18 } catch (IOException e) {
19 e.printStackTrace();
20 }
21 // new Thread(new CommandListener()).start();
22 }
23
24 public UserCrawler(UserBuffer userBuffer) {
25 mUserBuffer = userBuffer;
26 mUserBuffer.crawlerCountIncrease();
27 id = c++;
28 saver = Saver.getInstance();
29 }
30
31 @Override
32 public void run() {
33 if (id > 0) {
34 // 等第一个线程启动一段时候再开始新的线程
35 try {
36 TimeUnit.SECONDS.sleep(20 + id);
37 } catch (InterruptedException e) {
38 e.printStackTrace();
39 }
40 }
41 System.out.println("UserCrawler " + id + " start");
42 int retry = 3;// 重置尝试次数
43 while (stop.get() == false) {
44 // 取出一个待访问
45 String userId = mUserBuffer.pickOne();
46 if (userId == null) {// 队列元素已经为空
47 retry--;// 重试3次
48 if (retry <= 0)
49 break;
50 continue;
51 }
52 ...//爬取用户
53 }
54 System.out.println("UserCrawler " + id + " stop");
55 // 当前线程停止了
56 mUserBuffer.crawlerCountDecrease();
57 }
58
59 private List<String> crawUser(String userId, String tag) throws IOException {
60
61 }
62
63 /**
64 * 获取一页中的关注or粉丝
65 *
66 * @param pageHtml
67 * @return
68 */
69
70 private List<String> getOnePageFriends(Document doc) {
71 ...
72 }
73
74 /**
75 * 获取下一页的地址
76 *
77 * @param doc
78 * @return
79 */
80 private String getNextUrl(Document doc) {
81
82 }
83
84 private static String getPage(String pageUrl) throws IOException {
85
86 }
87
88 /***
89 * 终止所有爬虫任务
90 */
91 public static void stop() {
92 System.out.println("正在终止...");
93 stop.compareAndSet(false, true);
94 UserBuffer.getInstance().prepareForStop();
95 }
96
97 private static void login() throws UnsupportedEncodingException,
98 IOException {
99 ...
100 }
101
102
103 private static String sendPost(String url, String postParams)
104 throws IOException {
105 ...
106
107 }
四、Buffer并发控制
在用户Buffer设置一个已经访问的用户集合、一个访问出错的用户集合和一个待访问的队列。
1 private UserBuffer() {
2 crawedUsers = new HashSet<String>();// 已经访问的用户,包括访问成功和访问出错的用户
3 errorUsers = new HashSet<String>();// 访问出错的用户
4 unCrawedUsers = new LinkedList<String>();// 未访问的用户
5 }
UserBuffer全局唯一,因此采用单例模式,使用的集合和队列都不是线程安全的数据结构,我认为没有必要使用线程安全的ConcurrentSkipListSet与ConcurrentLinkedQueue,因为将一个用户插入unCrawedUsers队列时需要先判断是否已经存在于crawedUsers用户集合中了,这需要用锁来同步访问,如果不使用锁来控制,单个线程安全的set和queque不能保证多个变量之间的test-try有效性。而使用了锁后,再使用线程安全的数据结构只会增加加锁和解锁的次数,反而降低了性能。
1 /***
2 * 添加未访问的用户
3 *
4 * @param users
5 * @return
6 */
7 public synchronized void addUnCrawedUsers(List<String> users) {
8 // 添加未访问的用户
9 for (String user : users) {
10 if (!crawedUsers.contains(user))
11 unCrawedUsers.add(user);
12 }
13 }
14
15 /**
16 * 从队列中取一个元素,并把这个元素添加到已经访问的集合中,以免重复访问。
17 *
18 *
19 * @return
20 */
21 public synchronized String pickOne() {
22 String newId = unCrawedUsers.poll();
23 // 队列中可能包含重复的id,因为插入队列时只检查是否在访问集合里,
24 // 没有检查是否已经出现在队列里
25 while (crawedUsers.contains(newId)) {
26 newId = unCrawedUsers.poll();
27 }
28 //访问前先把添加到已经访问的集合中
29 crawedUsers.add(newId);
30 return newId;
31 }
32
33 /**
34 * 添加访问出错的用户
35 *
36 * @param userId
37 */
38 public synchronized void addErrorUser(String userId) {
39 errorUsers.add(userId);
40 }
五、安全地终止爬虫
大丈夫能伸能屈,能走能停,爬虫也当如此。爬博客园的用户时,我真是提心吊胆,担心管理员封我的账号,我尽量挑凌晨的时间爬数据,另外就是爬了一会后,我就停下来,过一段时间再爬。要让一个多线程的程序平稳的停下来还真不简单,最担心的就是死锁,发送停止命令后,线程不动却也没有终止,爬了好久的Buffer空间没有保存,那个恨啊。
我的思路:
1、爬虫启动时,向Buffer注册一下,buffer记录启动的爬虫数量;
2、对爬虫设置一个全局的标志,爬虫在每次爬取一个用户前检查终止标志是否被设置;
3、当发生停止命令时,爬虫检查到停止标志被设置,于是通知Buffer自己将要停止,通知完后就结束运行;
4、Buffer收到爬虫停止通知后,将爬虫计数器减1,当计数器为0时,保存工作空间,同时通知关闭日志;
5、为了避免有些爬虫在运行异常时推出而没有通知Buffer,在发出停止命令时,同时通知Buffer准备停止,Buffer设置一个计时器,2分钟后,强制保存爬虫状态和日志。
完整的代码还是放在Github上,有兴趣的同学可以看看。
感谢阅读,转载请注明出处:http://www.cnblogs.com/fengfenggirl/