爬虫必备—性能相关(异步非阻塞)

在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待,从而使得请求整体变慢。

1. 同步执行

 1 import requests
 2
 3 def fetch_async(url):
 4     response = requests.get(url)
 5     return response
 6
 7
 8 url_list = [‘http://www.github.com‘, ‘http://www.bing.com‘]
 9
10 for url in url_list:
11     fetch_async(url)

2. 多线程执行(多个线程并发执行,时间长短取决于最长的URL请求)

 1 from concurrent.futures import ThreadPoolExecutor
 2 import requests
 3
 4
 5 def fetch_async(url):
 6     response = requests.get(url)
 7     return response
 8
 9
10 url_list = [‘http://www.github.com‘, ‘http://www.bing.com‘]
11 pool = ThreadPoolExecutor(5)
12 for url in url_list:
13     pool.submit(fetch_async, url)
14 pool.shutdown(wait=True)

3.  多进程执行(在CPU核心数足够的情况下,多个进程并行执行,时间长短取决于最长的URL请求,理论上会快于多线程)

 1 from concurrent.futures import ProcessPoolExecutor
 2 import requests
 3
 4 def fetch_async(url):
 5     response = requests.get(url)
 6     return response
 7
 8
 9 url_list = [‘http://www.github.com‘, ‘http://www.bing.com‘]
10 pool = ProcessPoolExecutor(5)
11 for url in url_list:
12     pool.submit(fetch_async, url)
13 pool.shutdown(wait=True)

4. 多线程+回调函数(实现了异步非阻塞,在IO等待的情况下可以做其它事情)

 1 from concurrent.futures import ThreadPoolExecutor
 2 import requests
 3
 4 def fetch_async(url):
 5     response = requests.get(url)
 6     return response
 7
 8
 9 def callback(future):
10     print(future.result())
11
12
13 url_list = [‘http://www.github.com‘, ‘http://www.bing.com‘]
14 pool = ThreadPoolExecutor(5)
15 for url in url_list:
16     v = pool.submit(fetch_async, url)
17     v.add_done_callback(callback)
18 pool.shutdown(wait=True)

5. 多进程+回调函数(实现了异步非阻塞,在IO等待的情况下可以做其它事情)

 1 from concurrent.futures import ProcessPoolExecutor
 2 import requests
 3
 4
 5 def fetch_async(url):
 6     response = requests.get(url)
 7     return response
 8
 9
10 def callback(future):
11     print(future.result())
12
13
14 url_list = [‘http://www.github.com‘, ‘http://www.bing.com‘]
15 pool = ProcessPoolExecutor(5)
16 for url in url_list:
17     v = pool.submit(fetch_async, url)
18     v.add_done_callback(callback)
19 pool.shutdown(wait=True)

通过上述代码均可以完成对请求性能的提高,对于多线程和多进程的缺点是在IO阻塞时会造成了线程和进程的浪费,所以异步IO会是首选:

1. asyncio示例一

import asyncio

@asyncio.coroutine
def func1():
    print(‘before...func1......‘)
    yield from asyncio.sleep(5)
    print(‘end...func1......‘)

tasks = [func1(), func1()]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

2. asyncio示例二

 1 import asyncio
 2
 3
 4 @asyncio.coroutine
 5 def fetch_async(host, url=‘/‘):
 6     print(host, url)
 7     reader, writer = yield from asyncio.open_connection(host, 80)
 8
 9     request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)
10     request_header_content = bytes(request_header_content, encoding=‘utf-8‘)
11
12     writer.write(request_header_content)
13     yield from writer.drain()
14     text = yield from reader.read()
15     print(host, url, text)
16     writer.close()
17
18 tasks = [
19     fetch_async(‘www.cnblogs.com‘, ‘/wupeiqi/‘),
20     fetch_async(‘dig.chouti.com‘, ‘/pic/show?nid=4073644713430508&lid=10273091‘)
21 ]
22
23 loop = asyncio.get_event_loop()
24 results = loop.run_until_complete(asyncio.gather(*tasks))
25 loop.close()

3. asyncio + aiohttp

 1 import aiohttp
 2 import asyncio
 3
 4
 5 @asyncio.coroutine
 6 def fetch_async(url):
 7     print(url)
 8     response = yield from aiohttp.request(‘GET‘, url)
 9     # data = yield from response.read()
10     # print(url, data)
11     print(url, response)
12     response.close()
13
14
15 tasks = [fetch_async(‘http://www.google.com/‘), fetch_async(‘http://www.chouti.com/‘)]
16
17 event_loop = asyncio.get_event_loop()
18 results = event_loop.run_until_complete(asyncio.gather(*tasks))
19 event_loop.close()

4. asyncio + requests

 1 import asyncio
 2 import requests
 3
 4
 5 @asyncio.coroutine
 6 def fetch_async(func, *args):
 7     loop = asyncio.get_event_loop()
 8     future = loop.run_in_executor(None, func, *args)
 9     response = yield from future
10     print(response.url, response.content)
11
12
13 tasks = [
14     fetch_async(requests.get, ‘http://www.cnblogs.com/wupeiqi/‘),
15     fetch_async(requests.get, ‘http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091‘)
16 ]
17
18 loop = asyncio.get_event_loop()
19 results = loop.run_until_complete(asyncio.gather(*tasks))
20 loop.close()

5. gevent + requests

 1 import gevent
 2
 3 import requests
 4 from gevent import monkey
 5
 6 monkey.patch_all()
 7
 8
 9 def fetch_async(method, url, req_kwargs):
10     print(method, url, req_kwargs)
11     response = requests.request(method=method, url=url, **req_kwargs)
12     print(response.url, response.content)
13
14 # ##### 发送请求 #####
15 gevent.joinall([
16     gevent.spawn(fetch_async, method=‘get‘, url=‘https://www.python.org/‘, req_kwargs={}),
17     gevent.spawn(fetch_async, method=‘get‘, url=‘https://www.yahoo.com/‘, req_kwargs={}),
18     gevent.spawn(fetch_async, method=‘get‘, url=‘https://github.com/‘, req_kwargs={}),
19 ])
20
21 # ##### 发送请求(协程池控制最大协程数量) #####
22 # from gevent.pool import Pool
23 # pool = Pool(None)
24 # gevent.joinall([
25 #     pool.spawn(fetch_async, method=‘get‘, url=‘https://www.python.org/‘, req_kwargs={}),
26 #     pool.spawn(fetch_async, method=‘get‘, url=‘https://www.yahoo.com/‘, req_kwargs={}),
27 #     pool.spawn(fetch_async, method=‘get‘, url=‘https://www.github.com/‘, req_kwargs={}),
28 # ])

6. grequests(封装的 gevent + requests)

 1 import grequests
 2
 3
 4 request_list = [
 5     grequests.get(‘http://httpbin.org/delay/1‘, timeout=0.001),
 6     grequests.get(‘http://fakedomain/‘),
 7     grequests.get(‘http://httpbin.org/status/500‘)
 8 ]
 9
10
11 # ##### 执行并获取响应列表 #####
12 # response_list = grequests.map(request_list)
13 # print(response_list)
14
15
16 # ##### 执行并获取响应列表(处理异常) #####
17 # def exception_handler(request, exception):
18 # print(request,exception)
19 #     print("Request failed")
20
21 # response_list = grequests.map(request_list, exception_handler=exception_handler)
22 # print(response_list)

7. Twisted示例

 1 from twisted.web.client import getPage, defer
 2 from twisted.internet import reactor
 3
 4
 5 def all_done(arg):
 6     reactor.stop()
 7
 8
 9 def callback(contents):
10     print(contents)
11
12
13 deferred_list = []
14
15 url_list = [‘http://www.bing.com‘, ‘http://www.baidu.com‘, ]
16 for url in url_list:
17     deferred = getPage(bytes(url, encoding=‘utf8‘))
18     deferred.addCallback(callback)
19     deferred_list.append(deferred)
20
21 dlist = defer.DeferredList(deferred_list)
22 dlist.addBoth(all_done)
23
24 reactor.run()

8. Tornado

 1 from tornado.httpclient import AsyncHTTPClient
 2 from tornado.httpclient import HTTPRequest
 3 from tornado import ioloop
 4
 5
 6 def handle_response(response):
 7     """
 8     处理返回值内容(需要维护计数器,来停止IO循环),调用 ioloop.IOLoop.current().stop()
 9     :param response:
10     :return:
11     """
12     if response.error:
13         print("Error:", response.error)
14     else:
15         print(response.body)
16
17
18 def func():
19     url_list = [
20         ‘http://www.baidu.com‘,
21         ‘http://www.bing.com‘,
22     ]
23     for url in url_list:
24         print(url)
25         http_client = AsyncHTTPClient()
26         http_client.fetch(HTTPRequest(url), handle_response)
27
28
29 ioloop.IOLoop.current().add_callback(func)
30 ioloop.IOLoop.current().start()

9. Twisted更多

 1 from twisted.internet import reactor
 2 from twisted.web.client import getPage
 3 import urllib.parse
 4
 5
 6 def one_done(arg):
 7     print(arg)
 8     reactor.stop()
 9
10 post_data = urllib.parse.urlencode({‘check_data‘: ‘adf‘})
11 post_data = bytes(post_data, encoding=‘utf8‘)
12 headers = {b‘Content-Type‘: b‘application/x-www-form-urlencoded‘}
13 response = getPage(bytes(‘http://dig.chouti.com/login‘, encoding=‘utf8‘),
14                    method=bytes(‘POST‘, encoding=‘utf8‘),
15                    postdata=post_data,
16                    cookies={},
17                    headers=headers)
18 response.addBoth(one_done)
19
20 reactor.run()

以上均是Python内置以及第三方模块提供异步IO请求模块,使用简便大大提高效率,而对于异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】:

  1 import select
  2 import socket
  3 import time
  4
  5
  6 class AsyncTimeoutException(TimeoutError):
  7     """
  8     请求超时异常类
  9     """
 10
 11     def __init__(self, msg):
 12         self.msg = msg
 13         super(AsyncTimeoutException, self).__init__(msg)
 14
 15
 16 class HttpContext(object):
 17     """封装请求和相应的基本数据"""
 18
 19     def __init__(self, sock, host, port, method, url, data, callback, timeout=5):
 20         """
 21         sock: 请求的客户端socket对象
 22         host: 请求的主机名
 23         port: 请求的端口
 24         port: 请求的端口
 25         method: 请求方式
 26         url: 请求的URL
 27         data: 请求时请求体中的数据
 28         callback: 请求完成后的回调函数
 29         timeout: 请求的超时时间
 30         """
 31         self.sock = sock
 32         self.callback = callback
 33         self.host = host
 34         self.port = port
 35         self.method = method
 36         self.url = url
 37         self.data = data
 38
 39         self.timeout = timeout
 40
 41         self.__start_time = time.time()
 42         self.__buffer = []
 43
 44     def is_timeout(self):
 45         """当前请求是否已经超时"""
 46         current_time = time.time()
 47         if (self.__start_time + self.timeout) < current_time:
 48             return True
 49
 50     def fileno(self):
 51         """请求sockect对象的文件描述符,用于select监听"""
 52         return self.sock.fileno()
 53
 54     def write(self, data):
 55         """在buffer中写入响应内容"""
 56         self.__buffer.append(data)
 57
 58     def finish(self, exc=None):
 59         """在buffer中写入响应内容完成,执行请求的回调函数"""
 60         if not exc:
 61             response = b‘‘.join(self.__buffer)
 62             self.callback(self, response, exc)
 63         else:
 64             self.callback(self, None, exc)
 65
 66     def send_request_data(self):
 67         content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s""" % (
 68             self.method.upper(), self.url, self.host, self.data,)
 69
 70         return content.encode(encoding=‘utf8‘)
 71
 72
 73 class AsyncRequest(object):
 74     def __init__(self):
 75         self.fds = []
 76         self.connections = []
 77
 78     def add_request(self, host, port, method, url, data, callback, timeout):
 79         """创建一个要请求"""
 80         client = socket.socket()
 81         client.setblocking(False)
 82         try:
 83             client.connect((host, port))
 84         except BlockingIOError as e:
 85             pass
 86             # print(‘已经向远程发送连接的请求‘)
 87         req = HttpContext(client, host, port, method, url, data, callback, timeout)
 88         self.connections.append(req)
 89         self.fds.append(req)
 90
 91     def check_conn_timeout(self):
 92         """检查所有的请求,是否有已经连接超时,如果有则终止"""
 93         timeout_list = []
 94         for context in self.connections:
 95             if context.is_timeout():
 96                 timeout_list.append(context)
 97         for context in timeout_list:
 98             context.finish(AsyncTimeoutException(‘请求超时‘))
 99             self.fds.remove(context)
100             self.connections.remove(context)
101
102     def running(self):
103         """事件循环,用于检测请求的socket是否已经就绪,从而执行相关操作"""
104         while True:
105             r, w, e = select.select(self.fds, self.connections, self.fds, 0.05)
106
107             if not self.fds:
108                 return
109
110             for context in r:
111                 sock = context.sock
112                 while True:
113                     try:
114                         data = sock.recv(8096)
115                         if not data:
116                             self.fds.remove(context)
117                             context.finish()
118                             break
119                         else:
120                             context.write(data)
121                     except BlockingIOError as e:
122                         break
123                     except TimeoutError as e:
124                         self.fds.remove(context)
125                         self.connections.remove(context)
126                         context.finish(e)
127                         break
128
129             for context in w:
130                 # 已经连接成功远程服务器,开始向远程发送请求数据
131                 if context in self.fds:
132                     data = context.send_request_data()
133                     context.sock.sendall(data)
134                     self.connections.remove(context)
135
136             self.check_conn_timeout()
137
138
139 if __name__ == ‘__main__‘:
140     def callback_func(context, response, ex):
141         """
142         :param context: HttpContext对象,内部封装了请求相关信息
143         :param response: 请求响应内容
144         :param ex: 是否出现异常(如果有异常则值为异常对象;否则值为None)
145         :return:
146         """
147         print(context, response, ex)
148
149     obj = AsyncRequest()
150     url_list = [
151         {‘host‘: ‘www.google.com‘, ‘port‘: 80, ‘method‘: ‘GET‘, ‘url‘: ‘/‘, ‘data‘: ‘‘, ‘timeout‘: 5,
152          ‘callback‘: callback_func},
153         {‘host‘: ‘www.baidu.com‘, ‘port‘: 80, ‘method‘: ‘GET‘, ‘url‘: ‘/‘, ‘data‘: ‘‘, ‘timeout‘: 5,
154          ‘callback‘: callback_func},
155         {‘host‘: ‘www.bing.com‘, ‘port‘: 80, ‘method‘: ‘GET‘, ‘url‘: ‘/‘, ‘data‘: ‘‘, ‘timeout‘: 5,
156          ‘callback‘: callback_func},
157     ]
158     for item in url_list:
159         print(item)
160         obj.add_request(**item)
161
162     obj.running()

史上最牛逼的异步IO模块

本文转载自: 银角大王

http://www.cnblogs.com/wupeiqi/articles/6229292.html

时间: 2024-10-11 07:37:35

爬虫必备—性能相关(异步非阻塞)的相关文章

Python的异步编程[0] -&gt; 协程[1] -&gt; 使用协程建立自己的异步非阻塞模型

使用协程建立自己的异步非阻塞模型 接下来例子中,将使用纯粹的Python编码搭建一个异步模型,相当于自己构建的一个asyncio模块,这也许能对asyncio模块底层实现的理解有更大的帮助.主要参考为文末的链接,以及自己的补充理解. 完整代码 1 #!/usr/bin/python 2 # ============================================================= 3 # File Name: async_base.py 4 # Author: L

利用tornado使请求实现异步非阻塞

基本IO模型 网上搜了很多关于同步异步,阻塞非阻塞的说法,理解还是不能很透彻,有必要买书看下. 参考:使用异步 I/O 大大提高应用程序的性能 怎样理解阻塞非阻塞与同步异步的区别? 同步和异步:主要关注消息通信机制(重点在B?). 同步:A调用B,B处理直到获得结果,才返回给A. 异步:A调用B,B直接返回.无需等待结果,B通过状态,通知等来通知A或回调函数来处理. 阻塞和非阻塞:主要关注程序等待(重点在A?). 阻塞:A调用B,A被挂起直到B返回结果给A,A继续执行. 非阻塞:A调用B,A不会

透过现象看本质——回头再看Nginx(进程模型、异步非阻塞、源码目录结构)

透过现象看本质--回头再看Nginx Nginx的进程模型 ? 使用过nginx的朋友都知道nginx的性能很高,而其原因可能少有人知.首先,nginx的架构就奠定了其高性能的基础.那么就先来看看nginx的基础架构吧,如下图所示:(不能完全理清楚所有内容也没关系,因为本小节讲述的主要内容是Nginx的进程模型) ? 本小节先来说说Nginx基础架构中的进程模型: ? 所谓进程模型,即Nginx响应请求或服务时程序运行(机器执行指令集)的方式,一般在nginx服务启动后,在Unix系统中会以da

node 单线程异步非阻塞

链接:http://www.runoob.com/nodejs/nodejs-callback.html 首先什么是单线程异步非阻塞? 单线程的意思整个程序从头到尾但是运用一个线程,程序是从上往下执行的.异步操作就是程序虽然是从上到下执行的,但是某个函数执行时间过长时并不会阻塞在那里等待它执行完,然后在执行下面的代码.非阻塞也就是这个意思. 为什么node是异步非阻塞的呢,得力于回调函数,还有js中的定时器也是经典的异步操作. ###4.1 Node.js异步机制 由于异步的高效性,node.j

nginx学习(二)——基础概念之异步非阻塞

上面讲了很多关于nginx的进程模型,接下来,我们来看看nginx是如何处理事件的. 有人可能要问了,nginx采用多worker的方式来处理请求,每个worker里面只有一个主线程,那能够处理的并发数很有限啊,多少个worker就能处理多少个并发,何来高并发呢?非也,这就是nginx的高明之处,nginx采用了异步非阻塞的方式来处理请求,也就是说,nginx是可以同时处理成千上万个请求的.想想apache的常用工作方式(apache也有异步非阻塞版本,但因其与自带某些模块冲突,所以不常用),每

异步非阻塞

首先讨论下使用事件驱动,异步编程的优点: 充分利用了系统资源,执行代码无须阻塞等待某种操作完成,有限的资源可以用于其他的任务.其非常适合于后端的网络服务编程. 在服务器开发中,并发的请求处理是个大问题,阻塞式的函数会导致资源浪费和时间延迟.通过事件注册.异步函数,开发人员可以提高资源的利用率,性能也会改善.其nginx和node.js处理并发都是采用的事件驱动异步非阻塞模式.其中nginx中处理并发用的是epoll,poll,queue等方式,node.js使用的是libev,它们对大规模的HT

基于MFC的socket编程(异步非阻塞通信)

对于许多初学者来说,网络通信程序的开发,普遍的一个现象就是觉得难以入手.许多概念,诸如:同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)等,初学者往往迷惑不清,只知其所以而不知起所以然. 异步方式指的是发送方不等接收方响应,便接着发下个数据包的通信方式:而同步指发送方发出数据后,等收到接收方发回的响应,才发下一个数据包的通信方式. 阻塞套接字是指执行此套接字的网络调用时,直到成功才返回,否则一直阻塞在此网络调用上,比如调用recv()函数读取网络缓冲区中的数据,

并行,并发,串行,同步,异步,阻塞,非阻塞,同步阻塞,同步非阻塞,异步阻塞,异步非阻塞

并行和并发 并发和并行从宏观上来讲都是同时处理多路请求的概念.但并发和并行又有区别,并行是指两个或者多个事件(多核线程)在同一时刻发生:而并发是指两个或多个事件(进程或者程序)在同一时间间隔内发生.计算机在宏观上并发,微观上并行. 在操作系统中,并发是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行. ①程序与计算不再一一对应,一个程序副本可以有多个计算 ②并发程序之间有相互制约关系,直接制约体现为一个程序需

nodejs的异步非阻塞IO

简单表述一下:发启向系统IO操作请求,系统使用线程池IO操作,执行完放到事件队列里,node主线程轮询事件队列,读取结果与调用回调.所以说node并非真的单线程,还是使用了线程池的多线程. 上个图看看吧 举一反三:所有的异步非阻塞思路都类似,如:nginx,python的模拟异步非阻塞,还有java的nio.C#的 EAP