This is a small web-crawler demo built for a project.
For a high-performance crawler, the first building block is Apache HttpClient for fetching pages. HttpClient makes it easy to retrieve remote content from a URL, for example:
CloseableHttpClient httpclient = HttpClients.createDefault();
HttpGet httpget = new HttpGet("http://localhost/");
CloseableHttpResponse response = httpclient.execute(httpget);
try {
    HttpEntity entity = response.getEntity();
    if (entity != null) {
        long len = entity.getContentLength();
        if (len != -1 && len < 2048) {
            System.out.println(EntityUtils.toString(entity));
        } else {
            // Stream content out
        }
    }
} finally {
    response.close();
}
It can also handle page parsing, simulated login, and more; the library is quite powerful.
Second, a crawler has to collect and analyze a large number of URLs, so a NoSQL store is used to keep this efficient; Redis, Memcached, and BerkeleyDB are all reasonable choices, and BerkeleyDB was chosen here. A plain in-memory queue or similar structure might be faster, but it consumes a lot of memory, and a server with enough RAM is not always available.
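As a rough illustration of the BerkeleyDB JE API the classes below build on, here is a minimal put/get sketch; the directory path, database name, and value string are placeholders rather than part of the project:

import com.sleepycat.je.*;
import java.io.File;

public class BdbQuickStart {
    public static void main(String[] args) throws Exception {
        // Open (or create) an environment in a local directory -- path is a placeholder
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        Environment env = new Environment(new File("d:\\cache\\demo"), envConfig);

        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        Database db = env.openDatabase(null, "URL", dbConfig);

        // Store one key/value pair, then read it back
        DatabaseEntry key = new DatabaseEntry("http://www.163.com".getBytes("UTF-8"));
        DatabaseEntry value = new DatabaseEntry("unvisited".getBytes("UTF-8"));
        db.put(null, key, value);

        DatabaseEntry found = new DatabaseEntry();
        if (db.get(null, key, found, LockMode.DEFAULT) == OperationStatus.SUCCESS) {
            System.out.println(new String(found.getData(), "UTF-8"));
        }

        db.close();
        env.close();
    }
}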
Next, each URL is checked against the set of already-visited addresses: visited URLs go into a "visited" database and new ones into an "unvisited" database, much like a queue, so the same address is never fetched twice. A further refinement would be to detect duplicate page content and reduce redundant downloads.
After that, each page is parsed to extract its key content and the URLs it contains.
Finally, the crawler uses multiple threads for performance; across several servers, a distributed design could raise throughput further.
Following this outline, I wrote a small program:
1. Configuration. CrawlConfig holds the settings (they could also be stored in an XML file; a sketch of loading them that way follows the listing). The crawl target here is www.163.com.
(1) CrawlConfig.java
……
public class CrawlConfig {
    public static final String CRAWL_PATH = "http://www.163.com";
    public static final String CRAWL_LIMIT_PATH = "http://www.163.com";
    public static final String CRAWL_VISITED_FRONTIER = "d:\\cache\\hevisited";
    public static final String CRAWL_UNVISITED_FRONTIER = "d:\\cache\\heunvisited";
    public static final String CRAWL_DOWNLOAD_PATH = "d:\\download\\163\\";
    public static final int CRAWL_THREAD_NUM = 6;
}
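If the settings were moved into an XML file, as suggested above, they could be read with java.util.Properties; a minimal sketch, assuming a hypothetical crawl-config.xml in Java's XML properties format and hypothetical key names:

import java.io.FileInputStream;
import java.util.Properties;

public class CrawlConfigLoader {
    public static Properties load() throws Exception {
        // crawl-config.xml is a hypothetical file in Java's XML properties format
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("crawl-config.xml")) {
            props.loadFromXML(in);
        }
        return props;
    }

    public static void main(String[] args) throws Exception {
        Properties props = load();
        String crawlPath = props.getProperty("crawl.path", "http://www.163.com");
        int threadNum = Integer.parseInt(props.getProperty("crawl.thread.num", "6"));
        System.out.println(crawlPath + ", " + threadNum + " threads");
    }
}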
(2) CrawlUrl.java is the URL object; besides the URL itself it could carry additional metadata.
……
public class CrawlUrl implements Serializable {
    private static final long serialVersionUID = 79332323432323L;

    private String oriUrl;  // original URL
    private String url;     // URL address

    public CrawlUrl() {
    }

    public String getOriUrl() {
        return oriUrl;
    }

    public void setOriUrl(String oriUrl) {
        this.oriUrl = oriUrl;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }
}
(3) LinkFilter.java, the URL filter interface:
public interface LinkFilter {
    boolean accept(String url);
}
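As a concrete example, the crawler later only follows links under the configured prefix; a minimal filter along those lines (a simplified illustration, not the regular-expression filter used later) could look like this:

public class PrefixLinkFilter implements LinkFilter {
    private final String prefix;

    public PrefixLinkFilter(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public boolean accept(String url) {
        // Keep only URLs that start with the allowed prefix, e.g. CrawlConfig.CRAWL_LIMIT_PATH
        return url != null && url.startsWith(prefix);
    }
}

It would be constructed as new PrefixLinkFilter(CrawlConfig.CRAWL_LIMIT_PATH).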
2. Code for accessing BerkeleyDB (install BerkeleyDB first and add the BerkeleyDB JE jar to the project).
(1) AbstractFrontier.java
……
public abstract class AbstractFrontier {
    private static final String CLASS_CATALOG = "java_class_catalog";

    private Environment env;
    protected StoredClassCatalog javaCatalog;
    protected Database catalogdatabase;
    protected Database database = null;
    protected String homeDirectory = null;

    public AbstractFrontier(String homeDirectory) throws DatabaseException, FileNotFoundException {
        this.homeDirectory = homeDirectory;
        System.out.println("open environment: " + homeDirectory);

        // Configure and open the environment
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setTransactional(true);
        envConfig.setAllowCreate(true);
        env = new Environment(new File(homeDirectory), envConfig);

        // Configure and open the class-catalog database (holds serialized class descriptions)
        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setTransactional(true);
        dbConfig.setAllowCreate(true);
        catalogdatabase = env.openDatabase(null, CLASS_CATALOG, dbConfig);
        javaCatalog = new StoredClassCatalog(catalogdatabase);

        // Configure and open the URL database
        DatabaseConfig dbConfigTe = new DatabaseConfig();
        dbConfigTe.setTransactional(true);
        dbConfigTe.setAllowCreate(true);
        database = env.openDatabase(null, "URL", dbConfigTe);
    }

    public void close() throws DatabaseException {
        database.close();
        javaCatalog.close();
        env.close();
    }

    protected abstract void put(Object key, Object value);

    protected abstract Object get(Object key);

    protected abstract Object delete(Object key);
}
(2) Frontier.java
……
public interface Frontier {
    CrawlUrl getNext() throws Exception;
    boolean putUrl(CrawlUrl url) throws Exception;
}
(3) BDBFrontier.java, written with concurrent access in mind:
……
public class BDBFrontier extends AbstractFrontier implements Frontier {
    private StoredMap pendingUrisDB = null;
    public static int threads = CrawlConfig.CRAWL_THREAD_NUM;

    /**
     * Creates a new instance of BDBFrontier.
     */
    public BDBFrontier(String homeDirectory) throws DatabaseException, FileNotFoundException {
        super(homeDirectory);
        EntryBinding keyBinding = new SerialBinding(javaCatalog, String.class);
        EntryBinding valueBinding = new SerialBinding(javaCatalog, CrawlUrl.class);
        pendingUrisDB = new StoredMap(database, keyBinding, valueBinding, true);
    }

    /**
     * Clears the database.
     */
    public void clearAll() {
        if (!pendingUrisDB.isEmpty())
            pendingUrisDB.clear();
    }

    /**
     * Returns the next record, or null when every worker thread is idle.
     */
    @Override
    public synchronized CrawlUrl getNext() throws Exception {
        CrawlUrl result = null;
        while (true) {
            if (!pendingUrisDB.isEmpty()) {
                Set entrys = pendingUrisDB.entrySet();
                Entry<String, CrawlUrl> entry =
                        (Entry<String, CrawlUrl>) pendingUrisDB.entrySet().iterator().next();
                result = entry.getValue();   // next record
                delete(entry.getKey());      // remove it from the pending map
                System.out.println("get:" + homeDirectory + entrys);
                return result;
            } else {
                threads--;
                if (threads > 0) {
                    wait();                  // wait until another thread puts a URL
                    threads++;
                } else {
                    notifyAll();             // last active thread: wake the others so they can exit
                    return null;
                }
            }
        }
    }

    /**
     * Stores a URL if it is non-empty and not already pending.
     */
    @Override
    public synchronized boolean putUrl(CrawlUrl url) throws Exception {
        if (url.getOriUrl() != null && !url.getOriUrl().equals("")
                && !pendingUrisDB.containsKey(url.getOriUrl())) {
            Set entrys = pendingUrisDB.entrySet();
            put(url.getOriUrl(), url);
            notifyAll();
            System.out.println("put:" + homeDirectory + entrys);
            return true;
        }
        return false;
    }

    public boolean contains(Object key) {
        return pendingUrisDB.containsKey(key);
    }

    /** Stores a key/value pair. */
    @Override
    protected synchronized void put(Object key, Object value) {
        pendingUrisDB.put(key, value);
    }

    /** Reads a value. */
    @Override
    protected synchronized Object get(Object key) {
        return pendingUrisDB.get(key);
    }

    /** Deletes a record. */
    @Override
    protected synchronized Object delete(Object key) {
        return pendingUrisDB.remove(key);
    }

    /**
     * Normalizes a URL before it is used as a key; a compression or hash scheme could be applied here.
     */
    private String calculateUrl(String url) {
        return url;
    }

    public static void main(String[] strs) {
        try {
            BDBFrontier bdbFrontier = new BDBFrontier("d:\\cache");
            CrawlUrl url = new CrawlUrl();
            url.setOriUrl("http://www.163.com");
            bdbFrontier.putUrl(url);
            System.out.println(((CrawlUrl) bdbFrontier.getNext()).getOriUrl());
            bdbFrontier.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3. The core: URL retrieval, page parsing, and page downloading. Parsing and downloading take most of the time.
(1) RetrievePage.java implements URL access and page download.
……
public class RetrievePage {
    private static String USER_AGENT = "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 1.7; .NET CLR 1.1.4322; CIBA; .NET CLR 2.0.50727";
    private static String DEFAULT_CHARSET = "GB2312,utf-8;q=0.7,*;q=0.7";
    private static String ACCEPT = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8";

    /**
     * Downloads the page at the given URL to CrawlConfig.CRAWL_DOWNLOAD_PATH.
     */
    public static boolean downloadPage(String path) throws Exception {
        CloseableHttpClient httpclient = HttpClients.createDefault();
        HttpGet httpget = new HttpGet(path);
        httpget.addHeader("Accept-Charset", DEFAULT_CHARSET);
        httpget.addHeader("Accept", ACCEPT);
        httpget.addHeader("User-Agent", USER_AGENT);

        // Connection and socket timeouts
        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(1000)
                .setConnectTimeout(1000)
                .build();
        httpget.setConfig(requestConfig);

        CloseableHttpResponse response = httpclient.execute(httpget);
        try {
            HttpEntity entity = response.getEntity();
            StatusLine statusLine = response.getStatusLine();

            // Redirect: read the new location from the response; this demo does not follow it
            if (statusLine.getStatusCode() == HttpStatus.SC_MOVED_PERMANENTLY
                    || statusLine.getStatusCode() == HttpStatus.SC_MOVED_TEMPORARILY
                    || statusLine.getStatusCode() == HttpStatus.SC_SEE_OTHER
                    || statusLine.getStatusCode() == HttpStatus.SC_TEMPORARY_REDIRECT) {
                Header header = response.getFirstHeader("location");
                if (header != null) {
                    String newUrl = header.getValue();
                    if (newUrl == null || newUrl.equals("")) {
                        newUrl = "/";
                    }
                }
            }

            if (statusLine.getStatusCode() == HttpStatus.SC_OK) {
                if (entity == null) {
                    throw new ClientProtocolException("Response contains no content");
                } else {
                    InputStream instream = entity.getContent();
                    String filename = getFilenameByUrl(path, entity.getContentType().getValue());
                    OutputStream outstream = new FileOutputStream(CrawlConfig.CRAWL_DOWNLOAD_PATH + filename);
                    try {
                        // Copy the response body to disk
                        byte[] buffer = new byte[4096];
                        int len;
                        while ((len = instream.read(buffer)) != -1) {
                            outstream.write(buffer, 0, len);
                        }
                        return true;
                    } catch (Exception e) {
                        e.printStackTrace();
                        return false;
                    } finally {
                        if (instream != null) {
                            instream.close();
                        }
                        if (outstream != null) {
                            outstream.close();
                        }
                    }
                }
            }
            return false;
        } finally {
            response.close();
        }
    }

    /**
     * Derives a local file name from the URL and Content-Type.
     */
    public static String getFilenameByUrl(String url, String contentType) {
        url = url.substring(7);  // strip the leading "http://"
        if (contentType.indexOf("html") != -1) {
            return url.replaceAll("[\\?/:*|<>\"]", "_") + ".html";
        } else {
            return url.replaceAll("[\\?/:*|<>\"]", "_") + "."
                    + contentType.substring(contentType.lastIndexOf('/') + 1);
        }
    }

    /**
     * Reads a stream into a String (useful for debugging).
     */
    public static String convertStreamToString(InputStream is) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
        StringBuilder sb = new StringBuilder();
        String line = null;
        try {
            while ((line = reader.readLine()) != null) {
                sb.append(line).append("\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        try {
            System.out.println("Download started");
            RetrievePage.downloadPage("http://www.baidu.com");
            System.out.println("Download finished");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
(2) HtmlParserTool.java parses a page and extracts its URLs.
……
public class HtmlParserTool {
    public static Set<String> extractLinks(String url, LinkFilter filter) {
        Set<String> links = new HashSet<String>();
        try {
            Parser parser = new Parser(url);
            parser.setEncoding("gb2312");

            // Accept <frame> nodes
            NodeFilter frameFilter = new NodeFilter() {
                public boolean accept(Node node) {
                    return node.getText().startsWith("frame src=");
                }
            };
            // Match <a>, <img> and <frame> nodes
            OrFilter linkFilter = new OrFilter(
                    new OrFilter(new NodeClassFilter(LinkTag.class), new NodeClassFilter(ImageTag.class)),
                    frameFilter);

            // Collect all matching nodes
            NodeList list = parser.extractAllNodesThatMatch(linkFilter);
            for (int i = 0; i < list.size(); i++) {
                Node tag = list.elementAt(i);
                if (tag instanceof LinkTag) {
                    // <a> tag: link plus anchor text
                    LinkTag linkTag = (LinkTag) tag;
                    String linkUrl = linkTag.getLink();       // URL
                    String text = linkTag.getLinkText();      // anchor text
                    System.out.println(linkUrl + "**********" + text);
                    if (filter.accept(linkUrl))
                        links.add(linkUrl);
                } else if (tag instanceof ImageTag) {
                    // <img> tag
                    ImageTag image = (ImageTag) list.elementAt(i);
                    System.out.print(image.getImageURL() + "********");  // image URL
                    System.out.println(image.getText());                 // image text
                    if (filter.accept(image.getImageURL()))
                        links.add(image.getImageURL());
                } else {
                    // <frame> tag: extract the src attribute, e.g. <frame src="test.html"/>
                    String frame = tag.getText();
                    int start = frame.indexOf("src=");
                    frame = frame.substring(start);
                    int end = frame.indexOf(" ");
                    if (end == -1)
                        end = frame.indexOf(">");
                    frame = frame.substring(5, end - 1);
                    System.out.println(frame);
                    if (filter.accept(frame))
                        links.add(frame);
                }
            }
            return links;
        } catch (ParserException e) {
            e.printStackTrace();
            return null;
        }
    }
}
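To try the parser on its own, a small hypothetical test main (not part of the original listing) could call extractLinks with a simple filter:

import java.util.Set;

public class HtmlParserToolTest {
    public static void main(String[] args) {
        // Collect all links on the 163 front page that stay on the same site
        Set<String> links = HtmlParserTool.extractLinks("http://www.163.com",
                new LinkFilter() {
                    public boolean accept(String url) {
                        return url.startsWith("http://www.163.com");
                    }
                });
        if (links != null) {
            for (String link : links) {
                System.out.println(link);
            }
        }
    }
}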
(3) MyCrawler.java performs the crawl itself using a breadth-first strategy; a more complete crawler would also enforce a crawl depth (a minimal sketch of a depth limit follows the listing). The domain prefix is used as the filter condition, and in a multithreaded run access to the shared frontiers has to be synchronized.
……
public class MyCrawler {
    public static BDBFrontier visitedFrontier;
    public static BDBFrontier unvisitedFrontier;
    private static int num = 0;

    public MyCrawler() {
        try {
            if (visitedFrontier == null) {
                // Visited URLs are kept in a BerkeleyDB-backed frontier
                visitedFrontier = new BDBFrontier(CrawlConfig.CRAWL_VISITED_FRONTIER);
                visitedFrontier.clearAll();
            }
            if (unvisitedFrontier == null) {
                unvisitedFrontier = new BDBFrontier(CrawlConfig.CRAWL_UNVISITED_FRONTIER);
                unvisitedFrontier.clearAll();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void initCrawlerWithSeeds(String[] seeds) {
        synchronized (this) {
            try {
                // Seed the unvisited frontier
                for (int i = 0; i < seeds.length; i++) {
                    CrawlUrl url = new CrawlUrl();
                    url.setOriUrl(seeds[i]);
                    unvisitedFrontier.putUrl(url);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void crawling(String[] seeds, int threadId) {
        try {
            LinkFilter filter = new LinkFilter() {
                @Override
                public boolean accept(String url) {
                    // Accept well-formed URLs under the configured prefix only
                    Pattern pattern = Pattern.compile("^((https|http|ftp|rtsp|mms)?://)"
                            + "+(([0-9a-z_!~*'().&=+$%-]+: )?[0-9a-z_!~*'().&=+$%-]+@)?"
                            + "(([0-9]{1,3}\\.){3}[0-9]{1,3}"
                            + "|"
                            + "([0-9a-z_!~*'()-]+\\.)*"
                            + "([0-9a-z][0-9a-z-]{0,61})?[0-9a-z]\\."
                            + "[a-z]{2,6})"
                            + "(:[0-9]{1,4})?"
                            + "((/?)|"
                            + "(/[0-9a-z_!~*'().;?:@&=+$,%#-]+)+/?)$");
                    Matcher matcher = pattern.matcher(url);
                    boolean isMatch = matcher.matches();
                    return isMatch && url.startsWith(CrawlConfig.CRAWL_LIMIT_PATH);
                }
            };

            initCrawlerWithSeeds(seeds);
            CrawlUrl visitedCrawlUrl = (CrawlUrl) unvisitedFrontier.getNext();
            do {
                System.out.println("thread: " + threadId);
                if (visitedCrawlUrl == null) {
                    continue;
                }
                String visitedUrl = visitedCrawlUrl.getOriUrl();
                if (visitedFrontier.contains(visitedUrl)) {
                    // Already visited by another thread; take the next one
                    visitedCrawlUrl = (CrawlUrl) unvisitedFrontier.getNext();
                    continue;
                }
                visitedFrontier.putUrl(visitedCrawlUrl);
                if (null == visitedUrl || "".equals(visitedUrl.trim())) {
                    // Empty URL; take the next one
                    visitedFrontier.putUrl(visitedCrawlUrl);
                    visitedCrawlUrl = (CrawlUrl) unvisitedFrontier.getNext();
                    continue;
                }
                try {
                    RetrievePage.downloadPage(visitedUrl);  // download the page
                    Set<String> links = HtmlParserTool.extractLinks(visitedUrl, filter);
                    for (String link : links) {
                        if (!visitedFrontier.contains(link) && !unvisitedFrontier.contains(link)) {
                            CrawlUrl unvisitedCrawlUrl = new CrawlUrl();
                            unvisitedCrawlUrl.setOriUrl(link);
                            unvisitedFrontier.putUrl(unvisitedCrawlUrl);
                        }
                    }
                } catch (ConnectTimeoutException e) {
                    // Timed out; move on to the next URL
                    visitedFrontier.putUrl(visitedCrawlUrl);
                    visitedCrawlUrl = (CrawlUrl) unvisitedFrontier.getNext();
                    num++;
                    e.printStackTrace();
                    continue;
                } catch (SocketTimeoutException e) {
                    visitedFrontier.putUrl(visitedCrawlUrl);
                    visitedCrawlUrl = (CrawlUrl) unvisitedFrontier.getNext();
                    num++;
                    e.printStackTrace();
                    continue;
                }
                visitedCrawlUrl = (CrawlUrl) unvisitedFrontier.getNext();
                num++;
            } while (BDBFrontier.threads > 0 && num < 1000);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
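The depth limit mentioned above is left out of MyCrawler. As a rough, self-contained sketch of the idea (the class name, MAX_DEPTH constant, and placeholder extractLinks are assumptions, not part of the project), a breadth-first loop can carry a depth counter and stop expanding links past a threshold:

import java.util.*;

// A minimal sketch of depth-limited breadth-first crawling.
// extractLinks here is a stand-in for HtmlParserTool.extractLinks.
public class DepthLimitedBfs {
    static final int MAX_DEPTH = 3;  // assumption: could become a CrawlConfig constant

    static List<String> extractLinks(String url) {
        // placeholder: a real crawler would download and parse the page here
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        Deque<String[]> queue = new ArrayDeque<>();   // each entry: {url, depth}
        Set<String> visited = new HashSet<>();
        queue.add(new String[]{"http://www.163.com", "0"});

        while (!queue.isEmpty()) {
            String[] entry = queue.poll();
            String url = entry[0];
            int depth = Integer.parseInt(entry[1]);
            if (!visited.add(url)) continue;          // skip URLs we have already seen
            System.out.println("visit " + url + " at depth " + depth);
            if (depth >= MAX_DEPTH) continue;         // do not expand links past the limit
            for (String link : extractLinks(url)) {
                queue.add(new String[]{link, String.valueOf(depth + 1)});
            }
        }
    }
}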
(4) The multithreaded version, implemented through the Runnable interface.
……
public class MyCrawlerByThread extends MyCrawler implements Runnable {
    private int threadId;

    public MyCrawlerByThread(int id) {
        this.threadId = id;
    }

    @Override
    public void run() {
        try {
            crawling(new String[]{CrawlConfig.CRAWL_PATH}, threadId);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        try {
            long startTime = System.currentTimeMillis();
            System.out.println("Crawl started");
            ArrayList<Thread> threadList = new ArrayList<Thread>(CrawlConfig.CRAWL_THREAD_NUM);
            for (int i = 0; i < CrawlConfig.CRAWL_THREAD_NUM; i++) {
                MyCrawlerByThread crawler = new MyCrawlerByThread(i);
                Thread t = new Thread(crawler);
                t.start();
                threadList.add(t);
                Thread.sleep(10L);  // stagger thread start-up slightly
            }
            // Wait for all worker threads to finish
            while (threadList.size() > 0) {
                Thread child = (Thread) threadList.remove(0);
                child.join();
            }
            System.out.println("Crawl finished");
            long endTime = System.currentTimeMillis();
            System.out.println("Elapsed time: " + (endTime - startTime) + "ms");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
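As a design note, the hand-managed thread list could also be replaced with a fixed-size thread pool; a minimal sketch reusing the same MyCrawlerByThread class (an alternative, not how the original code works):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CrawlerPoolMain {
    public static void main(String[] args) throws InterruptedException {
        // Fixed pool sized to the configured thread count
        ExecutorService pool = Executors.newFixedThreadPool(CrawlConfig.CRAWL_THREAD_NUM);
        for (int i = 0; i < CrawlConfig.CRAWL_THREAD_NUM; i++) {
            pool.submit(new MyCrawlerByThread(i));
        }
        pool.shutdown();                          // stop accepting new tasks
        pool.awaitTermination(1, TimeUnit.HOURS); // wait for the crawl to finish
    }
}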
Output:
Crawl started
……
Crawl finished
Elapsed time: 25777ms
Finally, to evaluate crawl performance, the same crawl was run with a single-threaded LinkQueue queue, with single-threaded BerkeleyDB, and with multithreaded BerkeleyDB.
Comparing the results, multithreading delivers a clear performance improvement.