Twisted使用和scrapy源码剖析

1.Twisted是用Python实现的基于事件驱动的网络引擎框架。

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。

from twisted.internet import reactor   # 事件循环(终止条件,所有的socket都已经移除)from twisted.web.client import getPage # socket对象(如果下载完成,自动从时间循环中移除...)from twisted.internet import defer     # defer.Deferred 特殊的socket对象 (不会发请求,手动移除)
1.利用getPage创建socket
2.将socket添加到事件循环中
3.开始事件循环(无法自动结束)
def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode(‘utf-8‘))
    d.addCallback(response)
    yield d

def done(*args,**kwargs):
    reactor.stop()#在这里加上也无法自动结束

task()
reactor.run()
########################
1.利用getPage创建socket
2.将socket添加到事件循环中
3.开始事件循环(自动结束)
def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode(‘utf-8‘))
    d.addCallback(response)
    yield d

def done(*args,**kwargs):
    reactor.stop()

d = task()
dd = defer.DeferredList([d,])
dd.addBoth(done)

reactor.run()

Twisted实现了设计模式中的反应堆(reactor)模式,这种模式在单线程环境中调度多个事件源产生的事件到它们各自的事件处理例程中去。

在异步版的URL获取器中,reactor.run()启动reactor事件循环。

Twisted的核心就是reactor事件循环。Reactor可以感知网络、文件系统以及定时器事件。它等待然后处理这些事件,从特定于平台的行为中抽象出来,并提供统一的接口,使得在网络协议栈的任何位置对事件做出响应都变得简单。

2.Deferred对象以抽象化的方式表达了一种思想,即结果还尚不存在。它同样能够帮助管理产生这个结果所需要的回调链。当从函数中返回时,Deferred对象承诺在某个时刻函数将产生一个结果。返回的Deferred对象中包含所有注册到事件上的回调引用,因此在函数间只需要传递这一个对象即可,跟踪这个对象比单独管理所有的回调要简单的多。

Deferred对象创建时包含两个添加回调的阶段。第一阶段,addCallbacks将response添加到归属的回调链中。然后addBoth再将done同时添加到这两个回调链上。

# 1.利用getPage创建socket
# 2.将socket添加到事件循环中
# 3.开始事件循环(自动结束)
def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d = getPage(url.encode(‘utf-8‘))
    d.addCallback(response)
    yield d
    url = "http://www.baidu.com"
    d = getPage(url.encode(‘utf-8‘))
    d.addCallback(response)
    yield d

def done(*args,**kwargs):
    reactor.stop()

li = []
for i in range(10):
    d = task()
    li.append(d)
dd = defer.DeferredList(li)
dd.addBoth(done)
reactor.run()
#########################
# 1.利用getPage创建socket
# 2.将socket添加到事件循环中
# 3.开始事件循环(自动结束)
def response(content):
    print(content)

@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    d1 = getPage(url.encode(‘utf-8‘))
    d1.addCallback(response)

    url = "http://www.baidu.com"
    d2 = getPage(url.encode(‘utf-8‘))
    d2.addCallback(response)

    url = "http://www.baidu.com"
    d3 = getPage(url.encode(‘utf-8‘))
    d3.addCallback(response)
    yield defer.Deferred()

def done(*args,**kwargs):
    reactor.stop()

d=task()
dd = defer.DeferredList([d,])
dd.addBoth(done)
reactor.run()

3.自定义scrapy框架

from twisted.internet import reactor   # 事件循环(终止条件,所有的socket都已经移除)
from twisted.web.client import getPage # socket对象(如果下载完成,自动从时间循环中移除...)
from twisted.internet import defer     # defer.Deferred 特殊的socket对象 (不会发请求,手动移除)

class Request(object):
    def __init__(self,url,callback):
        self.url = url
        self.callback = callback

class HttpResponse(object):
    def __init__(self,content,request):
        self.content = content
        self.request = request
        self.url = request.url
        self.text = str(content,encoding=‘utf-8‘)

class ChoutiSpider(object):
    name = ‘chouti‘
    def start_requests(self):
        start_url = [‘http://www.baidu.com‘,‘http://www.bing.com‘,]
        for url in start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(‘-----response----->‘,response) #response是下载的页面
        yield Request(‘http://www.cnblogs.com‘,callback=self.parse)
        #1 crawling移除
        #2 获取parse yield值
        #3 再次去队列中获取

import queue
Q = queue.Queue()

class Engine(object):
    def __init__(self):
        self._close = None
        self.max = 5
        self.crawlling = []

    def get_response_callback(self,content,request):
        self.crawlling.remove(request)
        rep = HttpResponse(content,request)
        result = request.callback(rep)#content和request
        # print(result)#<generator object ChoutiSpider.parse at 0x000001F694A2C9E8>
        import types
        if isinstance(result,types.GeneratorType):
            for req in result:
                # print(‘-------------->‘,req)
                Q.put(req)

    def _next_request(self):
        """
        去取request对象,并发送请求
        最大并发数限制
        :return:
        """
        print(‘---->request‘,self.crawlling,Q.qsize())
        if Q.qsize() == 0 and len(self.crawlling) == 0:
            self._close.callback(None)
            return

        if len(self.crawlling) >= self.max:
            return
        while len(self.crawlling) < self.max:
            try:
                req = Q.get(block=False)
                self.crawlling.append(req)
                d = getPage(req.url.encode(‘utf-8‘))
                # 页面下载完成,get_response_callback,调用用户spider中定义的parse方法,并且将新请求添加到调度器
                d.addCallback(self.get_response_callback,req)
                # 未达到最大并发数,可以再去调度器中获取Request
                d.addCallback(lambda _:reactor.callLater(0, self._next_request))
            except Exception as e:
                # print(e)
                return

    @defer.inlineCallbacks
    def crawl(self,spider):
        # 将初始Request对象添加到调度器
        start_requests = iter(spider.start_requests())
        while True:
            try:
                request = next(start_requests)
                Q.put(request)
            except StopIteration as e:
                break

        # 去调度器中取request,并发送请求
        # self._next_request()
        reactor.callLater(0, self._next_request)
        self._close = defer.Deferred()
        yield self._close

spider = ChoutiSpider()
_active = set()
engine = Engine()
d = engine.crawl(spider)
_active.add(d)

dd = defer.DeferredList(_active)
dd.addBoth(lambda _:reactor.stop())
reactor.run()

4. 根据源码重写engine

from twisted.internet import reactor   # 事件循环(终止条件,所有的socket都已经移除)
from twisted.web.client import getPage # socket对象(如果下载完成,自动从时间循环中移除...)
from twisted.internet import defer     # defer.Deferred 特殊的socket对象 (不会发请求,手动移除)
from queue import Queue

class Request(object):
    """
    用于封装用户请求相关信息
    """
    def __init__(self,url,callback):
        self.url = url
        self.callback = callback

class HttpResponse(object):
    def __init__(self,content,request):
        self.content = content
        self.request = request

class Scheduler(object):
    """
    任务调度器
    """
    def __init__(self):
        self.q = Queue()

    def open(self):
        pass

    def next_request(self):
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None
        return req

    def enqueue_request(self,req):
        self.q.put(req)

    def size(self):
        return self.q.qsize()

class ExecutionEngine(object):
    """
    引擎:所有调度
    """
    def __init__(self):
        self._close = None
        self.scheduler = None
        self.max = 5
        self.crawlling = []

    def get_response_callback(self,content,request):
        self.crawlling.remove(request)
        response = HttpResponse(content,request)
        result = request.callback(response)
        import types
        if isinstance(result,types.GeneratorType):
            for req in result:
                self.scheduler.enqueue_request(req)

    def _next_request(self):
        if self.scheduler.size() == 0 and len(self.crawlling) == 0:
            self._close.callback(None)
            return

        while len(self.crawlling) < self.max:
            req = self.scheduler.next_request()
            if not req:
                return
            self.crawlling.append(req)
            d = getPage(req.url.encode(‘utf-8‘))
            d.addCallback(self.get_response_callback,req)
            d.addCallback(lambda _:reactor.callLater(0,self._next_request))

    @defer.inlineCallbacks
    def open_spider(self,start_requests):
        self.scheduler = Scheduler()
        yield self.scheduler.open()
        while True:
            try:
                req = next(start_requests)
            except StopIteration as e:
                break
            self.scheduler.enqueue_request(req)
        reactor.callLater(0,self._next_request)

    @defer.inlineCallbacks
    def start(self):
        self._close = defer.Deferred()
        yield self._close

class Crawler(object):
    """
    用户封装调度器以及引擎的...
    """
    def _create_engine(self):
        return ExecutionEngine()

    def _create_spider(self,spider_cls_path):
        """
        :param spider_cls_path:  spider.chouti.ChoutiSpider
        :return:
        """
        module_path,cls_name = spider_cls_path.rsplit(‘.‘,maxsplit=1)
        import importlib
        m = importlib.import_module(module_path)
        cls = getattr(m,cls_name)
        return cls()

    @defer.inlineCallbacks
    def crawl(self,spider_cls_path):
        engine = self._create_engine()
        spider = self._create_spider(spider_cls_path)
        start_requests = iter(spider.start_requests())
        yield engine.open_spider(start_requests)
        yield engine.start()

class CrawlerProcess(object):
    """
    开启事件循环
    """
    def __init__(self):
        self._active = set()

    def crawl(self,spider_cls_path):
        """
        :param spider_cls_path:
        :return:
        """
        crawler = Crawler()
        d = crawler.crawl(spider_cls_path)
        self._active.add(d)

    def start(self):
        dd = defer.DeferredList(self._active)
        dd.addBoth(lambda _:reactor.stop())
        reactor.run()

class Commond(object):
    def run(self):
        crawl_process = CrawlerProcess()
        spider_cls_path_list = [‘spider.chouti.ChoutiSpider‘,‘spider.cnblogs.CnblogsSpider‘,]
        for spider_cls_path in spider_cls_path_list:
            crawl_process.crawl(spider_cls_path)
        crawl_process.start()

if __name__ == ‘__main__‘:
    cmd = Commond()
    cmd.run()

@defer.inlineCallbacks是一个装饰器并用来装饰生成器函数.inlineCallbacks 的主要的目的就是把一个生成器变成一系列的异步的callbacks.

当我们调用一个用inlineCallbacks 修饰的函数的时候,我们不需要调用下一个或者发送或者抛出我们自己.这个装饰器会帮我们完成这些并会确保我们的生成器会一直运行到底(假设它并没有抛出异常).

一个被inlineCallbacks修饰的函数会返回deferred.因为我们不知道生成器什么时候会停止运行,这个被修饰过的函数是一个异步的函数,最适合返回的是deferred.注意这个返回的deferred 不是yield 语句返回的deferred,它是这个生成器全部运行完毕之后才触发的deferred.

使用了callLater 在一段时间之后去触发deferred.这是一个很方便的把非阻塞的延迟放入callback 链的方法,一般来说,在我们的生成器中我们会不断的返回一个已经被触发过的deferred.

以上代码执行以下连个spider。

from engine import Request
class ChoutiSpider(object):

    name = ‘chouti‘

    def start_requests(self):
        start_url = [‘http://www.baidu.com‘,‘http://www.bing.com‘,]
        for url in start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(response) #response是下载的页面
        yield Request(‘http://www.cnblogs.com‘,callback=self.parse)

spider1

from engine import Request
class CnblogsSpider(object):

    name = ‘cnblogs‘

    def start_requests(self):
        start_url = [‘http://www.cnblogs.com‘,]
        for url in start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(response) #response是下载的页面
        yield Request(‘http://www.cnblogs.com‘,callback=self.parse)

spider2

时间: 2024-11-09 10:30:54

Twisted使用和scrapy源码剖析的相关文章

Scrapy源码剖析之自定义Scrapy框架

from twisted.internet import reactor # 事件循环(终止条件,所有的socket都已经移除) from twisted.web.client import getPage # socket对象(如果下载完成,自动从时间循环中移除...) from twisted.internet import defer # defer.Deferred 特殊的socket对象 (不会发请求,手动移除) from queue import Queue class Reques

下载-深入浅出Netty源码剖析、Netty实战高性能分布式RPC、NIO+Netty5各种RPC架构实战演练三部曲视频教程

下载-深入浅出Netty源码剖析.Netty实战高性能分布式RPC.NIO+Netty5各种RPC架构实战演练三部曲视频教程 第一部分:入浅出Netty源码剖析 第二部分:Netty实战高性能分布式RPC 第三部分:NIO+Netty5各种RPC架构实战演练

Phaser实现源码剖析

在这里首先说明一下,由于Phaser在4.3代码里是存在,但并没有被开放出来供使用,但已经被本人大致研究了,因此也一并进行剖析. Phaser是一个可以重复利用的同步栅栏,功能上与CyclicBarrier和CountDownLatch相似,不过提供更加灵活的用法.也就是说,Phaser的同步模型与它们差不多.一般运用的场景是一组线程希望同时到达某个执行点后(先到达的会被阻塞),执行一个指定任务,然后这些线程才被唤醒继续执行其它任务. Phaser一般是定义一个parties数(parties一

【Java集合源码剖析】HashMap源码剖析

转载请注明出处:http://blog.csdn.net/ns_code/article/details/36034955 HashMap简介 HashMap是基于哈希表实现的,每一个元素是一个key-value对,其内部通过单链表解决冲突问题,容量不足(超过了阀值)时,同样会自动增长. HashMap是非线程安全的,只是用于单线程环境下,多线程环境下可以采用concurrent并发包下的concurrentHashMap. HashMap 实现了Serializable接口,因此它支持序列化,

转:【Java集合源码剖析】Vector源码剖析

转载请注明出处:http://blog.csdn.net/ns_code/article/details/35793865   Vector简介 Vector也是基于数组实现的,是一个动态数组,其容量能自动增长. Vector是JDK1.0引入了,它的很多实现方法都加入了同步语句,因此是线程安全的(其实也只是相对安全,有些时候还是要加入同步语句来保证线程的安全),可以用于多线程环境. Vector没有丝线Serializable接口,因此它不支持序列化,实现了Cloneable接口,能被克隆,实

下载BootStrap企业级应用培训课程(零基础、源码剖析,内部教材,项目实训)

全套500多课,附赠JS OOP编程,转一播放码.下载地址:http://pan.baidu.com/s/1kVLdZmf 第一季:基础篇,侧重于BootStrap 相关 API 详解.主要包含以下内容:Brackets前端开发工具详解.BootStrap框架三大核心-CSS.BootStrap框架三大核心-布局组件.BootStrap框架三大核心-JavaScript插件.附-BootStrap编码规范第二季:高级篇,侧重于BootStap源码解析与第三方扩展.主要包含以下内容:BootStr

菜鸟nginx源码剖析 框架篇(一) 从main函数看nginx启动流程(转)

俗话说的好,牵牛要牵牛鼻子 驾车顶牛,处理复杂的东西,只要抓住重点,才能理清脉络,不至于深陷其中,不能自拔.对复杂的nginx而言,main函数就是“牛之鼻”,只要能理清main函数,就一定能理解其中的奥秘,下面我们就一起来研究一下nginx的main函数. 1.nginx的main函数解读 nginx启动显然是由main函数驱动的,main函数在在core/nginx.c文件中,其源代码解析如下,涉及到的数据结构在本节仅指出其作用,将在第二节中详细解释. nginx main函数的流程图如下:

HashMap(2) 源码剖析(推荐)

今天看代码,想到去年发生的HashMap发生的CPU使用率100%的事件,转载下当时看的三个比较不错的博客(非常推荐) 参考:http://coolshell.cn/articles/9606.html   http://github.thinkingbar.com/hashmap-analysis/ http://developer.51cto.com/art/201102/246431.htm 在 Java 集合类中,使用最多的容器类恐怕就是 HashMap 和 ArrayList 了,所以

菜鸟nginx源码剖析数据结构篇(六) 哈希表 ngx_hash_t(上)

Author:Echo Chen(陈斌) Email:[email protected] Blog:Blog.csdn.net/chen19870707 Date:October 31h, 2014 1.哈希表ngx_hash_t的优势和特点 哈希表是一种典型的以空间换取时间的数据结构,在没有冲突的情况下,对任意元素的插入.索引.删除的时间复杂度都是O(1).这样优秀的时间复杂度是通过将元素的key值以hash方法f映射到哈希表中的某一个位置来访问记录来实现的,即键值为key的元素必定存储在哈希