用python3的多进程和协程处理MySQL的数据

本文介绍用python3的多进程 + 协程处理MySQL的数据,主要逻辑是拉取MySQL的数据,然后使用flashtext匹配关键字,在存回MySQL,代码如下(async_mysql.py):

import time
import asyncio
import random
from concurrent.futures import ProcessPoolExecutor as Pool

import aiomysql
from flashtext import KeywordProcessor
import click

class AttrDict(dict):
    """可以用"."获取属性,没有该属性时返回None的字典"""
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            return None

    def __setattr__(self, name, value):
        self[name] = value

class AttrDictCursor(aiomysql.DictCursor):
    """继承aiomysql的字典cursor"""
    dict_type = AttrDict

class MultiProcessMysql(object):
    """用多进程和协程处理MySQL数据"""

    def __init__(self, workers=2, pool=10, start=0, end=2000):
        """第一段的参数需要跟随需求变动"""
        self.host = "192.168.0.34"
        self.port = 3306
        self.user = "root"
        self.password = "root"
        self.db = "mydb"
        self.origin_table = "judgment_main_etl"  # main
        self.dest_table = "laws_finance1"
        self.s_sql = f"select uuid, court_idea, judge_result, reason, plt_claim, dft_rep, crs_exm from {self.origin_table} where %s<=id and id<%s;"
        self.i_sql = f"insert into {self.dest_table} (uuid, title, reason, keyword) values (%s, %s, %s, %s)"

        self.pool = pool    # 协程数和MySQL连接数
        self.aionum = self.pool
        self.step = 2000  # 一次性从MySQL拉取的行数
        self.workers = workers  # 进程数
        self.start = start  # MySQL开始的行数
        self.end = end  # MySQL结束的行数

        self.keyword = [‘非法经营支付业务‘, ‘网络洗钱‘, ‘资金池‘, ‘支付牌照‘, ‘清洁算‘, ‘网络支付‘, ‘网上支付‘, ‘移动支付‘, ‘聚合支付‘, ‘保本保息‘, ‘担保交易‘, ‘供应链金融‘, ‘网贷‘, ‘网络借贷‘, ‘网络投资‘, ‘虚假标的‘, ‘自融‘, ‘资金池‘, ‘关联交易‘, ‘庞氏骗局‘, ‘网络金融理财‘, ‘线上投资理财‘, ‘互联网私募‘, ‘互联网股权‘, ‘非法集资‘, ‘合同欺诈‘, ‘众筹投资‘, ‘股权转让‘, ‘互联网债权转让‘, ‘资本自融‘, ‘投资骗局‘, ‘洗钱‘, ‘非法集资‘, ‘网络传销‘, ‘虚拟币泡沫‘, ‘网络互助金融‘, ‘金融欺诈‘, ‘网上银行‘, ‘信用卡盗刷‘, ‘网络钓鱼‘, ‘信用卡信息窃取‘, ‘网上洗钱‘, ‘洗钱诈骗‘, ‘数字签名更改‘, ‘支付命令窃取‘, ‘金融诈骗‘, ‘引诱投资‘, ‘隐瞒项目信息‘, ‘风险披露‘, ‘夸大收益‘, ‘诈骗保险金‘, ‘非法经营保险业务‘, ‘侵占客户资金‘, ‘征信报告窃取‘, ‘金融诈骗‘, ‘破坏金融管理‘]
        self.kp = KeywordProcessor()    # flashtext是一个文本匹配包,在关键词数量大时速度远大于re
        self.kp.add_keywords_from_list(self.keyword)

    async def createMysqlPool(self, loop):
        """每个进程要有独立的pool,所以不绑定self"""
        pool = await aiomysql.create_pool(
            loop=loop, host=self.host, port=self.port, user=self.user,
            password=self.password, db=self.db, maxsize=self.pool,
            charset=‘utf8‘, cursorclass=AttrDictCursor
        )
        return pool

    def cutRange(self, start, end, times):
        """将数据区间分段"""
        partition = (end - start) // times
        ranges = []
        tmp_end = start
        while tmp_end < end:
            tmp_end += partition
            # 剩下的不足以再分
            if (end - tmp_end) < partition:
                tmp_end = end
            ranges.append((start, tmp_end))
            start = tmp_end
        return ranges

    async def findKeyword(self, db, start, end):
        """从MySQL数据中匹配出关键字"""
        # 随机休息一定时间,防止数据同时到达,同时处理, 应该是一部分等待,一部分处理
        await asyncio.sleep(random.random() * self.workers * 2)
        print("coroutine start")
        async with db.acquire() as conn:
            async with conn.cursor() as cur:
                while start < end:
                    tmp_end = start + self.step
                    if tmp_end > end:
                        tmp_end = end
                    print("aio start: %s, end: %s" % (start, tmp_end))
                    # <=id 和 id<
                    await cur.execute(self.s_sql, (start, tmp_end))
                    datas = await cur.fetchall()
                    uuids = []
                    for data in datas:
                        if data:
                            for key in list(data.keys()):
                                if not data[key]:
                                    data.pop(key)
                            keyword = self.kp.extract_keywords(
                                " ".join(data.values()))
                            if keyword:
                                keyword = ‘ ‘.join(set(keyword))   # 对关键字去重
                                # print(keyword)
                                uuids.append(
                                    (data.uuid, data.title, data.reason, keyword))
                    await cur.executemany(self.i_sql, uuids)
                    await conn.commit()
                    start = tmp_end

    def singleProcess(self, start, end):
        """单个进程的任务"""
        loop = asyncio.get_event_loop()
        # 为每个进程创建一个pool
        db = loop.run_until_complete(asyncio.ensure_future(
            self.createMysqlPool(loop)))

        tasks = []
        ranges = self.cutRange(start, end, self.aionum)
        print(ranges)
        for start, end in ranges:
            tasks.append(self.findKeyword(db, start, end))
        loop.run_until_complete(asyncio.gather(*tasks))

    def run(self):
        """多进程跑"""
        tasks = []
        ranges = self.cutRange(self.start, self.end, self.workers)
        start_time = time.time()
        with Pool(max_workers=self.workers) as executor:
            for start, end in ranges:
                print("processor start: %s, end: %s" % (start, end))
                tasks.append(executor.submit(self.singleProcess, start, end))
            for task in tasks:
                task.result()
        print("total time: %s" % (time.time() - start_time))

@click.command(help="运行")
@click.option("-w", "--workers", default=2, help="进程数")
@click.option(‘-p‘, "--pool", default=10, help="协程数")
@click.option(‘-s‘, ‘--start‘, default=0, help=‘MySQL开始的id‘)
@click.option(‘-e‘, "--end", default=2640000, help="MySQL结束的id")
def main(workers, pool, start, end):
    mp = MultiProcessMysql(workers=workers, pool=pool, start=start, end=end)
    if workers * pool > 100:
        if not click.confirm(‘MySQL连接数超过100(%s),确认吗?‘ % (workers * pool)):
            return
    mp.run()

if __name__ == "__main__":
    main()

运行如下:
$ python3 async_mysql.py -w 2 # 可以指定其他参数,也可使用默认值



个人博客

原文地址:http://blog.51cto.com/13103353/2108236

时间: 2024-08-14 00:55:53

用python3的多进程和协程处理MySQL的数据的相关文章

多进程、协程、事件驱动

多进程.协程.事件驱动及select poll epoll 目录 -多线程使用场景 -多进程 --简单的一个多进程例子 --进程间数据的交互实现方法 ---通过Queues和Pipe可以实现进程间数据的传递,但是不能实现数据的共享 ---Queues ---Pipe ---通过Manager可以不同进程间实现数据的共享 --进程同步,即进程锁 --进程池 -协程 --先用yield实现简单的协程 --Greenlet --Gevent --用协程gevent写一个简单并发爬网页 -事件驱动 --

单线程、多线程、多进程、协程比较,以爬取新浪军事历史为例

演示python单线程.多线程.多进程.协程 1 import requests,json,random 2 import re,threading,time 3 from lxml import etree 4 5 lock=threading.Lock() 6 semaphore=threading.Semaphore(100) ###每次限制只能100线程 7 8 user_agent_list = [ 9 "Mozilla/5.0 (Windows NT 6.1; WOW64) Appl

windows下多进程加协程并发模式

好久没更新博客了.正好最近要整理一下最近这段时间做过的项目以及学习python的一些心得.如标题所示,今天就来说说windows下多进程加协程并发模式.其实网上还是蛮多在linux下的多进程加协程并发模式,本身linux对python的支持更好吧.但是由于本人的开发环境是windows的,而且网上关于这方面的资料还是少了一点,不过经过一番折腾,也算是弄出来了.废话不多说,先贴代码吧: # coding=utf-8 # windows下多进程加协程并发模式 # 打入gevent的monkey补丁

python多线程、多进程、协程的使用

本文主要介绍多线程.多进程.协程的最常见使用,每个的详细说明与介绍有时间会在以后的随笔中体现. 一.多线程 1.python通过两个标准库thread和threading提供对线程的支持.thread提供了低级别的.原始的线程以及一个简单的锁.threading通过对thread模块进行二次封装,提供了更方便的API来操作线程.接下来只介绍threading的常见用法. 2.使用 import threading import time def Traversal_5(interval): fo

一个简单的多进程+多线程+协程的例子

因为一个朋友最近想搞接口压力测试,推荐了jmeter,因为jmeter开源,且有命令行启动模式,方便封装.兴起时,自己也简单实现了一下高并发的脚本. 采用的是多进程+多线程+协程.想法是这样的,多进程是为了有效利用多核,理论上最好一个核对应一个进程比较好:那我为什么还要用多线程呢?不怕GIL全局锁吗?这是因为我用了gevent处理,请求采用requests,但requests是阻塞的方法,所以我把requests操作丢到协程做,就没啥问题了.接下来看看脚本,实现了一个2000并发量的脚本(写的比

[转]向facebook学习,通过协程实现mysql查询的异步化

FROM : 通过协程实现mysql查询的异步化 前言 最近学习了赵海平的演讲,了解到facebook的mysql查询可以进行异步化,从而提高性能.由于facebook实现的比较早,他们不得不对php进行hack才得以实现.现在的php5.5,已经无需hack就可以实现了.对于一个web网站的性能来说,瓶颈多半是来自于数据库.一般数据库查询会在某个请求的整体耗时中占很大比例.如果能提高数据库查询的效率,网站的整体响应时间会有很大的下降.如果能实现mysql查询的异步化,就可以实现多条sql语句同

多进程、协程、事件驱动及select poll epoll

目录 -多线程使用场景 -多进程 --简单的一个多进程例子 --进程间数据的交互实现方法 ---通过Queues和Pipe可以实现进程间数据的传递,但是不能实现数据的共享 ---Queues ---Pipe ---通过Manager可以不同进程间实现数据的共享 --进程同步,即进程锁 --进程池 -协程 --先用yield实现简单的协程 --Greenlet --Gevent --用协程gevent写一个简单并发爬网页 -事件驱动 --IO多路复用 ---用户空间和内核空间 ---文件描述符fd

python 多进程/多线程/协程 同步异步

这篇主要是对概念的理解: 1.异步和多线程区别:二者不是一个同等关系,异步是最终目的,多线程只是我们实现异步的一种手段.异步是当一个调用请求发送给被调用者,而调用者不用等待其结果的返回而可以做其它的事情.实现异步可以采用多线程技术或则交给另外的进程来处理.多线程的好处,比较容易的实现了 异步切换的思想, 因为异步的程序很难写的.多线程本身程还是以同步完成,但是应该说比效率是比不上异步的. 而且多线很容易写, 相对效率也高. 2.异步和同步的区别:  在io等待的时候,同步不会切走,浪费了时间.异

Python爬虫案例演示:Python多线程、多进程、协程

很多时候我们写了一个爬虫,实现了需求后会发现了很多值得改进的地方,其中很重要的一点就是爬取速度.本文 就通过代码讲解如何使用 多进程.多线程.协程 来提升爬取速度.注意:我们不深入介绍理论和原理,一切都在代码中. 二.同步 首先我们写一个简化的爬虫,对各个功能细分,有意识进行函数式编程.下面代码的目的是访问300次百度页面并返回状态码,其中 parse_1 函数可以设定循环次数,每次循环将当前循环数(从0开始)和url传入 parse_2 函数. import requests def pars