MySQL通用批量写入工具(Python)

背景

平台目前的分析任务主要以Hive为主,分析后的结果存储在HDFS,用户通过REST API或者Rsync的方式获取分析结果,这样的方式带来以下几个问题:

(1)任务执行结束时间未知,用户必须自行编写代码不断地通过REST API请求分析结果,直至获取到分析结果为止,其中还需要处理分析结果过大,转而通过Rsync方式获取;

(2)受限于Hive SQL的表达能力,用户的计算逻辑无法完全表述,获取分析结果后需要再计算,然后入库;

(3)基于(1)、(2)的原因,用户编写大量复杂且冗余的代码处理上述逻辑。

为了改善上述情况,平台设计的解决方案如下:

(1)使用Spark替换Hive(MapReduce)分析任务,结合Hive SQL和Spark API两种方式,使用户的计算逻辑可以得到很好的表述;

(2)用户无需使用“Pull”的方式获取结果,转而使用“Post”的方式将分析结果存入自己的MySQL数据库中;

(3)提供基于Python的MySQL通用批量写入工具方便用户使用。

方案

MySQL批量写入工具需要解决以下几个问题:

(1)通用:不受限于具体数据格式的束缚;

用户需要将不同格式的数据写入不同的数据库/表,意味着数据库的主机名、端口、用户名、密码、数据库实例可以通过参数定制,不同的数据格式使用SQL语句表达,如“insert into students (c_number, c_name) values (%s, %s)”。

(2)多线程:使用多线程的方式,提高写入的吞吐量;

典型的“生产者——消费者”问题,用户需要将数据首先写入一个线程安全的共享队列中;“消费者”线程不断的从这个共享队列中获取数据并缓存,待缓存数据达到一定数目时,将这批数据一次性写出;然后继续上述过程。

除此之外,“消费者”线程需要能够“正常”结束。

(3)数据库连接控制:需要避免连接过多或者频繁连接带来的性能开销;

“消费者”线程需要能够重复利用一定数目的数据库连接,数据库连接由专门的连接池提供,工作流程如下:

a. 从连接池中获取数据库连接;

b. 通过a中的连接将数据批量写出并commit;

c. 将数据库连接归还给连接池。

(4)API:简单易用

我们将批量写入工具定义为一个“存储引擎”,考虑到MySQL的吞吐量可能在大数据量的写入下会成为一个瓶颈,后期会考虑扩展其它工具。因此定义一个基类表示抽象的“存储引擎”,并扩展出本文具体讨论的MySQL“存储引擎”:MySQLStorageEngine。

MySQLStorageEngine的初始化过程涉及以下六个方面:

(1)begin、end用于统计一次批量写入的耗时;

(2)接收用户定制的参数;

host:数据库主机名;

user:数据库登录用户名;

passwd:数据库登录密码;

db:数据库实例

port:数据库端口

charset:数据库字符编码

sql:写入数据时使用的SQL语句,如“insert into students (c_number, c_name) values (%s, %s)”

threads:写入线程数目;

bufferSize:写入线程内部的缓存区大小,亦即每一次“batch”的大小;

mincached:数据库连接池内部最小缓存连接数;

maxcached:数据库连接池内部最大缓存连接数;

maxconnections:数据库连接池所允许同时建立的最大连接数:目前与写入线程数目相同。

(3)构建共享队列queue;

(4)saveNum、storeNum用于统计用户写入总数及实际(成功)写入总数,考虑到多线程使用环境,分别构建相应的锁对象saveLock、storeLock;

(5)构建数据库连接池,这里使用的是DBUtils PooledDB;

(6)构建写入线程(多个)并启动。

至此,MySQLStorageEngine实例创建完毕,并且启动内部多个写入线程用于消费队列queue中数据。用户可通过实例方法save写入数据:

可见,save支持两种类型的数据,一种是Tuple,另一种是Tuple数组,它们都被保存至队列queue中,由写入线程负责处理。

用户写入完成之后,可以通过方法close关闭“存储引擎”,

需要注意的是,用户写入的数据实际是保存在队列queue中的,“用户写入完成”并不代表队列queue中的数据已全部被写入线程消费且完成入库,因此必须首先通知写入线程用户数据已全部写入完毕(requestStop),然后等待写入线程运行完毕(join),最后关闭数据库连接池。

写入线程由MySQLSaver实现,它的初始化过程特别简单:

(1)接收“存储引擎”实例engine;

(2)定义实例变量stop,初始值为False,用于表示用户尚有数据写入;True表示用户写入结束。

MySQLSaver的工作流程如下:

(1)初始化缓存区buffer,用于保存从队列queue消费而来的数据;

(2)循环从队列queue获取数据,如果没有获取到数据,则执行(3);如果获取到数据,则执行(4);

(3)如果用户写入结束(stop值为True)而且队列中已经没有剩余数据,将缓冲区buffer中的数据一次性写入(__save),结束线程(break);

(4)将(2)中获取到的数据存入缓存区,如果缓存区的大小达到数目限制,则将缓冲区buffer中的数据一次性写入(__save),继续(2)。

__save的工作流程如下:

(1)从连接池pool中获取数据库连接db并构建实例cursor;

(2)写入buffer中的数据(executemany)并提交(commit);

(3)清空buffer、关闭实例cursor、将数据库连接db“归还”给连接池(close)。

使用示例如下:

时间: 2024-10-14 05:37:27

MySQL通用批量写入工具(Python)的相关文章

android批量打包工具-python实现

所谓批量打包实质是:只需动态修改AndroidManifest.xml文件中的channel_value,添加你需要的渠道名称并重新打包成新的渠道包. 思路 1.导出一个未签名的apk包,其中渠道号配置如上图. 2.使用apkTool解压apk包 3.根据渠道号修改channel_value 4.使用apktool重新打包为未签名的apk包 5.给未签名的apk包签名 使用提示步骤: 一.下载python安装包,python 2.7.9下载地址:https://www.python.org/do

【Python之旅】第六篇(七):开发简易主机批量管理工具

通过前面对Paramiko模块的学习与使用,以及Python中多线程与多进程的了解,依此,就可以开发简易的主机批量管理工具了. 显然批量管理主机时,程序如果能并发执行功能是最好的,因为这样可以最大程度地利用CPU的性能,因此这就需要使用Python多线程或者多进程,基于学习的需要,这里主要使用多进程来进行开发,当然,这会存在一定问题,后面会说. 主要内容如下: 1.主机批量管理工具功能 2.设计框架 3.实现:数据库信息与程序源代码 4.实战演示 5.程序的不足 6.在写程序过程中的经验教训 7

php从memcache读取数据再批量写入mysql的方法

这篇文章主要介绍了php从memcache读取数据再批量写入mysql的方法,可利用memcache缓解服务器读写压力,并实现数据库数据的写入操作,非常具有实用价值,需要的朋友可以参考下. 用 Memcache 可以缓解 php和数据库压力下面代码是解决高负载下数据库写入瓶颈问题,遇到最实用的:写入ip pv uv的时候,用户达到每分钟几万访问量,要记录这些数据,实时写入数据库必定奔溃. 用以下技术就能解决,还有如用户注册,同一时间断内,大量用户注册,可以缓存后一次性写入到数据库,代码如下 pu

原创python多线程批量管理工具batch(不断完善)

#!/usr/bin/env python import threading import time import paramiko import os,sys from ip import ip_list,web_server,ip_msg from optparse import OptionParser from ssh_co.cfg.config import host_msg def opts(): parser = OptionParser(usage="usage %prog op

Python开发程序:简单主机批量管理工具

题目:简单主机批量管理工具 需求: 主机分组 登录后显示主机分组,选择分组后查看主机列表 可批量执行命令.发送文件,结果实时返回 主机用户名密码可以不同 流程图: 说明: ### 作者介绍: * author:lzl ### 博客地址: * http://www.cnblogs.com/lianzhilei/p/5881434.html ### 功能实现 题目:简单主机批量管理工具 需求: 主机分组 登录后显示主机分组,选择分组后查看主机列表 可批量执行命令.发送文件,结果实时返回 主机用户名密

批量管理工具,TriAquae!比较简单

首先,triaquae是基于ssh 的方式来对主机进行批量管理的,安装triaquae这边可以是图形,也可以不是图形! 然后triaquae访问必须要图形界面的支持!traquae是用python语言编写的,实现在大量主机推送文件,执行命令的一个过程! triaquae添加主机什么的也是比较方便的,因为是基于web的图形界面吗!比较简单!triaquae整体来讲操作命令的什么给回执的时间也不算 慢! 但是可能服务器太大量的话就不太合适了! 因为毕竟是基于ssh的! 如果服务器量太大的话,就根据需

批量插入数据, 将DataTable里的数据批量写入数据库的方法

大量数据导入操作, 也就是直接将DataTable里的内容写入到数据库 通用方法: 拼接Insert语句, 好土鳖 1. MS Sql Server:   使用SqlBulkCopy 2. MySql: adapter.update()批量更新 MySqlBulkLoader, 这个是从文件里边到的, 有个实现是先将DATATable编程CSV文件, 在用MySqlBulkLoader导入MySql 参考文章: http://theonetechnologies.com/outsourcing/

简单主机批量管理工具

题目:简单主机批量管理工具 需求: 主机分组 登录后显示主机分组,选择分组后查看主机列表 可批量执行命令.发送文件,结果实时返回 主机用户名密码可以不同 流程图: 说明: ### 作者介绍: * author:lzl ### 博客地址: * http://www.cnblogs.com/lianzhilei/p/5881434.html ### 功能实现 题目:简单主机批量管理工具 需求: 主机分组 登录后显示主机分组,选择分组后查看主机列表 可批量执行命令.发送文件,结果实时返回 主机用户名密

常用MySQL图形化管理工具

MySQL是一个非常流行的小型关系型数据库管理系统.目前MySQL被广泛地应用在Internet上的 中小型网站中.由于其体积小.速度快.总体拥有成本低,尤其是开放源码这一特点,许多中小型网站为了降低网站总体拥有成本而选择了MySQL作为网站数据 库. 1.phpMyAdmin(http://www.phpmyadmin.net/) phpMyAdmin是最常用的MySQL维护工具,是一个用PHP开发的基于Web方式架构在网站主机上的MySQL管理工具,支持中文,管理数据库非常方便.不足之处在于