实验二:事件驱动-回调函数实现爬虫

承接上一节课,我们在上节课中提到,socket I/O 阻塞时的那段时间完全被浪费了,那么要如何节省下那一段时间呢?

非阻塞I/O

如果使用非阻塞I/O,它就不会傻傻地等在那里(比如等连接、等读取),而是会返回一个错误信息,虽然说是说错误信息,它其实就是叫你过一会再来的意思,编程的时候都不把它当错误看。

非阻塞I/O代码如下:

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect((‘xkcd.com‘, 80))
except BlockingIOError:
    pass

这里抛出的异常无视掉就可以了。

单线程上的多I/O

有了非阻塞I/O这个特性,我们就能够实现单线程上多个sockets的处理了,学过C语言网络编程的同学应该都认识select这个函数吧?不认识也不要紧,select函数如果你不设置它的超时时间它就是默认一直阻塞的,只有当有I/O事件发生时它才会被激活,然后告诉你哪个socket上发生了什么事件(读|写|异常),在Python中也有select,还有跟select功能相同但是更高效的poll,它们都是底层C函数的Python实现。

不过这里我们不使用select,而是用更简单好用的DefaultSelector,是Python 3.4后才出现的一个模块里的类,你只需要在非阻塞socket和事件上绑定回调函数就可以了。

代码如下:

from selectors import DefaultSelector, EVENT_WRITE

selector = DefaultSelector()

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect((‘localhost‘, 3000))
except BlockingIOError:
    pass

def connected():
    selector.unregister(sock.fileno())
    print(‘connected!‘)

selector.register(sock.fileno(), EVENT_WRITE, connected)

这里看一下selector.register的原型

register(fileobj, events, data=None)

其中fileobj可以是文件描述符也可以是文件对象(通过fileno得到),events是位掩码,指明发生的是什么事件,data 则是与指定文件(也就是我们的socket)与指定事件绑定在一起的数据。

如代码所示,selector.register 在该socket的写事件上绑定了回调函数connected(这里作为数据绑定)。在该socket上第一次发生的写事件意味着连接的建立,connected函数在连接建立成功后再解除了该socket上所有绑定的数据。

事件驱动

看了以上selector的使用方式,我想你会发现它很适合写成事件驱动的形式。

我们可以创建一个事件循环,在循环中不断获得I/O事件:

def loop():
    while True:
     events = selector.select()
        #遍历事件并调用相应的处理
        for event_key, event_mask in events:
         callback = event_key.data
            callback()

其中events_key是一个namedtuple,它的结构大致如下(fileobj,fd,events,data),我们从data得到之前绑定的回调函数并调用。 event_mask则是事件的位掩码。

关于selectors的更多内容,可参考官方文档: https://docs.python.org/3.4/library/selectors.html

完成后续工作

现在我们已经明白了基于回调函数实现事件驱动是怎么一回事了,接着来完成我们的爬虫吧。

首先创建两个set,一个是待处理url的集合,一个是已抓取url的集合,同时初始化为根url ‘/‘

urls_todo = set([‘/‘])
seen_urls = set([‘/‘])

抓取一个页面会需要许多回调函数。比如connected,它会在连接建立成功后向服务器发送一个GET请求请求页面。当然它不会干等着服务器响应(那就阻塞了),而是再绑定另一个接收响应的回调函数read_response。如果read_response在事件触发时无法一次性读取完整的响应,那么就会等下次事件触发时继续读取,直到读取到了完整的响应才解除绑定。

我们将这些回调函数封装在 Fetcher 类中。它有三个成员变量:抓取的urlsocket对象与得到的服务器响应response

class Fetcher:
    def __init__(self, url):
        self.response = b‘‘
        self.url = url
        self.sock = None

实现fetch函数,绑定connected

    # 在Fetcher类中实现
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect((‘xkcd.com‘, 80))
        except BlockingIOError:
            pass

        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

注意到fetch函数在内部调用connect尝试建立socket连接并绑定回调函数,I/O的处理则都是交给事件循环控制的。Fetcher与事件循环的关系如下:

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher(‘/353/‘)
fetcher.fetch()

# 事件循环
while True:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback(event_key, event_mask)

connected的实现:

def connected(self, key, mask):
    print(‘connected!‘)
    #解除该socket上的所有绑定
    selector.unregister(key.fd)
    request = ‘GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n‘.format(self.url)
    self.sock.send(request.encode(‘ascii‘))

    # 连接建立后绑定读取响应的回调函数
    selector.register(key.fd,
                      EVENT_READ,
                      self.read_response)

read_response的实现:

def read_response(self, key, mask):
    global stopped

    chunk = self.sock.recv(4096)  # 每次接收最多4K的信息
    if chunk:
        self.response += chunk
    else:
        selector.unregister(key.fd)  # 完成接收则解除绑定
        links = self.parse_links()

        # Python set-logic:
        for link in links.difference(seen_urls):
            urls_todo.add(link)
            Fetcher(link).fetch()  # 抓取新的url

        seen_urls.update(links)
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True         # 当抓取队列为空时结束事件循环

parse_links 如上一节课,它的作用是返回抓取到的页面中的所有发现的url的集合。 parse_links 之后,遍历了每一个没抓取过的url并为其创建一个新的Fetcher 对象并调用fetch 函数开始抓取。

parse_links等其它函数的实现:

def body(self):
    body = self.response.split(b‘\r\n\r\n‘, 1)[1]
    return body.decode(‘utf-8‘)

def parse_links(self):
    if not self.response:
        print(‘error: {}‘.format(self.url))
        return set()
    if not self._is_html():
        return set()
    urls = set(re.findall(r‘‘‘(?i)href=["‘]?([^\s"‘<>]+)‘‘‘,
                          self.body()))

    links = set()
    for url in urls:
        normalized = urllib.parse.urljoin(self.url, url)
        parts = urllib.parse.urlparse(normalized)
        if parts.scheme not in (‘‘, ‘http‘, ‘https‘):
            continue
        host, port = urllib.parse.splitport(parts.netloc)
        if host and host.lower() not in (‘xkcd.com‘, ‘www.xkcd.com‘):
            continue
        defragmented, frag = urllib.parse.urldefrag(parts.path)
        links.add(defragmented)

    return links

def _is_html(self):
    head, body = self.response.split(b‘\r\n\r\n‘, 1)
    headers = dict(h.split(‘: ‘) for h in head.decode().split(‘\r\n‘)[1:])
    return headers.get(‘Content-Type‘, ‘‘).startswith(‘text/html‘)

将事件循环改为stopped时停止:

start = time.time()
stopped = False

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

print(‘{} URLs fetched in {:.1f} seconds‘.format(
    len(seen_urls), time.time() - start))

运行效果

这里先奉上完整代码:

from selectors import *
import socket
import re
import urllib.parse
import time

urls_todo = set([‘/‘])
seen_urls = set([‘/‘])
#追加了一个可以看最高并发数的变量
concurrency_achieved = 0
selector = DefaultSelector()
stopped = False

class Fetcher:
    def __init__(self, url):
        self.response = b‘‘
        self.url = url
        self.sock = None

    def fetch(self):
        global concurrency_achieved
        concurrency_achieved = max(concurrency_achieved, len(urls_todo))

        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect((‘localhost‘, 3000))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = ‘GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n‘.format(self.url)
        self.sock.send(get.encode(‘ascii‘))
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped

        chunk = self.sock.recv(4096)  # 4k chunk size.
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)  # Done reading.
            links = self.parse_links()
            for link in links.difference(seen_urls):
                urls_todo.add(link)
                Fetcher(link).fetch()

            seen_urls.update(links)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True
            print(self.url)

    def body(self):
        body = self.response.split(b‘\r\n\r\n‘, 1)[1]
        return body.decode(‘utf-8‘)

    def parse_links(self):
        if not self.response:
            print(‘error: {}‘.format(self.url))
            return set()
        if not self._is_html():
            return set()
        urls = set(re.findall(r‘‘‘(?i)href=["‘]?([^\s"‘<>]+)‘‘‘,
                              self.body()))

        links = set()
        for url in urls:
            normalized = urllib.parse.urljoin(self.url, url)
            parts = urllib.parse.urlparse(normalized)
            if parts.scheme not in (‘‘, ‘http‘, ‘https‘):
                continue
            host, port = urllib.parse.splitport(parts.netloc)
            if host and host.lower() not in (‘localhost‘):
                continue
            defragmented, frag = urllib.parse.urldefrag(parts.path)
            links.add(defragmented)

        return links

    def _is_html(self):
        head, body = self.response.split(b‘\r\n\r\n‘, 1)
        headers = dict(h.split(‘: ‘) for h in head.decode().split(‘\r\n‘)[1:])
        return headers.get(‘Content-Type‘, ‘‘).startswith(‘text/html‘)

start = time.time()
fetcher = Fetcher(‘/‘)
fetcher.fetch()

while not stopped:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback(event_key, event_mask)

print(‘{} URLs fetched in {:.1f} seconds, achieved concurrency = {}‘.format(
    len(seen_urls), time.time() - start, concurrency_achieved))

输入python3 callback.py命令查看效果。不要忘了先开网站的服务器哦。

基于回调函数实现的缺陷

想想之前从建立连接到读取响应到解析新的url到工作队列中,这一切都能够在一个函数中完成,就像下面这样:

def fetch(url):
    sock = socket.socket()
    sock.connect((‘localhost‘, 3000))
    request = ‘GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n‘.format(url)
    sock.send(request.encode(‘ascii‘))
    response = b‘‘
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)

    links = parse_links(response)
    q.add(links)

而用回调函数实现,整个整体就支离破碎了,哪里阻塞就又不得不在那里把函数一切为二,代码会显得非常乱,维护也变的很麻烦。更麻烦的是如果在回调函数中抛出了异常,你根本得不到什么有用的信息:

Traceback (most recent call last):
  File "loop-with-callbacks.py", line 111, in <module>
    loop()
  File "loop-with-callbacks.py", line 106, in loop
    callback(event_key, event_mask)
  File "loop-with-callbacks.py", line 51, in read_response
    links = self.parse_links()
  File "loop-with-callbacks.py", line 67, in parse_links
    raise Exception(‘parse error‘)
Exception: parse error

你看不到这个回调函数的上下文是什么,你只知道它在事件循环里。你想在这个函数外抓取它的异常都没地方下手。但是这又是回调实现无法避免的缺陷,那我们想实现并发异步应该怎么办咧?

时间: 2024-08-03 13:48:57

实验二:事件驱动-回调函数实现爬虫的相关文章

node.js学习笔记(二)——回调函数

Node.js 异步编程的直接体现就是回调. 那什么是回调呢?回调指的是将一个函数作为参数传递给另一个函数,并且通常在第一个函数完成后被调用.需要指明的是,回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应.回调函数在完成任务后就会被调用,Node 使用了大量的回调函数,Node 所有 API 都支持回调函数.例如,我们可以一边读取文件,一边执行其他命令,在文件读取完成后,我们将文件内容作为回调函数的参数返回.这样在执行代码时就没有阻

python全栈开发基础【第二十二篇】进程池和回调函数

一.数据共享 1.进程间的通信应该尽量避免共享数据的方式 2.进程间的数据是独立的,可以借助队列或管道实现通信,二者都是基于消息传递的. 虽然进程间数据独立,但可以用过Manager实现数据共享,事实上Manager的功能远不止于此. 命令就是一个程序,按回车就会执行(这个只是在windows情况下) tasklist 查看进程 tasklist | findstr pycharm #(findstr是进行过滤的),|就是管道(tasklist执行的内容就放到管道里面了, 管道后面的findst

并发编程之多进程3 (生产者与消费者模型) 回调函数

一.生产者消费模型补充 总结: ---生产者消费者模型程序中两种角色:①负责生产数据(生产者):②负责处理数据(消费者) ---生产者消费者模型的作用:平衡生产者与消费者之间的速度差. ---实现方式:生产者-->队列-->消费者 如上篇博客内容关于生产消费模型内容,在生产者生产数据的过程结束后,即使消费者已将数据完全获取,消费者程序也不能结束,需由主进程或者生产者在结束生产程序后发送给消费者结束口令,消费者程序才会结束.但是如果出现多个消费者和多个生产者,这种情况又该如何解决?方法如下两种:

Python学习【第21篇】:进程池以及回调函数

python并发编程之多进程2-------------数据共享及进程池和回调函数 一.数据共享 1.进程间的通信应该尽量避免共享数据的方式 2.进程间的数据是独立的,可以借助队列或管道实现通信,二者都是基于消息传递的. 虽然进程间数据独立,但可以用过Manager实现数据共享,事实上Manager的功能远不止于此. ? 1 2 3 4 命令就是一个程序,按回车就会执行(这个只是在windows情况下) tasklist 查看进程 tasklist | findstr  pycharm   #(

函数指针与回调函数

一.函数指针 1.  函数指针就是一个指针变量,用来指向函数地址.正在运行的程序(进程)在内存中占据一定的空间.进程包括编译好的程序代码和需要使用的变量.因此,程序代码中的函数就是一些字符域,要得到一个函数地址,也就是得到这些字符域的起始地址. 2. 函数指针的三种形式:    指向普通C函数的函数指针 --- C语言中的函数指针    指向C++类静态成员函数的函数指针 --- C++    指向C++类非静态成员函数的函数指针 --- C++ 函数指针的本质自然也就是函数地址.  类成员函数

函数指针,回调函数的定义和使用

一.函数指针 定义:函数指针是指向函数的指针变量,即本质是一个指针变量. int (*f) (int x); /* 声明一个函数指针 */ f=func; /* 将func函数的首地址赋给指针f */ 指向函数的指针包含了函数的地址,可以通过它来调用函数.声明格式如下:  类型说明符 (*函数名)(参数)其实这里不能称为函数名,应该叫做指针的变量名.这个特殊的指针指向一个返回整型值的函数.指针的声明笔削和它指向函数的声明保持一致.指针名和指针运算符外面的括号改变了默认的运算符优先级.如果没有圆括

C++中类成员函数作为回调函数

注:与tr1::function对象结合使用,能获得更好的效果,详情见http://blog.csdn.net/this_capslock/article/details/38564719 回调函数是基于C编程的Windows SDK的技术,不是针对C++的,程序员可以将一个C函数直接作为回调函数,但是如果试图直接使用C++的成员函数作为回调函数将发生错误,甚至编译就不能通过. 普通的C++成员函数都隐含了一个传递函数作为参数,亦即“this”指针,C++通过传递一个指向自身的指针给其成员函数从

C函数指针与回调函数

一.函数指针 简单声明一个函数指针并不意味着它马上就可以使用,和其它指针一样,对函数指针执行简接访问之前必须把它初始化为指向某一个函数. int f(int); int (*pf)(int)=&f; 第二个声明创建了函数指针pf,并把它初始化为指向函数f.函数指针的初始化也可以通过一条赋值语句完成.在函数指针的初始化之前具有f的原型是很重要的,否则编译器就无法检查f的类型是否与pf所指向的类型一致. 初始化表达式中的&操作符是可选的,因为函数名被使用时总是由编译器把它转换为函数指针.&am

javascript 回调函数(转)

一,回调函数定义 百度百科:回调函数 回调函数就是一个通过函数指针调用的函数.如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指针被用为调用它所指向的函数时,我们就说这是回调函数.回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应. 在JavaScript中,回调函数具体的定义为:函数A作为参数(函数引用)传递到另一个函数B中,并且这个函数B执行函数A.我们就说函数A叫做回调函数.如果没有名称(函数表达式),就叫做匿名回调函