python异步加协程获取比特币市场信息

目标

  选取几个比特币交易量大的几个交易平台,查看对应的API,获取该市场下货币对的ticker和depth信息。我们从网站上选取4个交易平台:bitfinex、okex、binance、gdax。对应的交易对是BTC/USD,BTC/USDT,BTC/USDT,BTC/USD。

一、ccxt库

  开始想着直接请求市场的API,然后再解析获取下来的数据,但到github上发现一个比较好得python库,里面封装好了获取比特币市场的相关函数,这样一来就省掉分析API的时间了。因此我只要传入市场以及对应的货币对,利用库里面的函数 fetch_ticker 和 fetch_order_book 就可以获取到市场的ticker和depth信息(具体的使用方法可以查看ccxt手册)。接下来以市场okex为例,利用ccxt库获取okex的ticker和depth信息。

# 引入库
import ccxt

# 实例化市场
exchange = ccxt.okex()
# 交易对
symbol = ‘BTC/USDT‘

# 获取ticker信息
ticker = exchange.fetch_ticker(symbol)
# 获取depth信息
depth = exchange.fetch_order_book(symbol)

print(‘ticker:%s, depth:%s‘ % (ticker, depth))

  运行后会得到结果如下图,从此可以看出已经获取到了ticker和depth信息。

 二、获取四个市场的信息(for循环)

   接下来我们获取四个市场的信息,深度里面有asks和bids,数据量稍微有点儿多,这里depth信息我只去前面五个,对于ticker我也只提取里面的info信息(具体代表什么含义就要参考一下对应市场的API啦)。将其简单的封装后,最开始我想的是for循环。想到啥就开始吧:

# 引入库
import ccxt
import time

now = lambda: time.time()
start = now()

def getData(exchange, symbol):
    data = {}  # 用于存储ticker和depth信息
    # 获取ticker信息
    tickerInfo = exchange.fetch_ticker(symbol)
    # 获取depth信息
    depth = {}
    # 获取深度信息
    exchange_depth = exchange.fetch_order_book(symbol)
    # 获取asks,bids 最低5个,最高5个信息
    asks = exchange_depth.get(‘asks‘)[:5]
    bids = exchange_depth.get(‘bids‘)[:5]
    depth[‘asks‘] = asks
    depth[‘bids‘] = bids

    data[‘ticker‘] = tickerInfo
    data[‘depth‘] = depth

    return data

def main():
    # 实例化市场
    exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()]
    # 交易对
    symbols = [‘BTC/USDT‘, ‘BTC/USD‘, ‘BTC/USDT‘, ‘BTC/USD‘]

    for i in range(len(exchanges)):
        exchange = exchanges[i]
        symbol = symbols[i]
        data = getData(exchange, symbol)
        print(‘exchange: %s data is %s‘ % (exchange.id, data))

if __name__ == ‘__main__‘:
    main()
    print(‘Run Time: %s‘ % (now() - start))

   运行后会发现虽然每个市场的信息都获取到了,执行完差不多花掉5.7秒,因为这是同步的,也就是按顺序执行的,要是要想每隔一定时间同时获取四个市场的信息,很显然这种结果不符合我们的要求。

三、异步加协程(coroutine)

  前面讲的循环虽然可以输出结果,但耗时长而且达不到想要的效果,接下来采用异步加协程(参考知乎上的一篇文章),要用到异步首先得引入asyncio库,这个库是3.4以后才有的,它提供了一种机制,使得你可以用协程(coroutines)、IO复用(multiplexing I/O)在单线程环境中编写并发模型。这里python文档有个小例子。

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

  

  当事件循环开始运行时,它会在Task中寻找coroutine来执行调度,因为事件循环注册了print_sum(),因此print_sum()被调用,执行result = await compute(x, y)这条语句(等同于result = yield from compute(x, y)),因为compute()自身就是一个coroutine,因此print_sum()这个协程就会暂时被挂起,compute()被加入到事件循环中,程序流执行compute()中的print语句,打印”Compute %s + %s …”,然后执行了await asyncio.sleep(1.0),因为asyncio.sleep()也是一个coroutine,接着compute()就会被挂起,等待计时器读秒,在这1秒的过程中,事件循环会在队列中查询可以被调度的coroutine,而因为此前print_sum()compute()都被挂起了,因此事件循环会停下来等待协程的调度,当计时器读秒结束后,程序流便会返回到compute()中执行return语句,结果会返回到print_sum()中的result中,最后打印result,事件队列中没有可以调度的任务了,此时loop.close()把事件队列关闭,程序结束。

  接下来我们采用异步和协程(ps:ccxt库也有对应的异步),运行后发现时间只用了1.9秒,比之前快了好多倍。  

Run Time: 1.9661316871643066 

相关代码:

# 引入库
import ccxt.async as ccxt
import asyncio
import time

now = lambda: time.time()
start = now()

async def getData(exchange, symbol):
    data = {}  # 用于存储ticker和depth信息
    # 获取ticker信息
    tickerInfo = await exchange.fetch_ticker(symbol)
    # 获取depth信息
    depth = {}
    # 获取深度信息
    exchange_depth = await exchange.fetch_order_book(symbol)
    # 获取asks,bids 最低5个,最高5个信息
    asks = exchange_depth.get(‘asks‘)[:5]
    bids = exchange_depth.get(‘bids‘)[:5]
    depth[‘asks‘] = asks
    depth[‘bids‘] = bids

    data[‘ticker‘] = tickerInfo
    data[‘depth‘] = depth

    return data

def main():
    # 实例化市场
    exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(), ccxt.gdax()]
    # 交易对
    symbols = [‘BTC/USDT‘, ‘BTC/USD‘, ‘BTC/USDT‘, ‘BTC/USD‘]

    tasks = []
    for i in range(len(exchanges)):
        task = getData(exchanges[i], symbols[i])
        tasks.append(asyncio.ensure_future(task))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

if __name__ == ‘__main__‘:
    main()
    print(‘Run Time: %s‘ % (now() - start))

  

三、定时爬取并用mongodb保存数据

  在前面的基础上,添加一个定时任务,实现每隔一段时间爬取一次数据,并将数据保存到mongodb数据库。只需再前面的代码上稍微改改就可以啦,代码和运行结果如下:

import asyncio
import ccxt.async as ccxt
import time
import pymongo

# 获取ticker和depth信息
async def get_exchange_tickerDepth(exchange, symbol):  # 其中exchange为实例化后的市场
    # print(‘start get_ticker‘)
    while True:
        print(‘%s is run %s‘ % (exchange.id, time.ctime()))

        # 获取ticher信息
        tickerInfo = await exchange.fetch_ticker(symbol)
        ticker = tickerInfo.get(‘info‘)

        if type(ticker) == type({}):
            ticker[‘timestamp‘] = tickerInfo.get(‘timestamp‘)
            ticker[‘high‘] = tickerInfo.get(‘high‘)
            ticker[‘low‘] = tickerInfo.get(‘low‘)
            ticker[‘last‘] = tickerInfo.get(‘last‘)
        else:
            ticker = tickerInfo
        # print(ticker)

        # 获取深度信息
        depth = {}
        exchange_depth = await exchange.fetch_order_book(symbol)
        # 获取asks,bids 最低5个,最高5个信息
        asks = exchange_depth.get(‘asks‘)[:5]
        bids = exchange_depth.get(‘bids‘)[:5]
        depth[‘asks‘] = asks
        depth[‘bids‘] = bids
        # print(‘depth:{}‘.format(depth))
        data = {
            ‘exchange‘: exchange.id,
            ‘countries‘: exchange.countries,
            ‘symbol‘: symbol,
            ‘ticker‘: ticker,
            ‘depth‘: depth
        }

        # 保存数据
        save_exchangeDate(exchange.id, data)
        print(‘********* %s is finished, time %s *********‘ % (exchange.id, time.ctime()))

        # 等待时间
        await asyncio.sleep(2)

# 存库
def save_exchangeDate(exchangeName, data):
    # 链接MongoDB
    connect = pymongo.MongoClient(host=‘localhost‘, port=27017)
    # 创建数据库
    exchangeData = connect[‘exchangeDataAsyncio‘]
    # 创建表
    exchangeInformation = exchangeData[exchangeName]
    # print(table_name)
    # 数据去重后保存
    count = exchangeInformation.count()
    if not count > 0:
        exchangeInformation.insert_one(data)
    else:
        for item in exchangeInformation.find().skip(count - 1):
            lastdata = item
        if lastdata[‘ticker‘][‘timestamp‘] != data[‘ticker‘][‘timestamp‘]:
            exchangeInformation.insert_one(data)

def main():
    exchanges = [ccxt.binance(), ccxt.bitfinex2(), ccxt.okex(),
                  ccxt.gdax()]
    symbols = [‘BTC/USDT‘, ‘BTC/USD‘, ‘BTC/USDT‘, ‘BTC/USD‘]
    tasks = []
    for i in range(len(exchanges)):
        task = get_exchange_tickerDepth(exchanges[i], symbols[i])
        tasks.append(asyncio.ensure_future(task))

    loop = asyncio.get_event_loop()

    try:
        # print(asyncio.Task.all_tasks(loop))
        loop.run_forever()

    except Exception as e:
        print(e)
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

if __name__ == ‘__main__‘:
    main()

 五、小结

  使用协程可以实现高效的并发任务。Python在3.4中引入了协程的概念,可是这个还是以生成器对象为基础,3.5则确定了协程的语法。这里只简单的使用了asyncio。当然实现协程的不仅仅是asyncio,tornado和gevent都实现了类似的功能。这里我有一个问题,就是运行一段时间后,里面的市场可能有请求超时等情况导致协程停止运行,我要怎样才能获取到错误然后重启对应的协程。如果有大神知道的话请指点指点。

原文地址:https://www.cnblogs.com/xiaxuexiaoab/p/8410682.html

时间: 2024-11-08 20:37:04

python异步加协程获取比特币市场信息的相关文章

Python实现基于协程的异步爬虫

一.课程介绍 1. 课程来源 本课程核心部分来自<500 lines or less>项目,作者是来自 MongoDB 的工程师 A. Jesse Jiryu Davis 与 Python 之父 Guido van Rossum.项目代码使用 MIT 协议,项目文档使用 http://creativecommons.org/licenses/by/3.0/legalcode 协议. 课程内容在原文档基础上做了稍许修改,增加了部分原理介绍,步骤的拆解分析及源代码注释. 2. 内容简介 传统计算机

Python并发之协程

<python并发之协程>一: 单线程下实现并发,即只在一个主线程,并且cpu只有一个的情况下实现并发.(并发的本质:切换+保存状态) cpu正在运行一个任务,会在两种情况下切去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,第二种情况是该任务计算时间过长. 主线程的三种状态:其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来大家都被执行的效果,如果多个程序都是纯计算任务,这种切换反而会降低效率.为此我们基于yield验证.yield本身就是一种在单线

python并发编程&amp;协程

0x01 前导 如何基于单线程来实现并发? 即只用一个主线程(可利用的cpu只有一个)情况下实现并发: 并发的本质:切换+保存状态 cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长 ps:在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态 1)其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多

协程及Python中的协程

阅读目录 1 协程 2 Python中如何实现协程 回到顶部 1 协程 1.1协程的概念 协程,又称微线程,纤程.英文名Coroutine.一句话说明什么是线程:协程是一种用户态的轻量级线程.(其实并没有说明白~) 我觉得单说协程,比较抽象,如果对线程有一定了解的话,应该就比较好理解了. 那么这么来理解协程比较容易: 线程是系统级别的,它们是由操作系统调度:协程是程序级别的,由程序员根据需要自己调度.我们把一个线程中的一个个函数叫做子程序,那么子程序在执行过程中可以中断去执行别的子程序:别的子程

windows下多进程加协程并发模式

好久没更新博客了.正好最近要整理一下最近这段时间做过的项目以及学习python的一些心得.如标题所示,今天就来说说windows下多进程加协程并发模式.其实网上还是蛮多在linux下的多进程加协程并发模式,本身linux对python的支持更好吧.但是由于本人的开发环境是windows的,而且网上关于这方面的资料还是少了一点,不过经过一番折腾,也算是弄出来了.废话不多说,先贴代码吧: # coding=utf-8 # windows下多进程加协程并发模式 # 打入gevent的monkey补丁

python 并发编程 协程 gevent模块

一 gevent模块 gevent应用场景: 单线程下,多个任务,io密集型程序 安装 pip3 install gevent Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程. Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度. gevent可以检测io,实现遇到io自动切换另外一个任务 #用法 g1=gevent.spawn(func,1,

Python与Golang协程异同

背景知识 这里先给出一些常用的知识点简要说明,以便理解后面的文章内容. 进程的定义: 进程,是计算机中已运行程序的实体.程序本身只是指令.数据及其组织形式的描述,进程才是程序的真正运行实例. 线程的定义: 操作系统能够进行运算调度的最小单位.它被包含在进程之中,是进程中的实际运作单位. 进程和线程的关系: 一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务. CPU的最小调度单元是线程不是进程,所以单进程多线程也可以利用多核CPU. 协程的定义: 协

python 并发编程 协程 目录

python 并发编程 协程 协程介绍 python 并发编程 协程 greenlet模块 原文地址:https://www.cnblogs.com/mingerlcm/p/11148935.html

使用coro+anyevent 异步协程获取IP运营商

主要使用coro协程+AnyEvent::HTTP::LWP::UserAgent 异步http请求,查询数据库中IP字段,返回运营商.如需要获取其他类型的字段,修改正则即可, 此方法的好处是,不需要获取本地IP库,提高IP精准度.缺点,需要很好的网络质量.CODE如下: #查询IP的网络提供商 sub search_ip_area { my $self = shift; my ( $dsn, $dbuser, $dbpass, $ips ) = @_; my $ua = AnyEvent::H