0. def _next_request_from_scheduler(self, spider):
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py
def _next_request_from_scheduler(self, spider):
    slot = self.slot
    request = slot.scheduler.next_request()   # first pop a request from the priority queue
    if not request:
        return
    d = self._download(request, spider)       # hand the request to a handler for downloading
    d.addBoth(self._handle_downloader_output, request, spider)   # run the callback on the download result
    d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.remove_request(request))
    d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.nextcall.schedule())
    d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    return d
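Note the pattern: every addBoth is paired with an addErrback that only logs, so a failure in one step does not kill the rest of the chain. A minimal standalone sketch of that pattern (names here are illustrative, not from Scrapy):

from __future__ import print_function
from twisted.internet import defer

def step(_):
    raise ValueError('boom')   # one step in the chain fails

d = defer.Deferred()
d.addBoth(step)                                         # runs on success or failure, like the engine's addBoth calls
d.addErrback(lambda f: print('logged:', f.value))       # log and swallow the Failure
d.addBoth(lambda _: print('later steps still run'))     # the chain keeps going
d.callback(None)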
1. request = slot.scheduler.next_request()  # the scheduler pops a request from the priority queue (at the current priority) and hands it back to the engine
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\scheduler.py
class Scheduler(object):
    def next_request(self):
        # mqs is the pqclass that manages one mqclass per priority, i.e. self.queues = {0: mqclass(), ...};
        # with a dqclass configured, the scheduler also loads requests persisted to a directory on disk
        # (PickleLifoDiskQueue)
        request = self.mqs.pop()
        if request:
            self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
        else:
            request = self._dqpop()
            if request:
                self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
        if request:
            self.stats.inc_value('scheduler/dequeued', spider=self.spider)
        return request
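Which classes end up as mqclass, dqclass and pqclass is driven by settings. A sketch of the ones involved, assuming roughly the Scrapy 1.x defaults this trace is based on; setting JOBDIR is what enables the disk-queue (dqclass) path:

# settings.py (sketch; values shown are the assumed 1.x defaults)
JOBDIR = 'crawls/myspider-1'                                   # enables persisting requests to disk queues
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'    # dqclass
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'      # mqclass
SCHEDULER_PRIORITY_QUEUE = 'queuelib.PriorityQueue'            # pqclass (self.mqs above)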
C:\Program Files\Anaconda2\Lib\site-packages\queuelib\pqueue.py  ######################## the enqueue (push) side, added here as a supplement
class PriorityQueue(object):
    def push(self, obj, priority=0):   # default enqueue priority is 0
        if priority not in self.queues:
            self.queues[priority] = self.qfactory(priority)
        q = self.queues[priority]
        q.push(obj)  # this may fail (eg. serialization error)
        if self.curprio is None or priority < self.curprio:
            # a higher priority (smaller number) just arrived: update the current priority,
            # which is how e.g. redirects get handled first
            self.curprio = priority

    def pop(self):
        if self.curprio is None:
            return
        q = self.queues[self.curprio]
        m = q.pop()          # pop one request at the current priority
        if len(q) == 0:      # if that priority's queue is now empty, recompute the current priority
            del self.queues[self.curprio]
            q.close()
            prios = [p for p, q in self.queues.items() if len(q) > 0]
            self.curprio = min(prios) if prios else None   # the new current priority is the smallest value left
        return m
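A smaller number means a higher priority, which is why curprio is updated with min(). A minimal sketch of this behaviour using queuelib directly (assuming FifoMemoryQueue as the per-priority backend):

from queuelib.pqueue import PriorityQueue
from queuelib.queue import FifoMemoryQueue

pq = PriorityQueue(lambda priority: FifoMemoryQueue())
pq.push(b'normal page', priority=0)
pq.push(b'redirect target', priority=-1)   # smaller number = higher priority
assert pq.pop() == b'redirect target'      # popped first, because curprio was updated to -1
assert pq.pop() == b'normal page'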
2. d = self._download(request, spider)  # hand the request to a handler for downloading
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py
def _download(self, request, spider):
    slot = self.slot
    slot.add_request(request)   # slot: self.inprogress = set(); self.inprogress.add(request)

    def _on_success(response):
        assert isinstance(response, (Response, Request))
        if isinstance(response, Response):
            response.request = request   # tie request to response received
            logkws = self.logformatter.crawled(request, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            self.signals.send_catch_log(signal=signals.response_received,
                                        response=response, request=request, spider=spider)
        return response

    def _on_complete(_):
        slot.nextcall.schedule()
        return _

    dwld = self.downloader.fetch(request, spider)   # DOWNLOADER = 'scrapy.core.downloader.Downloader'
    dwld.addCallbacks(_on_success)   # <-- traced in step 2, below
    dwld.addBoth(_on_complete)       # <-- traced in step 2, below
    return dwld
###dwld = self.downloader.fetch(request, spider)
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\__init__.py
from .middleware import DownloaderMiddlewareManager
from .handlers import DownloadHandlers

class Downloader(object):
    def __init__(self, crawler):
        self.handlers = DownloadHandlers(crawler)
        self.middleware = DownloaderMiddlewareManager.from_crawler(crawler)

    def fetch(self, request, spider):
        def _deactivate(response):
            self.active.remove(request)
            return response

        self.active.add(request)
        dfd = self.middleware.download(self._enqueue_request, request, spider)
        return dfd.addBoth(_deactivate)
#### The request first passes through DownloaderMiddlewareManager for processing
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\middleware.py
class DownloaderMiddlewareManager(MiddlewareManager):
    def download(self, download_func, request, spider):
        @defer.inlineCallbacks
        def process_request(request):
            for method in self.methods['process_request']:
                response = yield method(request=request, spider=spider)
                assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_request must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
                if response:
                    defer.returnValue(response)
            # in the normal flow the request is processed by each middleware in turn,
            # then yielded to the download_func that was passed in (_enqueue_request)
            defer.returnValue((yield download_func(request=request, spider=spider)))

        deferred = mustbe_deferred(process_request, request)
        # once the normal flow finishes, process_exception or process_response below fires
        deferred.addErrback(process_exception)
        deferred.addCallback(process_response)
        return deferred
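That assert spells out the whole contract for process_request: return None to continue, a Response to short-circuit the download, or a Request to send it back through the engine. A hedged sketch of a middleware exercising it (the class name and the use_stub meta key are made up for illustration):

from scrapy.http import HtmlResponse

class StubResponseMiddleware(object):
    """Hypothetical middleware: short-circuits downloads for marked requests."""

    def process_request(self, request, spider):
        if request.meta.get('use_stub'):
            # returning a Response makes process_request() above stop iterating
            # and never call download_func (_enqueue_request)
            return HtmlResponse(url=request.url, body=b'<html>stub</html>')
        return None   # fall through to the next middleware and, eventually, the handler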
#### The processed request is stored in the self.slots {hostname: slot} dict that the downloader maintains; at the same time requests are popped from the current slot's queue and handed to a handler, until the per-domain concurrency limit for that slot is full
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\__init__.py
class Downloader(object):
    def __init__(self, crawler):
        self.slots = {}   # dict: self.slots[key] = Slot(conc, delay, self.randomize_delay)

    def _enqueue_request(self, request, spider):
        # a request popped from the scheduler goes through downloader.fetch, is processed by the
        # middlewares, and ends up here; pick a slot from the slots dict the downloader maintains
        key, slot = self._get_slot(request, spider)   # key defaults to the hostname and points to the
                                                      # matching slot; a new slot is created if the dict
                                                      # has none for this key
        request.meta['download_slot'] = key

        def _deactivate(response):
            slot.active.remove(request)
            return response

        slot.active.add(request)
        deferred = defer.Deferred().addBoth(_deactivate)
        slot.queue.append((request, deferred))   # queue has no size limit; active and transferring are sets
        self._process_queue(spider, slot)   # if this hostname's slot still has free concurrency, pop a request
                                            # and hand it to a handler -- not necessarily the request that was
                                            # just appended to the slot queue
        return deferred

    def _process_queue(self, spider, slot):
        if slot.latercall and slot.latercall.active():
            return

        # Delay queue processing if a download_delay is configured
        now = time()
        delay = slot.download_delay()   # DOWNLOAD_DELAY defaults to 0
        if delay:
            penalty = delay - now + slot.lastseen
            if penalty > 0:
                slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
                return

        # Process enqueued requests if there are free slots to transfer for this slot
        # i.e. as long as this hostname's queue is non-empty and concurrency is not saturated
        while slot.queue and slot.free_transfer_slots() > 0:
            # by default each hostname gets its own slot; with CONCURRENT_REQUESTS_PER_DOMAIN = 8
            # this computes 8 minus len(self.transferring); transferring grows by one below
            # when a request is handed to a handler
            slot.lastseen = now
            request, deferred = slot.queue.popleft()
            dfd = self._download(slot, request, spider)
            dfd.chainDeferred(deferred)
            # prevent burst if inter-request delays were configured
            if delay:
                self._process_queue(spider, slot)
                break

    def _download(self, slot, request, spider):
        # The order is very important for the following deferreds. Do not change!

        # 1. Create the download deferred
        dfd = mustbe_deferred(self.handlers.download_request, request, spider)   # hand to a handler

        # 2. Notify response_downloaded listeners about the recent download
        # before querying queue for next request
        def _downloaded(response):
            self.signals.send_catch_log(signal=signals.response_downloaded,
                                        response=response, request=request, spider=spider)
            return response
        dfd.addCallback(_downloaded)   # the handler returned a response: notify the listeners

        # 3. After response arrives, remove the request from transferring
        # state to free up the transferring slot so it can be used by the
        # following requests (perhaps those which came from the downloader
        # middleware itself)
        slot.transferring.add(request)   # +1 once the request is handed to a handler

        def finish_transferring(_):
            slot.transferring.remove(request)
            self._process_queue(spider, slot)
            return _

        return dfd.addBoth(finish_transferring)   # the handler returned a response: -1
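The numbers that feed Slot(conc, delay, self.randomize_delay) and free_transfer_slots() come from settings. A sketch of the ones involved, assuming the defaults mentioned in the comments above:

# settings.py (sketch)
CONCURRENT_REQUESTS_PER_DOMAIN = 8   # free_transfer_slots() = 8 - len(slot.transferring)
DOWNLOAD_DELAY = 0.5                 # a non-zero delay makes _process_queue re-arm itself via reactor.callLater
RANDOMIZE_DOWNLOAD_DELAY = True      # slot.download_delay() jitters the delay between 0.5x and 1.5x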
#### Until the slot's concurrency is saturated: each request popped from the slot queue is dispatched by scheme to the matching handler; http, for example, maps to HTTPDownloadHandler, which in practice is HTTP11DownloadHandler in \core\downloader\handlers\http11.py
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\handlers\__init__.py
class DownloadHandlers(object):
    def __init__(self, crawler):
        self._schemes = {}   # stores acceptable schemes on instancing handlers
        handlers = without_none_values(
            crawler.settings.getwithbase('DOWNLOAD_HANDLERS'))
        for scheme, clspath in six.iteritems(handlers):   # dict {scheme: clspath}
            self._schemes[scheme] = clspath

    def download_request(self, request, spider):
        scheme = urlparse_cached(request).scheme
        handler = self._get_handler(scheme)   # pick the handler for this scheme
        if not handler:
            raise NotSupported("Unsupported URL scheme '%s': %s" %
                               (scheme, self._notconfigured[scheme]))
        return handler.download_request(request, spider)
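The {scheme: clspath} dict comes from the DOWNLOAD_HANDLERS(_BASE) settings, so a project can add, replace or disable handlers per scheme. A sketch (the custom handler path is hypothetical):

# settings.py (sketch)
DOWNLOAD_HANDLERS = {
    'ftp': None,   # None values are stripped by without_none_values(); the scheme then reports as not configured
    'http': 'myproject.handlers.PatchedHTTPDownloadHandler',   # hypothetical replacement handler
}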
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\downloader\handlers\http11.py
Tracing further down, we finally reach the code below,
where agent is from twisted.web.client import Agent:
L{Agent} is a very basic HTTP client. It supports I{HTTP} and I{HTTPS} scheme URIs
d = agent.request(
    method, to_bytes(url, encoding='ascii'), headers, bodyproducer)
# set download latency
d.addCallback(self._cb_latency, request, start_time)
# response body is ready to be consumed
d.addCallback(self._cb_bodyready, request)
d.addCallback(self._cb_bodydone, request, url)
# check download timeout
self._timeout_cl = reactor.callLater(timeout, d.cancel)
d.addBoth(self._cb_timeout, request, url, timeout)
return d
In the normal case this ultimately returns a response:
return respcls(url=url, status=status, headers=headers, body=body, flags=flags)
We stop tracing here.
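For comparison, here is roughly what the same Agent call looks like outside of Scrapy; a minimal sketch using only Twisted, where readBody plays the role of the _cb_bodyready step:

from __future__ import print_function
from twisted.internet import reactor
from twisted.web.client import Agent, readBody

def show(body):
    print('got %d bytes' % len(body))

agent = Agent(reactor)
d = agent.request(b'GET', b'http://example.com/')   # method, uri, headers=None, bodyProducer=None
d.addCallback(readBody)                              # consume the body once the headers have arrived
d.addCallback(show)
d.addBoth(lambda _: reactor.stop())
reactor.run()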
### The callbacks fetch attaches for when the handler returns a result: dwld.addCallbacks(_on_success) and dwld.addBoth(_on_complete)
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py
class ExecutionEngine(object):
    def _download(self, request, spider):
        slot = self.slot
        slot.add_request(request)

        def _on_success(response):
            assert isinstance(response, (Response, Request))
            if isinstance(response, Response):
                response.request = request   # tie request to response received
                logkws = self.logformatter.crawled(request, response, spider)
                logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
                self.signals.send_catch_log(signal=signals.response_received,
                                            response=response, request=request, spider=spider)
            return response   # if the result is a Response, the response_received signal is sent

        def _on_complete(_):
            slot.nextcall.schedule()   # trigger the same call the heartbeat makes
            return _

        dwld = self.downloader.fetch(request, spider)
        dwld.addCallbacks(_on_success)
        dwld.addBoth(_on_complete)
        return dwld   # return the result
3. d.addBoth(self._handle_downloader_output, request, spider)  # run the callback on the download result
### The callback: d.addBoth(self._handle_downloader_output, request, spider)
C:\Program Files\Anaconda2\Lib\site-packages\scrapy\core\engine.py
class ExecutionEngine(object):
    def _handle_downloader_output(self, response, request, spider):
        # the result can only be a Request, a Response or a Failure
        assert isinstance(response, (Request, Response, Failure)), response
        # downloader middleware can return requests (for example, redirects)
        if isinstance(response, Request):   # the result is a Request: schedule it again
            self.crawl(response, spider)
            return
        # response is a Response or Failure
        d = self.scraper.enqueue_scrape(response, request, spider)   # the result is a Response/Failure: hand it to the scraper
        d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                            exc_info=failure_to_exc_info(f),
                                            extra={'spider': spider}))
        return d
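From the spider's point of view these three outcomes look like this: a Request produced by a middleware (e.g. a redirect) is simply re-crawled, while a Response or Failure eventually reaches your own callbacks once the scraper processes it. A minimal sketch (the spider name and URL are placeholders):

import scrapy

class DemoSpider(scrapy.Spider):
    name = 'demo'

    def start_requests(self):
        yield scrapy.Request('http://example.com/',
                             callback=self.parse,     # receives a Response the scraper dequeued
                             errback=self.on_error)   # receives a Failure instead

    def parse(self, response):
        yield {'status': response.status}

    def on_error(self, failure):
        self.logger.error(repr(failure))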