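Background:
Yesterday afternoon people from the MIIT (Ministry of Industry and Information Technology) came by, and a few seemingly very professional security folks insisted that I write a script on the spot to export the IP field from several million Redis records. Since I had honestly never thought about how to export that much data quickly, I could only awkwardly admit defeat~ But thinking it over carefully afterwards, we can do it~ There are always more solutions than problems~
Requirements: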
1. Quickly export the WlanIP field stored in Redis under every 16-character serial number made up only of [0-9a-z]
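The key filter in this requirement can be written as a regular expression. A minimal sketch, assuming a valid serial number is exactly 16 characters from [0-9a-z]; the names `SERIAL_RE` and `is_serial` are hypothetical and do not appear in the script in this post:

```python
import re

# Assumption: a valid serial number is exactly 16 characters from [0-9a-z].
SERIAL_RE = re.compile(r'[0-9a-z]{16}')


def is_serial(key):
    """Return True if key looks like a 16-character [0-9a-z] serial number."""
    return SERIAL_RE.fullmatch(key) is not None
```

Unlike `str.isalnum()`, this check also rejects uppercase letters and keys of the wrong length.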
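Approach:
1. The first things that come to mind are multithreading, multiprocessing, or coroutines. I ended up choosing a gevent coroutine pool because the job consists of Redis reads and file writes: compared with multiple processes or threads it is easier to control, no time is wasted blocking on I/O, and asynchronous switching back and forth is a better fit
2. To lighten the load of each Redis keys call, first compute every 2-character serial-number prefix and query one prefix at a time; the prefixes conveniently double as the 250-odd output file names (the script uses the 16 hexadecimal characters, giving 16 × 16 = 256 prefixes)
3. To avoid paying for a separate Redis hgetall call per key, use a pipeline: once 1000 commands are queued, they are sent together through the coroutine's own pipeline, which avoids the cost of repeated round trips
4. Because the connection pool in the Redis client only provides keepalive, each coroutine maintains its own pool, giving 100 "fake" connection pools in total; the most important reason, though, is to keep the pipelines of different coroutines from getting mixed up
Code: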
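Steps 2 and 3 can be sketched without a live server. This is a minimal illustration rather than the script's exact code: `make_prefixes` and `fetch_field` are hypothetical names, `client` is assumed to be any object that exposes redis-py style `pipeline()`, `hgetall()` and `execute()` calls, and the hexadecimal alphabet and batch size of 1000 are taken from the script in this post.

```python
from itertools import product


def make_prefixes(alphabet='0123456789abcdef', width=2):
    """Return every fixed-width prefix over alphabet (16 * 16 = 256 by default)."""
    return [''.join(chars) for chars in product(alphabet, repeat=width)]


def fetch_field(client, keys, field, batch_size=1000):
    """Yield `field` from each hash in `keys`, sending HGETALL in batches.

    Assumption: `client` behaves like a redis-py client whose pipeline
    queues commands and returns their results from execute().
    """
    pipe = client.pipeline()
    queued = 0
    for key in keys:
        pipe.hgetall(key)
        queued += 1
        if queued == batch_size:
            # One round trip for the whole batch.
            for item in pipe.execute():
                if field in item:
                    yield item[field]
            queued = 0
    # Flush the final, partial batch (an empty pipeline returns []).
    for item in pipe.execute():
        if field in item:
            yield item[field]
```

With a real connection, `client` would be a `redis.StrictRedis` instance and `keys` the result of `client.keys(prefix + '*')` for one prefix from `make_prefixes()`.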
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://xmdevops.blog.51cto.com/
# Purpose:
#
"""
# Note: patch the standard library before anything else imports socket
from gevent import monkey
monkey.patch_all()

# Note: import common modules
import os
from gevent.pool import Pool
from itertools import product
from redis import StrictRedis, ConnectionPool

# Note: connection settings and pipeline batch size
host = '127.0.0.1'
port = 5123
batch_size = 1000


def flush_pipeline(rds_pipe, fd, prefix):
    # Send all queued HGETALL commands in one round trip and record the IPs
    for item in rds_pipe.execute():
        if 'WanIP' not in item:
            continue
        ip = item['WanIP']
        print('record ip => {0} to file {1}'.format(ip, prefix))
        fd.write(ip + '\n')


def write_serialdata(prefix):
    # Each coroutine owns its pool and pipeline so queued commands never mix
    rds_pool = ConnectionPool(host=host, port=port, max_connections=10,
                              retry_on_timeout=True, decode_responses=True)
    rds_inst = StrictRedis(connection_pool=rds_pool)
    rds_pipe = rds_inst.pipeline()

    # One KEYS call per 2-character prefix instead of one over the whole keyspace
    serial_keys = rds_inst.keys('{0}*'.format(prefix))
    if not serial_keys:
        return
    queued = 0
    with open(prefix, 'a') as fd:
        for serial_key in serial_keys:
            if not serial_key.isalnum():
                continue
            rds_pipe.hgetall(serial_key)
            queued += 1
            if queued % batch_size == 0:
                flush_pipeline(rds_pipe, fd, prefix)
        # Flush whatever is left in the final, partial batch
        flush_pipeline(rds_pipe, fd, prefix)


def generat_product():
    s_bit = 'abcdef0123456789'
    return product(s_bit, repeat=2)


if __name__ == '__main__':
    pool = Pool(100)
    path = []
    for key_pairs in generat_product():
        fpath = ''.join(key_pairs)
        path.append(fpath)
        if not os.path.exists(fpath):
            open(fpath, 'wb').close()
    pool.map(write_serialdata, path)
Screenshot as proof:
Time: 2024-10-14 06:57:39