Elasticsearch+Mongo亿级别数据导入及查询实践

数据方案:

  • 在Elasticsearch中通过code及time字段查询对应doc的mongo_id字段获得mongodb中的主键_id
  • 通过获得id再进入mongodb进行查询

 

1,数据情况:

  • 全部为股票及指数的分钟K线数据(股票代码区分度较高)
  • Elasticsearch及mongodb都未分片且未优化参数配置
  • mongodb数据量:

    

  • Elasticsearch数据量:

    

2,将数据从mongo源库导入Elasticsearch

import time
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
es = Elasticsearch()

conn = MongoClient(‘127.0.0.1‘, 27017)
db = conn.kline_db
my_set = db.min_kline
x = 1
tmp = []

#此处有个坑mongo查询时由于数据量比较大时间较长需要设置游标不过期:no_cursor_timeout=True
for i in my_set.find(no_cursor_timeout=True):
    x+=1
    #每次插入100000条
    if x%100000 == 99999:
        #es批量插入
        success, _ = bulk(es, tmp, index=‘test_2‘, raise_on_error=True)
        print(‘Performed %d actions‘ % success)
        tmp = []
    if i[‘market‘] == ‘sz‘:
        market = 0
    else:
        market = 1
    #此处有个秒数时间类型及时区转换
    tmp.append({"_index":‘test_2‘,"_type": ‘kline‘,‘_source‘:{‘code‘:i[‘code‘],‘market‘:market,                ‘time‘:time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(i[‘kline_time‘]/1000 - 8*60*60))                ,‘mongo_id‘:str(i[‘_id‘])}})

#将最后剩余在tmp中的数据插入
if len(tmp)>0:
    success, _ = bulk(es, tmp, index=‘test_2‘, raise_on_error=True)
    print(‘Performed %d actions‘ % success)

3,Elasticsearch+mongo查询时间统计

import time
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from bson.objectid import ObjectId

#es连接
es = Elasticsearch()

#mongo连接
conn = MongoClient(‘127.0.0.1‘, 27017)
db = conn.kline_db  #连接kline_db数据库,没有则自动创建
my_set = db.min_kline

tmp = []

#计算运行时间装饰器
def cal_run_time(func):
    def wrapper(*args,**kwargs):
        start_time = time.time()
        res = func(*args,**kwargs)
        end_time = time.time()
        print(str(func) +‘---run time--- %s‘ % str(end_time-start_time))
        return res
    return wrapper

@cal_run_time
def query_in_mongo(tmp_list):
    k_list = []
    kline_data = my_set.find({‘_id‘:{‘$in‘:tmp_list}})
    for k in kline_data:
        k_list.append(k)
    return k_list

@cal_run_time
def query_in_es():
    #bool多条件查询 must相当于and
    body = {
        "query": {
            "bool": {
                "must": [{
                    "range": {#范围查询
                        "time": {
                            "gte": ‘2017-01-10 00:00:00‘,  # >=
                            "lte": ‘2017-04-12 00:00:00‘  # <=
                        }
                    }
                },
                    {"terms": {# == 或  in:terms 精确查询
                        "code": [‘000002‘,‘000001‘]
                    }
                    }
                ]
            }

        }
    }

    #根据body条件记性查询
    scanResp = scan(es, body, scroll="10m", index="test_2",doc_type="kline", timeout="10m")

    #解析结果字典并放入tmp列表中
    for resp in scanResp:
        tmp.append(ObjectId(resp[‘_source‘][‘mongo_id‘]))

    print(len(tmp))

    #--------------此处有个坑,直接使用search方法查询到的结果集中最多只有10条记录----------------
    # zz = es.search(index="test_2", doc_type="kline", body=body)
    # print(zz[‘hits‘][‘total‘])
    # for resp in zz[‘hits‘][‘hits‘]:
    #     tmp.append(ObjectId(resp[‘_source‘][‘mongo_id‘]))

query_in_es()

query_in_mongo(tmp)

运行结果如下:

第一行:查询的doc个数:28320

第二行:es查询所用时间:0.36s

第三行:mongo使用_id查询所用时间 :0.34s

从结果来看对于3亿多数据的查询Elasticsearch的速度还是相当不错的

※Elasticsearch主要的优势在于可以进行分词模糊查询,所以股票K线并不是完全适应此场景。

※Elasticsearch+Mongo这个架构主要针对:使用mongo存储海量数据,且这张表更新频繁。

原文地址:https://www.cnblogs.com/dxf813/p/8371214.html

时间: 2024-11-06 03:57:30

Elasticsearch+Mongo亿级别数据导入及查询实践的相关文章

es 在数据量很大的情况下(数十亿级别)如何提高查询效率啊?

面试题es 在数据量很大的情况下(数十亿级别)如何提高查询效率啊?面试官心理分析这个问题是肯定要问的,说白了,就是看你有没有实际干过 es,因为啥?其实 es 性能并没有你想象中那么好的.很多时候数据量大了,特别是有几亿条数据的时候,可能你会懵逼的发现,跑个搜索怎么一下 5~10s,坑爹了.第一次搜索的时候,是 5~10s,后面反而就快了,可能就几百毫秒.你就很懵,每个用户第一次访问都会比较慢,比较卡么?所以你要是没玩儿过 es,或者就是自己玩玩儿 demo,被问到这个问题容易懵逼,显示出你对

使用Mongo dump 将数据导入到hive

概述:使用dump 方式将mongo数据导出,上传到hdfs,然后在hive中建立外部表. 1.     使用mongodump 将集合导出 mongodump --host=localhost:27017  --db=mydb --collection=users  --out=/tmp/root/mongodump0712 [[email protected] root]# mongodump --host=localhost:27017  --db=mydb --collection=us

Mongo实战之数据空洞的最佳实践

问题背景: 某天,开发部的同事跑过来反映: mongodb数据文件太大,快把磁盘撑爆了!其中某个db占用最大(运营环境这个db的数据量其实很小) 分析: 开发环境有大量测试的增/删/改操作,而由于MongoDB顺序写的原因,在我们删除部分无用数据后,它的storageSize并不会变小,这就造成了大量的数据空洞. 解决办法 1. 使用MongoDB自带的compact命令: db.collectionName.runCommand("compact") 这种方式是collection级

亿级数据毫秒级查询!ElasticSearch是怎么做到的?

目录: 1. 一道面试题的引入: 2. 性能优化的杀手锏:Filesystem Cache 3. 数据预热 4. 冷热分离 5. ElasticSearch 中的关联查询 6. Document 模型设计 7. 分页性能优化 一道面试题的引入: 如果面试的时候碰到这样一个面试题:ElasticSearch(以下简称ES) 在数据量很大的情况下(数十亿级别)如何提高查询效率? 这个问题说白了,就是看你有没有实际用过 ES,因为啥?其实 ES 性能并没有你想象中那么好的. 很多时候数据量大了,特别是

数据迁移经验总结——亿级别多表异构的数据迁移工作

由于系统改版,最近三个月在做数据迁移工作,由于业务的特殊,基本将数据迁移所能踩的坑都踩了一遍,决定好好做个总结. 迁移类型--新老系统表结构变化较大的历史数据 一.核心问题 1.新老表结构变化极大.新表是以deliver为核心,另外还涉及仓储系统的一张表,订单系统的4张表,并按照新的逻辑映射关系进行迁移. 2.增量数据迁移.在全量数据迁移时必然会有新的数据,这些数据应该实时进行迁移 3.亿级别数据性能.效率的考虑.由于订单业务非常重要,数据迁移带来的qps对数据库的压力非常大,需要不断测试迭代找

009-elasticsearch【三】示例数据导入、URI查询方式简介、Query DSL简介、查询简述【_source、match、must、should等】、过滤器、聚合

一.简单数据 客户银行账户信息,json { "account_number": 0, "balance": 16623, "firstname": "Bradshaw", "lastname": "Mckenzie", "age": 29, "gender": "F", "address": "2

如何在万亿级别规模的数据量上使用Spark

一.前言 Spark作为大数据计算引擎,凭借其快速.稳定.简易等特点,快速的占领了大数据计算的领域.本文主要为作者在搭建使用计算平台的过程中,对于Spark的理解,希望能给读者一些学习的思路.文章内容为介绍Spark在DataMagic平台扮演的角色.如何快速掌握Spark以及DataMagic平台是如何使用好Spark的. 二.Spark在DataMagic平台中的角色 图 2-1 整套架构的主要功能为日志接入.查询(实时和离线).计算.离线计算平台主要负责计算这一部分,系统的存储用的是COS

【DataMagic】如何在万亿级别规模的数据量上使用Spark

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文由鹅厂新鲜事儿发表于云+社区专栏 作者:张国鹏 | 腾讯 运营开发工程师 一.前言 Spark作为大数据计算引擎,凭借其快速.稳定.简易等特点,快速的占领了大数据计算的领域.本文主要为作者在搭建使用计算平台的过程中,对于Spark的理解,希望能给读者一些学习的思路.文章内容为介绍Spark在DataMagic平台扮演的角色.如何快速掌握Spark以及DataMagic平台是如何使用好Spark的. 二.Spark在DataMagic

1.3万亿条数据查询如何做到毫秒级响应?

关注微信公众号"程序员黄小斜",选择"置顶或者星标" 一起成为更好的自己! ![](https://img2018.cnblogs.com/blog/1813797/201912/1813797-20191230133159470-930879899.jpg) 作者:孙晓光 出处:http://itindex.net/ 知乎,在古典中文中意为"你知道吗?",它是中国的 Quora,一个问答网站,其中各种问题由用户社区创建,回答,编辑和组织. 作为