Pipeline & PageProcesser

这两部分是应该程序员自己实现的部分,因为PageProcesser关乎如何解析页面而Pipeline则是存储,推荐使用OOSpider也就是注解式编程。

Downloader

public interface Downloader {

/**
* Downloads web pages and store in Page object.
*
* @param request request
* @param task task
* @return page
*/
public Page download(Request request, Task task);

/**
* Tell the downloader how many threads the spider used.
* @param threadNum number of threads
*/
public void setThread(int threadNum);
}

主要的实现类又3个,我只重点介绍一下HttpClientDownloader,有兴趣的可以自己看看源码

@ThreadSafe
public class HttpClientDownloader extends AbstractDownloader {

private Logger logger = LoggerFactory.getLogger(getClass());

private final Map<String, CloseableHttpClient> httpClients = new HashMap<String, CloseableHttpClient>();

private HttpClientGenerator httpClientGenerator = new HttpClientGenerator();

private CloseableHttpClient getHttpClient(Site site, Proxy proxy) {
if (site == null) {
return httpClientGenerator.getClient(null, proxy);
}
String domain = site.getDomain();
CloseableHttpClient httpClient = httpClients.get(domain);
if (httpClient == null) {
synchronized (this) {
httpClient = httpClients.get(domain);
if (httpClient == null) {
httpClient = httpClientGenerator.getClient(site, proxy);
httpClients.put(domain, httpClient);
}
}
}
return httpClient;
}

@Override
public Page download(Request request, Task task) {
Site site = null;
if (task != null) {
site = task.getSite();
}
Set<Integer> acceptStatCode;
String charset = null;
Map<String, String> headers = null;
if (site != null) {
acceptStatCode = site.getAcceptStatCode();
charset = site.getCharset();
headers = site.getHeaders();
} else {
acceptStatCode = Sets.newHashSet(200);
}
logger.info("downloading page {}", request.getUrl());
CloseableHttpResponse httpResponse = null;
int statusCode=0;
try {
HttpHost proxyHost = null;
Proxy proxy = null; //TODO
if (site.getHttpProxyPool() != null && site.getHttpProxyPool().isEnable()) {
proxy = site.getHttpProxyFromPool();
proxyHost = proxy.getHttpHost();
} else if(site.getHttpProxy()!= null){
proxyHost = site.getHttpProxy();
}

HttpUriRequest httpUriRequest = getHttpUriRequest(request, site, headers, proxyHost);
httpResponse = getHttpClient(site, proxy).execute(httpUriRequest);???
statusCode = httpResponse.getStatusLine().getStatusCode();
request.putExtra(Request.STATUS_CODE, statusCode);
if (statusAccept(acceptStatCode, statusCode)) {
Page page = handleResponse(request, charset, httpResponse, task);
onSuccess(request);
return page;
} else {
logger.warn("code error " + statusCode + "\t" + request.getUrl());
return null;
}
} catch (IOException e) {
logger.warn("download page " + request.getUrl() + " error", e);
if (site.getCycleRetryTimes() > 0) {
return addToCycleRetry(request, site);
}
onError(request);
return null;
} finally {
request.putExtra(Request.STATUS_CODE, statusCode);
if (site.getHttpProxyPool()!=null && site.getHttpProxyPool().isEnable()) {
site.returnHttpProxyToPool((HttpHost) request.getExtra(Request.PROXY), (Integer) request
.getExtra(Request.STATUS_CODE));
}
try {
if (httpResponse != null) {
//ensure the connection is released back to pool
EntityUtils.consume(httpResponse.getEntity());
}
} catch (IOException e) {
logger.warn("close response fail", e);
}
}
}

@Override
public void setThread(int thread) {
httpClientGenerator.setPoolSize(thread);
}

protected boolean statusAccept(Set<Integer> acceptStatCode, int statusCode) {
return acceptStatCode.contains(statusCode);
}

protected HttpUriRequest getHttpUriRequest(Request request, Site site, Map<String, String> headers,HttpHost proxy) {
RequestBuilder requestBuilder = selectRequestMethod(request).setUri(request.getUrl());
if (headers != null) {
for (Map.Entry<String, String> headerEntry : headers.entrySet()) {
requestBuilder.addHeader(headerEntry.getKey(), headerEntry.getValue());
}
}
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectionRequestTimeout(site.getTimeOut())
.setSocketTimeout(site.getTimeOut())
.setConnectTimeout(site.getTimeOut())
.setCookieSpec(CookieSpecs.BEST_MATCH);
if (proxy !=null) {
requestConfigBuilder.setProxy(proxy);
request.putExtra(Request.PROXY, proxy);
}
requestBuilder.setConfig(requestConfigBuilder.build());
return requestBuilder.build();
}

protected RequestBuilder selectRequestMethod(Request request) {
String method = request.getMethod();
if (method == null || method.equalsIgnoreCase(HttpConstant.Method.GET)) {
//default get
return RequestBuilder.get();
} else if (method.equalsIgnoreCase(HttpConstant.Method.POST)) {
RequestBuilder requestBuilder = RequestBuilder.post();
NameValuePair[] nameValuePair = (NameValuePair[]) request.getExtra("nameValuePair");
if (nameValuePair != null && nameValuePair.length > 0) {
requestBuilder.addParameters(nameValuePair);
}
return requestBuilder;
} else if (method.equalsIgnoreCase(HttpConstant.Method.HEAD)) {
return RequestBuilder.head();
} else if (method.equalsIgnoreCase(HttpConstant.Method.PUT)) {
return RequestBuilder.put();
} else if (method.equalsIgnoreCase(HttpConstant.Method.DELETE)) {
return RequestBuilder.delete();
} else if (method.equalsIgnoreCase(HttpConstant.Method.TRACE)) {
return RequestBuilder.trace();
}
throw new IllegalArgumentException("Illegal HTTP Method " + method);
}

protected Page handleResponse(Request request, String charset, HttpResponse httpResponse, Task task) throws IOException {
String content = getContent(charset, httpResponse);
Page page = new Page();
page.setRawText(content);
page.setUrl(new PlainText(request.getUrl()));
page.setRequest(request);
page.setStatusCode(httpResponse.getStatusLine().getStatusCode());
return page;
}

protected String getContent(String charset, HttpResponse httpResponse) throws IOException {
if (charset == null) {
byte[] contentBytes = IOUtils.toByteArray(httpResponse.getEntity().getContent());
String htmlCharset = getHtmlCharset(httpResponse, contentBytes);
if (htmlCharset != null) {
return new String(contentBytes, htmlCharset);
} else {
logger.warn("Charset autodetect failed, use {} as charset. Please specify charset in Site.setCharset()", Charset.defaultCharset());
return new String(contentBytes);
}
} else {
return IOUtils.toString(httpResponse.getEntity().getContent(), charset);
}
}

protected String getHtmlCharset(HttpResponse httpResponse, byte[] contentBytes) throws IOException {
String charset;
// charset
// 1、encoding in http header Content-Type
String value = httpResponse.getEntity().getContentType().getValue();
charset = UrlUtils.getCharset(value);
if (StringUtils.isNotBlank(charset)) {
logger.debug("Auto get charset: {}", charset);
return charset;
}
// use default charset to decode first time
Charset defaultCharset = Charset.defaultCharset();
String content = new String(contentBytes, defaultCharset.name());
// 2、charset in meta
if (StringUtils.isNotEmpty(content)) {
Document document = Jsoup.parse(content);
Elements links = document.select("meta");
for (Element link : links) {
// 2.1、html4.01 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
String metaContent = link.attr("content");
String metaCharset = link.attr("charset");
if (metaContent.indexOf("charset") != -1) {
metaContent = metaContent.substring(metaContent.indexOf("charset"), metaContent.length());
charset = metaContent.split("=")[1];
break;
}
// 2.2、html5 <meta charset="UTF-8" />
else if (StringUtils.isNotEmpty(metaCharset)) {
charset = metaCharset;
break;
}
}
}
logger.debug("Auto get charset: {}", charset);
// 3、todo use tools as cpdetector for content decode
return charset;
}
}

其中包括了添加http proxy这部分官方文档都没有介绍,如果需要那就自行看源码吧- -b
再看带那种的这部分

if (statusAccept(acceptStatCode, statusCode)) {
Page page = handleResponse(request, charset, httpResponse, task);
onSuccess(request);
return page;
} else {
logger.warn("code error " + statusCode + "\t" + request.getUrl());
return null;
}

acceptStatCode默认是200,如果出现其他resultCode那么将会直接return null,也不会释放HttpClient的资源,也就是下面的finally块不会被执行。也算是一个bug吧

finally {
request.putExtra(Request.STATUS_CODE, statusCode);
if (site.getHttpProxyPool()!=null && site.getHttpProxyPool().isEnable()) {
site.returnHttpProxyToPool((HttpHost) request.getExtra(Request.PROXY), (Irequest
.getExtra(Request.STATUS_CODE));
}
try {
if (httpResponse != null) {
//ensure the connection is released back to pool
EntityUtils.consume(httpResponse.getEntity());
}
} catch (IOException e) {
logger.warn("close response fail", e);
}
}

到此为止,所有的关于WebMagic的主体源码都介绍完毕了,如果你需要使用那么目前的知识已经足够了,如果出现bug还是需要自行更改,还好WebMagic给我们提供了更换组件的接口,使用起来还是很方便的。

时间: 2024-10-04 17:32:28

Pipeline & PageProcesser的相关文章

[持续交付实践] pipeline:pipeline 使用之快速入门

什么是pipeline 先介绍下什么是Jenkins 2.0,Jenkins 2.0的精髓是Pipeline as Code,是帮助Jenkins实现CI到CD转变的重要角色.什么是Pipeline,简单来说,就是一套运行于Jenkins上的工作流框架,将原本独立运行于单个或者多个节点的任务连接起来,实现单个任务难以完成的复杂发布流程.Pipeline的实现方式是一套Groovy DSL,任何发布流程都可以表述为一段Groovy脚本,并且Jenkins支持从代码库直接读取脚本,从而实现了Pipe

MongoDB 学习笔记之 Aggregation Pipeline实战实现inner join

 Aggregation Pipeline实战实现inner join: leftT集合: comments集合: 现在我们要用aggregation实现inner join db.comments.aggregate([{ $lookup: { from:"leftT", localField:"timestamp", foreignField:"timestamp", as: "timestampCol" }}, {$un

使用beanstalkd实现定制化持续集成过程中pipeline - 持续集成系列

持续集成是一种项目管理和流程模型,依赖于团队中各个角色的配合.各个角色的意识和配合不是一朝一夕能练就的,我们的工作只是提供一种方案和能力,这就是持续集成能力的服务化.而在做持续集成能力服务化的过程中,最核心的一点就是,如何实现一个可定制化的任务流,即所谓的pipeline. 在传统的持续集成工具实现了pipeline功能,以供串联上下游job,并把多个job联系成一次完整的构建,例如jenkins的pipeline插件. 但是各种持续集成工具,或多或少都有自己的短板,总结起来如下: 1.配置并不

图解Netty之Pipeline、channel、Context之间的数据流向。

声明:本文为原创博文,禁止转载.       以下所绘制图形均基于Netty4.0.28版本. 一.connect(outbound类型事件)  当用户调用channel的connect时,会发起一个outbound类型的事件,该事件将在pipeline中传递(pipeline.connect),首先由tail handler处理,该handler只是将事件透传给下一个outbound类型的用户Handler(如果有),事件依次传递下去,直到传递到head handler,该handler会调用

Jenkins pipeline 入门到精通系列文章

Jenkins2 入门到精通系列文章. Jenkins2 下载与启动jenkins2 插件安装jenkins2 hellopipelinejenkins2 pipeline介绍jenkins2 javahelloworldjenkins2 groovy入门jenkins2 pipeline入门jenkins2 pipeline高级jenkins2 Jenkinsfilejenkins2 multibranchjenkins2 Jenkinsfile和loadjenkins2 groovy脚本参考

aggregation 详解4(pipeline aggregations)

概述 管道聚合处理的对象是其它聚合的输出(桶或者桶的某些权值),而不是直接针对文档. 管道聚合的作用是为输出增加一些有用信息. 管道聚合大致分为两类: parent 此类聚合的"输入"是其[父聚合]的输出,并对其进行进一步处理.一般不生成新的桶,而是对父聚合桶信息的增强. sibling 此类聚合的输入是其[兄弟聚合]的输出.并能在同级上计算新的聚合. 管道聚合通过 buckets_path 参数指定他们要进行聚合计算的权值对象,buckets_path 参数有其自己的使用语法. 管道

MVC之前的那点事儿系列(9):MVC如何在Pipeline中接管请求的?(转载)

MVC之前的那点事儿系列(9):MVC如何在Pipeline中接管请求的? 文章内容 上个章节我们讲到了,可以在HttpModules初始化之前动态添加Route的方式来自定义自己的HttpHandler,最终接管请求的,那MVC是这么实现的么?本章节我们就来分析一下相关的MVC源码来验证一下我们的这个问题. 先创建一个MVC3的Web Application,选择默认的模板以便创建以后就默认包含HomeController和AccountController.我们知道MVC要先接管请求才能通过

【转】Netty那点事(三)Channel中的Pipeline

[原文]https://github.com/code4craft/netty-learning/blob/master/posts/ch3-pipeline.md Channel是理解和使用Netty的核心.Channel的涉及内容较多,这里我使用由浅入深的介绍方法.在这篇文章中,我们主要介绍Channel部分中Pipeline实现机制.为了避免枯燥,借用一下<盗梦空间>的“梦境”概念,希望大家喜欢. 一层梦境:Channel实现概览 在Netty里,Channel是通讯的载体,而Chann

OpenGL学习 Following the Pipeline

Passing Data to the Vertex Shader Vertex Attributes At the start of the OpenGL pipeline,we use the in keyword to bring inputs into the vertex shader. Between stages,in and out can be used to form conduits from shader to shader and pass databetween th