以多进程读取oss符合条件的数据为例,综合使用多进程间的通信、获取多进程的数据

import datetime
import sys
import oss2
from itertools import islice
import pandas as pd
import re
import json
from pandas.tseries.offsets import Day
from multiprocessing import Process, JoinableQueue, cpu_count, Manager
import time

def mkbuck(bk):
	auth = oss2.Auth(username, password)
	bucket = oss2.Bucket(auth, address, bk)
	return bucket

#获取前天最后一小时的paths
def getbflastpt(bucket, bfyespattern):
	bfpamax = []
	for bf in islice(oss2.ObjectIterator(bucket, prefix=bfyespattern), sys.maxsize):
		c = bf.key
		if c[-1:] != ‘/‘:
			bfpamax.append(int(c.split(‘/‘)[4]))
	last = pd.Series(bfpamax).unique().max()
	if last < 10:
		bflastpt = bfyespattern + ‘/0‘ + str(last)
	else:
		bflastpt = bfyespattern + ‘/‘ + str(last)
	return bflastpt

#获取当天第一个小时的paths
def getnowfirstpt(bucket, nowpattern):
	bfpamin = []
	for bf in islice(oss2.ObjectIterator(bucket, prefix=nowpattern), sys.maxsize):
		c = bf.key
		if c[-1:] != ‘/‘:
			bfpamin.append(int(c.split(‘/‘)[4]))
	first = pd.Series(bfpamin).unique().min()
	if first < 10:
		nowfirstpt = nowpattern + ‘/0‘ + str(first)
	else:
		nowfirstpt = nowpattern + ‘/‘ + str(first)
	return nowfirstpt

#获取所有的昨日paths,并合并得到完全的paths和数量
def getfullnum(bk, bfyespattern, nowpattern, yespattern):
	lists = []
	bucket = mkbuck(bk)
	bfyespattern = getbflastpt(bucket, bfyespattern)
	nowpattern = getnowfirstpt(bucket, nowpattern)
	timelist = (s for s in (bfyespattern, yespattern, nowpattern))
	for pter in timelist:
		for bf in islice(oss2.ObjectIterator(bucket, prefix=pter), sys.maxsize):
			c = bf.key
			lists.append(c)
	return lists, len(lists)

#以下为进程间通信,即生产者、消费者模型
def getfull(bk, bfyespattern, nowpattern, yespattern, q):
	lists, num = getfullnum(bk, bfyespattern, nowpattern, yespattern)
	for c in lists:
		q.put(c)
	q.join()

def consumer(bk, q, d):
	bucket = mkbuck(bk)
	repattern2 = re.compile(‘{.*"adadji",.*}‘)
	while True:
		js = []
		ress = q.get()
		if ress[-1:] != ‘/‘:
			remote_data = bucket.get_object(ress).read().decode(‘utf-8‘)
			aa = (d for d in repattern2.findall(remote_data))
			for a in aa:
				temdic = json.loads(a)
				if (starttime <= temdic[‘created_at‘]) and (temdic[‘created_at‘] <= endtime):
					js.append(temdic)
		df = pd.DataFrame(js, columns=[‘dd‘,‘cc‘])
		d[ress] = df##d为通过主进程Manager共享变量将数据取出
		# print(ress)
		q.task_done()# 向q.join()发送一次信号,证明一个数据已经被取走了

if __name__ == ‘__main__‘:
	s1 = time.time()
	now_time = datetime.datetime.now()  # 获取当前时间
	bfyes_time = (now_time - 2 * Day()).strftime(‘%Y/%m/%d‘)
	yes_time = (now_time - 1 * Day()).strftime(‘%Y/%m/%d‘)
	yesdate = (now_time - 1 * Day()).strftime(‘%Y-%m-%d‘)
	yesdate1 = (now_time - 1 * Day()).strftime(‘%Y%m%d‘)
	endtime = (now_time - 1 * Day()).strftime(‘%Y-%m-%d 23:59:59‘)
	starttime = (now_time - 1 * Day()).strftime(‘%Y-%m-%d 00:00:00‘)
	nowdate = now_time.strftime(‘%Y/%m/%d‘)

	bk = ‘xxx‘
	bfyespattern = ‘%s/%s‘ % (bk, bfyes_time)
	yespattern = ‘%s/%s‘ % (bk, yes_time)
	nowpattern = ‘%s/%s‘ % (bk, nowdate)

	q = JoinableQueue(cpu_count())
	m = Manager()
	d = m.dict()  ##主进程字典共享
	p1 = Process(target=getfull, args=(‘xx‘, bfyespattern, nowpattern, yespattern, q))
	#####生成consumer多进程
	cc = []
	for c in range(cpu_count() - 1):
		c1 = Process(target=consumer, args=(‘xx‘, q, d))
		cc.append(c1)

	p_l = [p1]
	for c in cc:
		c.daemon = True
		p_l.append(c)

	for p in p_l:
		p.start()
	p1.join()
	d = d.values()
	df1 = pd.concat(d, ignore_index=True)
	df1.sort_values(‘created_at‘, inplace=True)
	print(time.time() - s1)
	print(‘=‘ * 20)
	print(df1)

  说明:需求为获取昨日的数据即可,因oss实时数据存储可能存在提前或延迟情况,因此读取前天的最后一小时,昨日全部,当天最开始一小时数据,读者可根据自身情况进行修改

原文地址:https://www.cnblogs.com/mahailuo/p/9293825.html

时间: 2024-07-29 13:14:39

以多进程读取oss符合条件的数据为例,综合使用多进程间的通信、获取多进程的数据的相关文章

zTree实现获取一级节点数据

1.实现源码 <!DOCTYPE html> <html> <head> <title>zTree实现基本树</title> <meta http-equiv="content-type" content="text/html; charset=UTF-8"> <link rel="stylesheet" type="text/css" href=&

mssql数据库游标批量修改符合条件的记录

//需求:由于项目刚上传,没有票数,为了表现出一定的人气,需要在一开始把各项目的票数赋一个值 , 但每个项目不能一样,否则容易看出问题,呵呵 . DECLARE @Id varchar(50) DECLARE My_Cursor CURSOR --定义游标 FOR (SELECT Id FROM dbo.kinpanAwardProject where session=9) --查出需要的集合放到游标中 OPEN My_Cursor; --打开游标 FETCH NEXT FROM My_Curs

mssql数据库游标批量改动符合条件的记录

//需求:因为项目刚上传,没有票数,为了表现出一定的人气,须要在一開始把各项目的票数赋一个值 , 但每一个项目不能一样,否则easy看出问题,呵呵 . DECLARE @Id varchar(50) DECLARE My_Cursor CURSOR --定义游标 FOR (SELECT Id FROM dbo.kinpanAwardProject where session=9) --查出须要的集合放到游标中 OPEN My_Cursor; --打开游标 FETCH NEXT FROM My_C

《Linux学习并不难》Linux常用操作命令(14):grep命令查找文件中符合条件的字符串

8.14  <Linux学习并不难>Linux常用操作命令(14):grep命令查找文件中符合条件的字符串 使用grep命令可以查找文件内符合条件的字符串.          命令语法: grep [选项] [查找模式] [文件] 命令中各选项的含义如表所示. 选项 选项含义 -E 模式是一个可扩展的正则表达式 -F 模式是一组由断行符分隔的定长字符串 -P 模式是一个Perl正则表达式 -b 在输出的每一行前显示包含匹配字符串的行在文件中的字节偏移量 -c 只显示匹配行的数量 -i 比较时不

PHP preg_replace() 正则替换所有符合条件的字符串

PHP preg_replace() 正则替换,与Javascript 正则替换不同,PHP preg_replace() 默认就是替换所有符号匹配条件的元素 需要我们用程序处理的数据并不总是预先以数据库思维设计的,或者说是无法用数据库的结构去存储的. 比如模版引擎解析模版.垃圾敏感信息过滤等等. 一般这种情况,我们用正则按我们的规则去匹配preg_match.替换preg_replace. 但一般的应用中,无非是些数据库CRUD,正则摆弄的机会很少. 根据前面说的,两种场景:统计分析,用匹配:

VBA在Excel中的应用(一):改变符合条件单元格的背景颜色

在使用excel处理数据的时候,为了能更清晰的标示出满足特定条件的单元格,对单元格添加背景色是不错的选择.手工处理的方式简单快捷,但是当遇到大批量数据,就会特别的费时费力,而且不讨好(容易出错).通过代码来处理是个不错的选择,excel可以通过VBA编程来处理内部数据,在打开excel页面后,可以通过“alt + F11”组合键来启动VBA编程界面,跟VB的编程界面和语法一样,需要注意的是如何调用excel的内容.VBA通过sheet, range和cells三个层次来调用excel中的制定区域

SQL存储过程将符合条件的大量记录批量删除脚本

-- ============================================= -- Author: James Fu -- Create date: 2015/10/27 -- Description: v0.1 利用批量的方式删除符合条件的数据 -- ============================================= CREATE PROCEDURE [dbo].[sp_LargeDelete] @TableName sysname, @MaxRow

根据某条件给GridView符合条件的值画上删除线

如博文标题,根据某些条件对GridView控件中,对符合条件的值画上删除线效果.实现这些要求,只人捕获到哪些符合要求的数据即可.GridView控件是在TemplateField模版显示数据,Insus.NET并没有使用任一控件,如label或literal等控件来呈现数据.稍后在写OnRowDataBound事件,会教大家怎样去捕获取需要的数据. 下面是对GridView控件时行数据绑定,由于在Insus.NET博客已经无数次有写到怎样创建数据,写存储过程,创建对象(类class),这些在此篇

mysql随机查询符合条件的几条记录

随机查询,方法可以有很多种.比如,查询出所有记录,然后随机从列表中取n条记录.使用程序便可实现.可是程序实现必须查询出所有符合条件的记录(至少是所有符合条件的记录id),然后再随机取出n个id,查询数据库.但是效率毕竟没有数据库中直接查询得快.下面介绍mysql中怎样随机查询n条记录. 1.最简单的办法order by rand(),示例 select * from question q where q.`level`=1 order by rand() limit 1; 此写法,可以将查询出的