Crawler Performance Analysis

For crawlers, Python offers three main ways to fetch pages concurrently: processes, threads, and coroutines.

Most of the cost is in the IO of the HTTP requests themselves: in single-process, single-thread mode, every URL request blocks while waiting for the response, which slows down the whole run.
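
For example, a minimal serial baseline (a sketch with placeholder URLs): each get() blocks until its response arrives, so the total time is roughly the sum of the individual waits.

import time
import requests

url_list = [
    'http://www.baidu.com',
    'http://www.bing.com',
    'http://www.sina.com',
]

start = time.time()
for url in url_list:
    response = requests.get(url)  # blocks until this response arrives
    print(url, response.status_code)
print('serial total: %.2fs' % (time.time() - start))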

I. Multiprocessing

This achieves concurrency, but between sending a request and receiving its response, each process sits idle.

Usage:
1. Handling the response directly in the task

from concurrent.futures import ProcessPoolExecutor
import requests

def task(url):
    response = requests.get(url)
    print(url, response)
    # parse the page here, e.g. with a regular expression
    return response

if __name__ == '__main__':  # required for multiprocessing on Windows/macOS (spawn start method)
    pool = ProcessPoolExecutor(7)
    url_list = [
        'http://www.cnblogs.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]

    for url in url_list:
        pool.submit(task, url)

    pool.shutdown(wait=True)

2. Handling the response in a callback

from concurrent.futures import ProcessPoolExecutor
import requests

def task(url):
    response = requests.get(url)
    return response

def done(future, *args, **kwargs):
    response = future.result()
    print(response.status_code, response.content)

if __name__ == '__main__':  # required for multiprocessing on Windows/macOS (spawn start method)
    pool = ProcessPoolExecutor(7)
    url_list = [
        'http://www.cnblogs.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]
    for url in url_list:
        v = pool.submit(task, url)
        v.add_done_callback(done)

    pool.shutdown(wait=True)

II. Multithreading

Multithreading also achieves concurrency, but between sending a request and receiving its response, each thread sits idle.

Usage:
1. Handling the response directly in the task

from concurrent.futures import ThreadPoolExecutor
import requests

def task(url):
    response = requests.get(url)
    print(url, response)
    # parse the page here, e.g. with a regular expression

pool = ThreadPoolExecutor(7)
url_list = [
    'http://www.cnblogs.com/wupeiqi',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]
for url in url_list:
    pool.submit(task, url)

pool.shutdown(wait=True)

2. Handling the response in a callback

from concurrent.futures import ThreadPoolExecutor
import requests

def task(url):
    """
    Download the page.
    :param url:
    :return:
    """
    response = requests.get(url)
    return response

def done(future, *args, **kwargs):
    response = future.result()
    print(response.status_code, response.content)

pool = ThreadPoolExecutor(7)
url_list = [
    'http://www.cnblogs.com/wupeiqi',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]
for url in url_list:
    v = pool.submit(task, url)
    v.add_done_callback(done)

pool.shutdown(wait=True)

Any of the code above improves request throughput. The drawback shared by multithreading and multiprocessing is that threads and processes are wasted while blocked on IO.
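
For a rough comparison against the serial baseline at the top of this article, here is a hedged timing sketch (placeholder URLs; actual numbers depend on the network). With a thread pool, the waits overlap, so the total time approaches that of the slowest single request.

import time
from concurrent.futures import ThreadPoolExecutor
import requests

url_list = [
    'http://www.baidu.com',
    'http://www.bing.com',
    'http://www.sina.com',
]

start = time.time()
with ThreadPoolExecutor(len(url_list)) as pool:
    # all three requests are in flight at once; map() collects results in order
    responses = list(pool.map(requests.get, url_list))
print('threaded total: %.2fs' % (time.time() - start))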

III. Asynchronous, non-blocking modules

Coroutines (micro-threads) + asynchronous IO ---> one thread sends N HTTP requests.

import asyncio

@asyncio.coroutine  # legacy generator-based coroutine; removed in Python 3.11 (use async def / await on modern Python)
def task():
    print('before...task......')
    yield from asyncio.sleep(5)  # stands in for an HTTP request; any TCP IO is awaited the same way
    print('end...task......')

tasks = [task(), task()]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

asyncio example 1: asyncio.sleep

import asyncio

@asyncio.coroutine  # legacy style; see the note above
def task(host, url='/'):
    print('start', host, url)
    reader, writer = yield from asyncio.open_connection(host, 80)

    # build the HTTP request by hand on top of a raw TCP connection
    request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host,)
    request_header_content = bytes(request_header_content, encoding='utf-8')

    writer.write(request_header_content)
    yield from writer.drain()
    text = yield from reader.read()
    print('end', host, url, text)
    writer.close()

tasks = [
    task('www.cnblogs.com', '/gregoryli/'),
    task('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

asyncio example 2: hand-rolled HTTP over open_connection

import aiohttp
import asyncio

@asyncio.coroutine  # legacy style; see the note above
def fetch_async(url):
    print(url)
    # note: this is the old aiohttp (0.x) API; aiohttp 3.x uses ClientSession instead
    response = yield from aiohttp.request('GET', url)
    print(url, response)
    response.close()

tasks = [fetch_async('http://www.baidu.com/'), fetch_async('http://www.chouti.com/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()

aiohttp

# -*- coding: utf-8 -*-
# 2017/11/17 14:04
import asyncio
import requests

@asyncio.coroutine  # legacy style; see the note above
def task(func, *args):
    print(func, args)
    loop = asyncio.get_event_loop()
    # run the blocking requests call in the default thread pool,
    # e.g. requests.get('http://www.cnblogs.com/wupeiqi/')
    future = loop.run_in_executor(None, func, *args)
    response = yield from future
    print(response.url, response.content)

tasks = [
    task(requests.get, 'http://www.cnblogs.com/gregoryli/'),
    task(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

requests

from gevent import monkey
monkey.patch_all()  # patch the standard library before importing requests

import gevent
import requests

def task(method, url, req_kwargs):
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, response.content)

# ##### send the requests #####
# gevent.joinall([
#     gevent.spawn(task, method='get', url='https://www.python.org/', req_kwargs={}),
#     gevent.spawn(task, method='get', url='https://www.yahoo.com/', req_kwargs={}),
#     gevent.spawn(task, method='get', url='https://github.com/', req_kwargs={}),
# ])

# ##### send the requests (a pool caps the number of concurrent greenlets) #####
from gevent.pool import Pool
pool = Pool(5)
gevent.joinall([
    pool.spawn(task, method='get', url='https://www.python.org/', req_kwargs={}),
    pool.spawn(task, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    pool.spawn(task, method='get', url='https://www.github.com/', req_kwargs={}),
])

gevent+requests

import grequests

request_list = [
    grequests.get('http://httpbin.org/delay/1', timeout=0.001),
    grequests.get('http://fakedomain/'),
    grequests.get('http://httpbin.org/status/500')
]

# ##### run them and collect the list of responses #####
response_list = grequests.map(request_list, size=5)
print(response_list)

grequests

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.internet import defer
from twisted.web.client import getPage  # deprecated in newer Twisted; twisted.web.client.Agent replaces it
from twisted.internet import reactor

def one_done(arg):
    print(arg)

def all_done(arg):
    print('done')
    reactor.stop()

@defer.inlineCallbacks
def task(url):
    res = getPage(bytes(url, encoding='utf8'))  # send the HTTP request
    res.addCallback(one_done)
    yield res

url_list = [
    'http://www.cnblogs.com',
    'http://www.cnblogs.com',
    'http://www.cnblogs.com',
    'http://www.cnblogs.com',
]

defer_list = []  # a list of Deferreds (the requests are already in flight)
for url in url_list:
    v = task(url)
    defer_list.append(v)

d = defer.DeferredList(defer_list)
d.addBoth(all_done)
reactor.run()  # the event loop; blocks until reactor.stop()

twisted

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

COUNT = 0

def handle_response(response):
    global COUNT
    COUNT -= 1
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)
        # same idea as in the Twisted example:
        # ioloop.IOLoop.current().stop()

    if COUNT == 0:
        ioloop.IOLoop.current().stop()

def func():
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    global COUNT
    COUNT = len(url_list)
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)

ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()  # the event loop; blocks until stop()

tornado

- asyncio
  - Example 1: asyncio.sleep(5)
  - Example 2: building the HTTP request by hand
  - Example 3: asyncio + aiohttp
    (the aiohttp module builds the HTTP messages: pip3 install aiohttp)
  - Example 4: asyncio + requests
    (the requests module builds the HTTP messages: pip3 install requests)
- gevent (greenlet + asynchronous IO)
  pip3 install greenlet
  pip3 install gevent
  - Example 1: gevent + requests
  - Example 2: gevent (a coroutine pool capping the number of in-flight requests) + requests
  - Example 3: gevent + requests => grequests
    (pip3 install grequests)

Efficiency: gevent > Twisted > Tornado > asyncio
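
All the asyncio examples above use the legacy @asyncio.coroutine / yield from style, which was removed in Python 3.11. For reference, a minimal sketch of the same idea in modern syntax (assuming Python 3.7+ and aiohttp 3.x; the URLs are placeholders):

import asyncio
import aiohttp

async def fetch(session, url):
    # one coroutine per URL; the awaits overlap within a single thread
    async with session.get(url) as response:
        body = await response.read()
        print(url, response.status, len(body))

async def main():
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(
            fetch(session, 'http://www.baidu.com'),
            fetch(session, 'http://www.bing.com'),
        )

asyncio.run(main())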

IV. Sockets

1. Socket client and server
By default, connections block.
setblocking(0): raises an error whenever there is no data yet (the connection has not completed, or the response has not arrived).

-----> The essence of an HTTP request: blocking

import socket

sk = socket.socket()

# 1. connect
sk.connect(('www.baidu.com', 80,))  # blocks on IO
print('connected...')

# 2. once connected, send the request
sk.send(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')
# sk.send(b'POST / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\nk1=v1&k2=v2')

# 3. wait for the server's response
data = sk.recv(8096)  # response headers + body; blocks on IO
print(data)

# close the connection
sk.close()

2. IO multiplexing
Client side (pseudocode):

try:
    sock1.connect(...)
    sock2.connect(...)
    sock3.connect(...)
except Exception as e:
    pass

while True:
    r, w, e = select.select([sock1, sock2, sock3], [sock1, sock2, sock3], [], 0.05)
    # r = [sock1, ...]  -> these sockets have data for us to read:
    #     data = sock1.recv(...)
    # w = [sock1, ...]  -> these sockets have finished connecting:
    #     sock1.send(b"GET /index HTTP/1.0\r\nHost: baidu.com\r\n\r\n")

-----> The essence of an HTTP request: non-blocking

import socket

sk = socket.socket()
sk.setblocking(False)

# 1. connect
try:
    sk.connect(('www.baidu.com', 80,))  # no longer blocks; raises immediately
    print('connected...')
except BlockingIOError as e:
    print(e)

# 2. once connected, send the request
sk.send(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')
# sk.send(b'POST / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\nk1=v1&k2=v2')

# 3. wait for the server's response
data = sk.recv(8096)  # raises if no data has arrived yet
print(data)

# close the connection
sk.close()

3. select() accepts any object with a fileno() method

class Foo:
    def fileno(self):
        obj = socket.socket()
        return obj.fileno()

r, w, e = select.select([sock1, sock2, sock3, Foo()], [], [])
# each object passed to select must have a fileno() method that returns a file descriptor

Key points:

a. Internally, select calls .fileno() on each object.

b. Foo() wraps a socket's file descriptor.

IO multiplexing: r, w, e = select.select(...) inside a while loop, watching many socket objects at once.
Asynchronous IO = non-blocking sockets + IO multiplexing:
- non-blocking sockets
- select() over your own wrapper objects (r and w lists)

import socket
import select

class HttpRequest:
    def __init__(self, sk, host, callback):
        self.socket = sk
        self.host = host
        self.callback = callback

    def fileno(self):
        # lets select() watch this object directly
        return self.socket.fileno()

class HttpResponse:
    def __init__(self, recv_data):
        self.recv_data = recv_data
        self.header_dict = {}
        self.body = None
        self.initialize()

    def initialize(self):
        headers, body = self.recv_data.split(b'\r\n\r\n', 1)
        self.body = body
        header_list = headers.split(b'\r\n')
        for h in header_list:
            h_str = str(h, encoding='utf-8')
            v = h_str.split(':', 1)
            if len(v) == 2:
                self.header_dict[v[0]] = v[1]

class AsyncRequest:
    def __init__(self):
        self.conn = []
        self.connection = []  # used to detect which sockets have finished connecting

    def add_request(self, host, callback):
        try:
            sk = socket.socket()
            sk.setblocking(False)
            sk.connect((host, 80,))
        except BlockingIOError as e:
            pass
        request = HttpRequest(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        while True:
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
            for w in wlist:
                print(w.host, 'connected...')
                # a socket appearing as writable here means its connection has completed
                tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                w.socket.send(bytes(tpl, encoding='utf-8'))
                self.connection.remove(w)
            for r in rlist:
                # r is an HttpRequest
                recv_data = bytes()
                while True:
                    try:
                        chunk = r.socket.recv(8096)
                        if not chunk:  # peer closed the connection
                            break
                        recv_data += chunk
                    except Exception as e:
                        break
                print(r.host, 'response received...', recv_data)

                response = HttpResponse(recv_data)
                r.callback(response)
                r.socket.close()
                self.conn.remove(r)
            if len(self.conn) == 0:
                break

def f1(response):
    print('save to a file', response.header_dict)

def f2(response):
    print('save to the database', response.header_dict)

url_list = [
    {'host': 'www.baidu.com', 'callback': f1},
    {'host': 'cn.bing.com', 'callback': f2},
    {'host': 'www.cnblogs.com', 'callback': f2},
]

req = AsyncRequest()
for item in url_list:
    req.add_request(item['host'], item['callback'])

req.run()
