python elasticsearch 批量写入数据

from elasticsearch import Elasticsearch
from elasticsearch import helpers
import pymysql
import time

# 连接ES
es = Elasticsearch(
    [‘127.0.0.1‘],
    port=9200
)

# 连接MySQL
print("Connect to mysql...")
mysql_db = "test"
m_conn = pymysql.connect(‘localhost‘, ‘root‘, ‘数据库密码‘, ‘table表‘)
m_cursor = m_conn.cursor()

try:
    num_id = 0
    while True:
        s = time.time()
        # 查询数据
        sql = "select good_id, title,description from goods LIMIT {}, 100000".format(num_id*100000)
        # 这里假设查询出来的结果为 张三 26 北京
        m_cursor.execute(sql)
        query_results = m_cursor.fetchall()

        if not query_results:
            print("MySQL查询结果为空 num_id=<{}>".format(num_id))
            break
        else:
            actions = []
            for line in query_results:
            # 拼接插入数据结构
                action = {
                    "_index": "tenco2019",
                    "_type": "goods",
                    "_id":line[0],
                    "_source": {
                        "good_title": line[1],
                        "good_description": line[2],
                    }
                }
                # 形成一个长度与查询结果数量相等的列表
                actions.append(action)
            # 批量插入
            a = helpers.bulk(es, actions)
            e = time.time()
            print("{} {}s".format(a, e-s))
        num_id += 1

finally:
    m_cursor.close()
    m_conn.close()
    print("MySQL connection close...")

原文地址:https://www.cnblogs.com/firebirdweb/p/10243892.html

时间: 2024-10-12 06:58:54

python elasticsearch 批量写入数据的相关文章

MSSQL批量写入数据方案

近来有一个项目Feature需要有批量写入数据的场景,正巧整理资料发现自己以前也类似实现的项目,在重构的同时把相关资料做了一个简单的梳理,方便大家参考. 循环写入(简单粗暴,毕业设计就这样干的)(不推荐) Bulk Copy写入(>1000K 记录一次性写入推荐) 表值参数方式写入(mssql 2008新特性)(强烈推荐) 在SQL Server 2008未提供表值参数之前,需要将多行数据传递到存储过程或参数化sql命令我们一般会采用以下几个方法: 使用一系列单参数来表示多个数据列和行中的值.但

使用XML向SQL Server 2005批量写入数据——一次有关XML时间格式的折腾经历

使用XML向SQL Server 2005批量写入数据——一次有关XML时间格式的折腾经历 原文:使用XML向SQL Server 2005批量写入数据——一次有关XML时间格式的折腾经历 常常遇到需要向SQL Server插入批量数据,然后在存储过程中对这些数据进行进一步处理的情况.存储过程并没有数组.列表之类的参数类型,使用XML类型可妥善解决这个问题. 不过,SQL Server2005对标准xml的支持不足,很多地方需要特别处理.举一个例子说明一下. 这个场景是往存储过程里传递一个xml

python脚本批量生成数据

在平时的工作中,经常会遇到造数据,特别是性能测试的时候更是需要大量的数据.如果一条条的插入数据库或者一条条的创建数据,效率未免有点低.如何快速的造大量的测试数据呢?在不熟悉存储过程的情况下,今天给大家介绍一种方法,很简单的也很实用.思路是用python代码写一段小程序,生成一定数量的SQL语句,再把这些SQL语句拷贝黏贴到数据库工具执行SQL即可. 假如有个联系人的学生表student,其表结构为姓名name.学校school.电话telphone.邮箱email. 以Mysq为例子,sql语句

使用bulkload向hbase中批量写入数据

1.数据样式 写入之前,需要整理以下数据的格式,之后将数据保存到hdfs中,本例使用的样式如下(用tab分开): row1 N row2 M row3 B row4 V row5 N row6 M row7 B 2.代码 假设要将以上样式的数据写入到hbase中,列族为cf,列名为colb,可以使用下面的代码(参考) 1 package com.testdata; 2 3 import java.io.IOException; 4 import org.apache.hadoop.conf.Co

Shell脚本:向磁盘中批量写入数据

一.关于本文 工作要做的监控系统需要监控磁盘空间的使用率并报警.在测试这个功能的时候需要模拟两个场景:一是磁盘空间不断增长超过设定的阈值时,需要触发报警机制:二是磁盘空间降落到低于报警阈值的时候,不再进行报警.为了测试这两个场景,我写了下面三个脚本: 1)initializer.sh:创建目录TestDir,并创建一个大文件template 2)duplicator.sh:不断复制文件template,直到磁盘空间使用率超过输入的参数为止 3)cleaner.sh:清除前面两个脚本留下的痕迹,即

elasticsearch批量索引数据示例

示例数据文件document.json(index表示在索引中增加或替换现有文档,create表示如果文档不存在则添加文档,delete表示删除文档): { "index": { "_index": "addr", "_type": "contact", "_id": 1 }}{ "name": "Fyodor Dostoevsky", "

Spark向Elasticsearch批量导入数据,出现重复的问题定位

看了下es-hadoop插件的源码: 发现ES导入数据重试情况的发生,除了在es.batch.write.retry.policy参数默认开启且es-hadoop插件向ES集群发送不bulk写入请求接受到503响应码会重试3次室外. 本身执行http请求时,也会存在重试(hadoop/rest/NetworkClient.java): public Response execute(Request request) { Response response = null; boolean newN

记第一个问题——python文件无法写入数据

import getpass username = input("please input your name:") password = getpass.getpass("please input the password:") fo = open("homework01.txt","a+") print(username) print(password) fo.write(username) fo.write(passwo

关于以DataTable形式批量写入数据的案例

void IDataAccess.CommandDataTable(DataTable dt, string ProcedureName, System.Data.Common.DbParameter[] para) { _sqlConnection = new SqlConnection(strConnet); SqlDataAdapter MyAdapter = new SqlDataAdapter(); MyAdapter.InsertCommand = new SqlCommand();