python时间序列数据的对齐和数据库的分批查询

欲直接下载代码文件,关注我们的公众号哦!查看历史消息即可!

0. 前言

在机器学习里,我们对时间序列数据做预处理的时候,经常会碰到一个问题:有多个时间序列存在多个表里,每个表的的时间轴不完全相同,要如何把这些表在时间轴上进行对齐,从而合并成一个表呢?尤其是当这些表都存在数据库里,而且超级超级大的时候,怎样才能更高效地处理呢?

上一篇文章中,已经介绍过了如何在Python中创建数据库连接以及对数据库进行增删改查、分组聚合以及批量读取和处理等操作。

今天就以上面的问题为导向,手把手教你如何用Python一步步实现相应的功能。讲解的内容主要有:

  1. 如何实现两个有序序列的合并;
  2. 延伸到两个时间序列数据的对齐;
  3. 从数据库中自动循环分批读取数据。

需要掌握的主要编程技巧包括:

  • 用函数实现特定功能
  • 用类对功能进行封装
  • 实现基本的迭代器

使用的工具及版本:Python3.7,MySQL8.0, Jupyter Notebook

1. 有序序列的合并

本节主要介绍如何实现将2个有序(默认从小到大排序)序列合并成一个序列,同时介绍Python中基本的循环结构。

其实在Python中固然有相应的方法可以很容易地做到(例如集合的set.union()方法),这里之所以要自己实现,主要是要理解这种思想,为后文的功能实现做铺垫。

1.1 Python知识点之条件测试

if 语句的语法结构为:

if boolean_expression1: #如果满足条件1,则执行suite1代码块
    suite1
elif boolean_expression2: #如果满足条件2,则执行suite2代码块
    suite2
else: #否则执行else_suite代码块
    else_suite

其中elifelse为可选。

1.2 Python知识点之循环控制

1.2.1 while循环

(1) 循环机制及应用场景

  • 用于编写通用迭代结构
  • 顶端测试为真时执行循环体,并会重复多次测试直到为假后结束循环

(2) 语法格式

while boolean_expression: #如果测试为真,则执行while_suite代码块(循环执行)
    while_suite
else: #直到测试为假,则执行一遍else_suite代码块之后结束循环
    else_suit

其中else为可选。

1.2.2 for 循环

(1)循环机制及应用场景

  • 通用的序列迭代器,用于遍历任何有序的序列对象内的元素
  • 可用于字符串、元组、列表和其它的内置可迭代对象,以及通过类所创建的新对象

(2)语法格式

for expression in iterable:
    for_suite
else:
    else_suite

其中else为可选。

??tips1:?for循环比while循环执行速度快的多,能用for的尽量使用for

1.3 Python知识点之函数

函数是python为了代码最大程度地重复利用最小化冗余而提供的基本程序结构。

它能够将整块代码巧妙地隔离成易管理的一小块,把重复代码放在函数中,而不是进行大块的复制,这是一个程序员应该具备的基本技能

1.3.1 创建函数

使用def语句定义函数,并且函数都会有一个返回值,默认为None,也可以用return语句明确指定返回值。

语法格式:

def funtionName(parameters): #定义函数名,设置函数的参数
    suite #函数体
    return something

1.3.2 调用函数

在Python中,函数是一个可调用对象,它有一个内置的方法,叫call

我们在写程序的时候,会碰到一类错误:"xxx" object is not callable,这就表示这个对象是不可调用的。

调用函数的方法也很简单,在函数名后面加小括号(),有参数的时候在括号中传入参数即可:funtionName(par1,..)

??tips2:python中定义函数名的时候,通常第一个单词均小写,第二个单词开始通常首字母大写,例如,printName,calculateSum

??tips3:写函数的时候,尽量写得简单,功能尽可能单一,不要写得又长又复杂

1.4 手动实现有序序列的合并

注:在 Python 中,list(列表)是最常用、最核心的数据结构之一,它是一种序列类型,可以接收各种类型的元素,也可以同时接收不同类型的元素。此外,list 还是一个可迭代对象。本文的演示多采用 list 结构组织数据。

- 解题思路

假设有两个序列:a = [1,3,7,9,11], b = [3,4,7,8],怎么合并成一个序列?

思路:用第3个序列 c?记录结果,同时对 a、b 进行遍历,按一定的顺序依次将 a、b 中的元素添加到 c 中;遍历的方法是用指针进行索引。

  1. 初始状态:c=,idx_a=0,idx_b=0;
  2. a[0]=1, b[0]=3, a[0]<b[0]
    → 将 a[0] 添加到 c ,idx_a=idx_a+1
    → 此时 c=[1], idx_a=1, idx_b=0
  3. a[1]=3, b[0]=3, a[0]=b[0]
    → 将 a[1]或b[0] 添加到 c ,idx_a=idx_a+1, idx_b=idx_b+1?
    → 此时 c=[1,3], idx_a=2, idx_b=1;
  4. a[2]=7, b[1]=4, a[0]>b[1]
    → 将b[1] 添加到 c , idx_b=idx_b+1
    → 此时 c=[1,3,4], idx_a=2, idx_b=2;
  5. a[2]=7, b[2]=7, a[2]=b[2]
    → 将a[2]或b[2] 添加到 c , idx_a=idx_a+1, idx_b=idx_b+1?
    → 此时 c=[1,3,4,7], idx_a=3, idx_b=3;
  6. a[3]=9, b[3]=8, a[3]>b[3]
    → 将b[3] 添加到 c , idx_b=idx_b+1?
    → 此时 c=[1,3,4,7,8], idx_a=3, idx_b=4;
  7. idx_b=4超出了b的索引范围,及idx_b=len(b),但此时idx_a<len(a),所以将 a[idx_a:] 直接添加到 c?
    → 此时c=[1,3,4,7,8,9,11],结束,输出结果c 。

- 实现代码

def orderedListUnion(a, b):
    '''
    合并两个按从小到大排好序的序列a,b
    '''

    # 设置循环初始值
    idx_a = 0
    idx_b = 0
    c = []

    # 声明变量len_a,len_b,指向序列a,b的长度,用来控制循环条件
    len_a = len(a)
    len_b = len(b)
    while (idx_a < len_a) and (idx_b < len_b):

        #若两个元素相等,则将该元素添加到c,且两个idx同时右移:
        if a[idx_a] == b[idx_b]:
            c.append(a[idx_a])
            idx_a += 1
            idx_b += 1

        #若不相等,取较小的元素,且较小元素的idx右移
        elif a[idx_a] < b[idx_b]:
            c.append(a[idx_a])
            idx_a += 1
        else:
            c.append(b[idx_b])
            idx_b += 1

    # 当一个序列遍历结束后,跳出循环,将未遍历完的序列的剩余元素添加到c
    if idx_a == len_a:
        c = c + b[idx_b:]
    if idx_b == len_b:
        c = c + a[idx_a:]

    return c

# 测试
a = [1,3,7,9,11]
b = [3,4,7,8]
print(orderedListUnion(a,b))

输出结果:

2、时间序列的对齐

2.1 问题场景

前面的练习仅仅作为热身,现在回到文章开头的问题,假设一个更具体场景:

在医院的ICU里,需要持续观察病人的各项生命指标。这些指标的采集频率往往是不同的(例如有些指标隔几秒采集一个,有些几个小时采集一个,有些一天采集一个),而且有些是定期的,有些是不定期的,或者由于某些原因某些指标在某段时间上是缺失的,所以不同生命指标的时间序列数据在时间轴上的表现往往是不对齐的。

所以现在的问题是:

如何将存储在不同数据表里,且时间轴不同的两个时间序列进行合并,对齐到同一个时间轴上?

举例说明:

假设现在有2个数据表,分别记录了某个病人某一天当中某些时刻的一些生命体征指标:

表1:

表2:

可以看到,两张表的时间点有些是相同的,更多时候是不同的,现在我们想把这两条时间线并到一条时间轴上。

2.2 问题分析

这里我们将一张表的信息用一个 list 的形式来表示:

每一行记录为这个列表的一个元素,每行记录用一个元组tuple?(python中另一个常用的数据结构,与list的区别在于list是可变的,而tuple是不可变的)来表示。例如表1的第一行,即列表的第0个元素表示为(‘01:30‘, 128, 19)

每个元组的第0个元素是这条记录发生的时间点,也就是我们用来索引的指针。

所有的时间点连起来就形成了一条时间轴,也就是表的第一列Time。每个时间点上的多项生命指标可以理解为这个指针所带的属性。

ts1 = [('01:30',128,19),
       ('05:00',124,20),
       ('13:00',131,18),
       ('20:00',138,24),
       ('21:30',122,22)]

ts2 = [('01:30',7,129,60),
       ('09:30',6.5,112,63),
       ('12:00',8,135,74),
       ('13:00',8.5,110,60),
       ('20:32',7,78,49),
       ('22:00',7.5,96,55),
       ('23:30',6,124,59)]

这里与1中?有序序列合并?的区别在于:

  • 前面的 list 的元素是个具体的数值,而这里的 list 的元素是个 tuple;
  • 前面只需要比较当前指针所指的元素(数值)本身的大小,这里需要比较的是当前指针所指的元素(tuple)的第0个元素的大小;

不过这只是形式上的区别,更重要、更核心的区别在于:

  • 前面的合并就是简单地把元素拼在一起,而这里,进行对齐的时候,对于缺失的那一方,需要采取一定的措施。

例如,在01:30时刻,两个表均有记录,则合并的记录为(‘01:30‘,128,19,7,129,60);但是在05:00时刻,只有表1有记录,表2没有,那么合并后的记录应该是多少呢?

所以我们需要采取一些策略,例如:

  • 直接用 None 表示,表示没有:(‘05:00‘,124,20,None,None,None)
  • 用前一个时刻的记录表示:(‘05:00‘,124,20,7,129,60)
  • 在前一条记录和后一条记录之间进行插值

具体应该根据实际的应用场景来选择不同的处理方式。

2.3 代码实现

本文先以第二个策略为例:用前一个时刻的记录表示。

所以这里的代码要比前面的,多设置一个变量pre,用来存储上一条记录。当指针移动到某个表没有记录的时刻时,就用pre来补上,并且pre也是跟着指针往前推移的。

由于第一条记录没有上一条,所以把初始的pre设置为None?*(**这也是个小技巧,若没有此项设置,则需要增加条件判断)*

def tsAlign(x, y):
    '''
    x,y: lsit of tuple,
            每个tuple代表一条记录,tule的第0个元素为这条记录的id(eg,时间),
            list里的tuple按照tuple的id从小到大进行排序

    把x和y在id上进行对齐:
        若x,y同时存在某个id,则将这两个tuple进行合并;
        若x存在某个id而y不存在,则合并 x当前id对应的tuple 和 y小于当前id的最大id对应的tuple。

    return
        z: 对齐了x和y之后的list of tuple
    '''
    # 设置循环初始值
    i = 0 #index of x
    j = 0 #index of y
    z=[]  #store result of merging x and y
    pre_x = (None,)*len(x)
    pre_y = (None,)*len(y)

    while (i<len(x)) and (j<len(y)):
        #若x当前的id小于y当前的id,则合并x当前的tuple和y的前一个tuple
        if x[i][0] < y[j][0]:
            z.append(x[i] + pre_y[1:])
            pre_x = x[i]
            i += 1
        #若x当前的id大于y当前的id,则合并x的前一个tuple和y当前的tuple
        elif x[i][0] > y[j][0]:
            z.append((y[j][0],) + pre_x[1:] + y[j][1:]) #注意:在定义tuple的时候,若只有一个元素,需要在元素后加个逗号
            pre_y = y[j]
            j += 1
        #若x当前的id等于y当前的id,则合并x当前的tuple和y当前的tuple
        else:
            z.append(x[i] + y[j][1:])
            pre_x = x[i]
            pre_y = y[j]
            i += 1
            j +=1

    while i < len(x):
        z.append(x[i] + pre_y[1:])
        i += 1

    while j < len(y):
            z.append((y[j][0],) + pre_x[1:] + y[j][1:])
            j += 1    

    return z

print(tsAlign(ts1, ts2))

输出结果:

另外:用字典dict结构存储的时候也很方便,dict 的 key 为时间,value 为各项生命指标组成的 list。需要注意的是,dict 是无序的,所以在进行遍历的时候,需要将所有的key提取出来,先进行排列。(读者可自行尝试)

不过python里其实还有个叫OrderDict的数据结构,就是个有序的字典,这里也不做介绍。

3. 大型数据表的分批读取

在前面的示例中,数据表的行和列都很少。如果当数据表很大的时候,直接把整张表读进来,将会消耗巨大的内存,程序可能根本跑不起来。

一个很自然的想法是分批读取并进行处理(前一篇文章中有相关的示例)。

也就是,可以先把“读取”+“处理”操作的功能封装起来,再在外面套个循环,不断地重复对应模块的操作。

到这里就需要跟大家讲讲Python中另一个非常重要的概念——

3.1 Python知识点之类与对象

我们常说有两种程序设计的方式:面向过程和面向对象。在Python中,这两种编程方式都可实现。而面向对象封装、继承、多态性的三大特性,可以使系统更加灵活、更加易于维护 。

3.1.1 对象的基本认识

首先需要理解?对象?的概念。

对象一般都由属性+方法组成。你可以这么理解:属性表示是什么(变量或数据),方法表示能干什么(函数或功能)。

面向对象的一个特点就在于,把操作同一组数据的各种功能集成在一起;对象的属性就表示我要操作的这组数据,对象的方法就是我要怎么操作这些数据。

而在有对象之前,必须要有类。

,就是具有同类属性的对象,是个抽象的概念。而对象是由类实例化而来的。

同一个类实例化出来的不同对象,具有相同的方法和相同的属性,但属性的值不一样。

比如,猫是一个类,是一个抽象的概念;而中华田园猫是实例化出来的具体对象,英国短毛猫是实例化出来的另一个对象,这两个对象都有自己的属性(体型,毛长,毛色等),和相同的方法(会吃,会跑,会喵喵喵)

在我们的问题中,也定义了这样一个类,提供给它一个数据库连接(属性),它就可以对这个数据库的表进行增删查改等各种操作(功能)。提供给它另一个数据库连接,又可以对另一个数据库进行操作。每提供一个数据库连接,就相当于实例化出一个对象。当数据存在多个数据库中时,我们就可以实例化出多个对象,同时进行操作。

3.1.2 创建类

Python使用class关键字创建类,一般形式为:

class ClassName(bases): # bases表示这个类是从哪个类继承而来的,即父类,为可选
    'class documentation string' # 文档字符串,为可选
    data = value # 定义类变量
    def method(self, ...): # 定义类方法
        self.member = value

主要包括两个部分:定义类变量 和 定义类方法

注意,在这里定义类方法的方式比较独特。

虽说其实类方法就是函数,但是这个函数的首个参数必须为self

因为类本身不能对方法进行调用,必须要实例化成对象了,对象才能调用方法。

所以这里就意味着,这个方法的目的是对实例化对象进行操作,也就是说,self?的属性只能被实例化对象自己使用,是私有的,我们称之为“实例变量”。

相比之下,在方法外面定义的属性,则是可以被所有实例化对象共同使用,是公共的,我们称之为“类变量”

在class语句内,任何赋值语句都会创建类属性。

定义实例变量则需要采用一种特殊的方式,称为类的构造器初始化方法:__init__(),后面会举例说明

??tips4:python中定义类名的时候,通常从第一个单词开始,每个单词开始首字母大写,例如,Animal,TableReader

3.1.3 对象实例化

创建实例化对象在其他编程语言中一般使用关键字new,但是python里面没有这个关键字,而是用类似函数调用的方式:ClassName(args...)

这里传进去的参数会被__init__方法接收,成为实例变量,也就是这个实例化对象私有的属性。

3.1.4 代码示例:类的创建和使用

'''代码示例:类的创建和使用'''
# 定义类
class MyClass1():
    say = 'Hello' #定义类变量

    def __init__(self, name): # 类的构造函数
        self.name = name #定义实例变量

    def show(self): #定义类方法
        print(self.say, self.name)

#把类实例化成对象
obj1 = MyClass1('Tom') #传入实例变量参数
print('obj1的变量say为:', obj1.say) #self.say='Hello'
print('obj1的变量name为:', obj1.name) #此时self.name='Tom'
print('obj1执行show()方法:')
obj1.show() #调用show()方法

print('--------------')

obj2 = MyClass1('Lisa') #传入实例变量参数
print('obj2的变量say为:', obj2.say) #self.say='Hello'
print('obj2的变量name为:', obj2.name) #此时self.name='Lisa'
print('obj2执行show()方法:')
obj2.show() #调用show()方法

输出结果:

在这里,我们定义了一个类,叫?MyClass1?,并且由这个类实例化出来两个对象,叫?obj1?和?obj2?。

从输出结果我们可以看到,obj1?和?obj2?都有两个变量:

  • 变量say:是相同的。它是定义在类方法外面的变量,是所有对象公共的,属于类变量;
  • 变量name:是不同的。它是定义在__init__方法内的,是每个对象私有的,属于实例变量。

从输出结果还可以看到,obj1?和?obj2?都有相同的方法?show()``。

3.2 Python知识点之迭代器

再次现在回到我们前面的需求:在数据库中读取并合并两个超级大的数据表并进行一定的处理。

分解一下任务流程:

  1. 从数据库中读取一批数据
  2. 对该批数据进行处理
  • 2.1 对当前行进行处理
  • 2.2 判断是否存在下一行:
  • 存在:跳到下一行,回到2.1
  • 不存在:回到1

发现了吗,这里存在两个循环的过程:1是通过循环遍历整个数据库,2是通过循环遍历每个批次中的每一行。

这种遍历我们称为迭代(Iteration)——可以说这是Python最强大的功能之一了。

3.2.1 迭代器的基本认识

迭代器是一个可以记住当前遍历位置的对象。(python里面一切皆对象)

迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。

有个专门的?iter()?函数,传入一个可迭代对象,即可创建一个迭代器。举个例子:

'''示例:迭代器测创建及调用'''
l = [1,2,3,4] #列表是可迭代对象
it = iter(l) #创建迭代器对象

print('通过for循环调用:')
for i in it: print(i)

it = iter(l)
print('通过next()方法调用:')
print(next(it)) #pythonz2中是it.next()
print(next(it))
print(next(it))
print(next(it))
print(next(it)) #遍历结束,触发StopIteration异常

输出结果:

在这里,我们创建了一个迭代器,叫it,迭代器的调用方式一般有两种:

  1. 通过for…in结构进行调用,对迭代器里的元素逐个进行读取
  2. 所有的迭代器都有个next()方法,就是用来逐个访问迭代器内的元素的,调用一次就读一个出来,直到结束。

从上面的结果我们可以看到,当迭代器内的元素全部遍历完之后,继续调用next()方法会触发 StopIteration 异常。

所以这里需要特别注意:迭代器只能往前不会后退。如果遍历完想再遍历一遍,就需要重新再创建一个迭代器。

3.2.2 在类中实现迭代器

如果要把一个类作为一个迭代器使用的话,需要在类中实现两个方法?__iter__()?与?__next__()

  • __iter__():返回一个特殊的迭代器对象,这个迭代器对象实现了?__next__()方法并通过 StopIteration 异常标识迭代的完成。
  • __next__():会返回下一个迭代器对象,每一次for循环都调用该方法(必须存在)
'''示例:在类中实现迭代器'''
class MyClass2():
    def __init__(self, start, end):
        self.s = start
        self.e = end

    def __iter__(self):
        '''
        @summary: 生成迭代对象时调用,返回值必须是对象自己,然后for可以循环调用next方法

        '''
        return self

    def __next__(self):
        '''
        @summary: 每一次for循环都调用该方法(必须存在)
        '''
        if self.s < self.e:
            x = self.s
            self.s += 1
            return x
        else:
            raise StopIteration

# 实例化出来的对象是个可迭代对象
it = MyClass2(1,5)
for i in it: print(i)

输出结果:

3.3 代码实现

知识点讲的差不多了,现在就一步步来实现解决问题所需要的功能。

3.3.1 BufferTableReader版本

  1. 首先,我们定义一个叫 BufferTableReader 的类(pass 为占位符,表示什么也不做,这里的作用是为了演示代码的完整性):
class BufferTableReader():
    pass

2.?在类中应该包含哪些参数呢?
简单起见,先假设数据表不是在数据库中,而是已经存在于当前工作空间中了:

    # 初始化
    def __init__(self, data, batch_size ):
        self.data = data # 数据源
        self.bs = batch_size # 每次读取的批次大小
        self.buf = [] # 用来存储当前批次的数据,初始化为空
        self.idx = 0 # 当前批次数据(self.buf)指针,初始化为0
        self.offset = 0 # 数据源(data)指针,初始化为0

3.?接着,实现从 data 中读取一个batch_size 的数据的方法:

  • 判断什么时候需要从data中读取数据:
  • 当前批次数据已经处理完的时候,即self.idx==len(self.buf)时;
  • 读取数据,需判断从哪里读到哪里:
  • 如果self.offset+batch_size没有超出data的范围,则读取data[self.offset:self.offset+batch_size];
  • 如果self.offset+batch_size已经超出data的范围,即data剩下的数据量已经小于一个batch_size,则直接读取剩下的全部数据,即data[self.offset:];
  • 把读取的batch数据存在self.buf中。
    def readBatch(self):
        if self.idx==len(self.buf):
            slice_size=min(len(self.data)-self.offset,self.bs)#读多少
            self.buff=self.data[self.offset: self.offset+slice_size]

            #更新指针
            self.offset+=slice_size
            self.idx=0   

4.定义一个判断data中是否还有下一行的方法:

  • 该方法首先会调用 readBatch() 方法
  • 如果当前的self.buf中还没读完,则显然self.idx<len(self.buf)为真,此时readBatch()中什么也不做,且该方法返回 True;
  • 如果当前的?self.buf?刚好读完,则self.idx==len(self.buf),此时readBatch()会读取下一批次,更新self.idx=0:
  • 如果data中还有数据,则len(self.buf)>0,self.idx<len(self.buf)为真,返回True;
  • 如果data中没有数据了,则len(self.buf)=0,self.idx<len(self.buf)为假,返回False。
    def hasNext(self):
        self.readBatch()
        return self.idx < len(self.buf)

5.定义一个在当前批次self.buf中读取行的方法:

    def readNext(self):
        self.readBatch()
        if self.idx < len(self.buf):
            line = self.buf[self.idx]
            self.idx += 1
            return line

到这里,已经可以实现一些功能了,你可以尝试一下:

'''类BufferTableReader(未实现迭代功能)'''
class BufferTableReader():
    # 初始化
    def __init__(self, data, batch_size ):
        self.data = data # 数据源
        self.bs = batch_size # 每次读取的批次大小
        self.buf = [] # 用来存储当前批次的数据,初始化为空
        self.idx = 0 # 当前批次数据(self.buf)指针,初始化为0
        self.offset = 0 # 数据源(data)指针,初始化为0

    def readBatch(self):
        if self.idx == len(self.buf):
            slice_size=min(len(self.data)-self.offset,self.bs) #读多少
            self.buf=self.data[self.offset:self.offset+slice_size]

            #更新指针
            self.offset+=slice_size
            self.idx=0    

            # 为了便于观察,每读完一个批次做一个标注
            print('-------')

    def hasNext(self):
        self.readBatch()
        return self.idx < len(self.buf)

    def readNext(self):
        self.readBatch()
        if self.idx < len(self.buf):
            line = self.buf[self.idx]
            self.idx += 1
            return line

#实例化
ts1 = [('01:30',128,19),
       ('05:00',124,20),
       ('13:00',131,18),
       ('20:00',138,24),
       ('21:30',122,22)]

btr = BufferTableReader(ts1, 2)

while btr.hasNext():
    print(btr.readNext())

输出结果:

从结果来看,类BufferTableReader的实例化对象btr已经能够实现从list中分批读取数据了。(在这里,每次读取2条,到最后不足2条的时候,则把剩下的一次读出)

但到目前为止,这还不能算是一个Iter,只能说是个Reader,你会发现用for...in循环对无法对其进行遍历,因为它不是一个可迭代对象。

btr = BufferTableReader(ts1, 2)
for l in btr: print(l)

输出结果:

3.3.2 BufferTableIter版本

  • 版本1.0:实现一般迭代功能

前面说了,可迭代对象需要在类中实现两个方法?__iter__()?与__next__()。

所以接下来我们在BufferTableReader的基础上,定义一个新的类BufferTableIter,增加上述两个方法(由于后面的代码都比较长,为了方便读者阅读,我会在相较于上一个版本新增或修改部分做特别的标注):

'''类BufferTableIter 1.0 - 实现一般迭代功能'''
class BufferTableIter():
    # 初始化
    def __init__(self, data, batch_size ):
        self.data = data # 数据源
        self.bs = batch_size # 每次读取的批次大小
        self.buf = [] # 用来存储当前批次的数据,初始化为空
        self.idx = 0 # 当前批次数据(self.buf)指针,初始化为0
        self.offset = 0 # 数据源(data)指针,初始化为0

    def readBatch(self):
        if self.idx == len(self.buf):
            slice_size=min(len(self.data)-self.offset,self.bs) #读多少
            self.buf=self.data[self.offset:self.offset+slice_size]

            #更新指针
            self.offset += slice_size
            self.idx = 0    

            # 为了便于观察,每读完一个批次做一个标注
            print('-------')

    def hasNext(self):
        self.readBatch()
        return self.idx < len(self.buf)

    def readNext(self):
        self.readBatch()
        if self.idx < len(self.buf):
            line = self.buf[self.idx]
            self.idx += 1
            return line

    ############新增部分#############
    def __iter__(self):
        print('iter called')
        return self

    def __next__(self):
        if self.hasNext():
            return self.readNext()
        else:
            raise StopIteration

    ##################################

#实例化
ts1 = [('01:30',128,19),
       ('05:00',124,20),
       ('13:00',131,18),
       ('20:00',138,24),
       ('21:30',122,22)]

print('通过next()方法调用:')
btr = BufferTableIter(ts1, 2)
while btr.hasNext():
    print(btr.readNext())
    ##或者:
    #print(next(btr))

print('通过for循环调用:')
btr = BufferTableIter(ts1, 2)
for l in btr: print(l)

输出结果:

此时既可以通过next()方法调用,也可以通过 for 循环进行调用。

当然还是需要注意,迭代不能重复,即遍历结束后不能从头再遍历一次(再执行如下代码,结果为空;虽然此时也调用了__iter__函数,但迭代器本身现在已经为空了),需要重新创建一个实例化对象才行。

for?l?in?btr:?print(l)

输出结果:

  • 版本2.0:实现重复迭代功能

但是如果我就是想能够重复遍历,而又不想重新创建实例化对象怎么办呢?

也是可以的,修改一下__iter__()函数的返回值,让它重新生成一个实例化对象。也就是说,每for一次,就会调用__iter__重新创建一个迭代器。

如下,把__iter__()的返回值return self改为return BufferedTableIter(self.data, self.bs)

'''类BufferTableIter 2.0 - 实现重复迭代功能'''
class BufferTableIter():
    # 初始化
    def __init__(self, data, batch_size ):
        self.data = data # 数据源
        self.bs = batch_size # 每次读取的批次大小
        self.buf = [] # 用来存储当前批次的数据,初始化为空
        self.idx = 0 # 当前批次数据(self.buf)指针,初始化为0
        self.offset = 0 # 数据源(data)指针,初始化为0

    def readBatch(self):
        if self.idx == len(self.buf):
            slice_size=min(len(self.data)-self.offset,self.bs) #读多少
            self.buf=self.data[self.offset:self.offset+slice_size]

            #更新指针
            self.offset += slice_size
            self.idx = 0    

    def hasNext(self):
        self.readBatch()
        return self.idx < len(self.buf)

    def readNext(self):
        self.readBatch()
        if self.idx < len(self.buf):
            line = self.buf[self.idx]
            self.idx += 1
            return line

    ############修改部分#############
    def __iter__(self):
        print('iter called')
        return BufferTableIter(self.data, self.bs)
    ##################################

    def __next__(self):
        if self.hasNext():
            return self.readNext()
        else:
            raise StopIteration

#实例化
ts1 = [('01:30',128,19),
       ('05:00',124,20),
       ('13:00',131,18),
       ('20:00',138,24),
       ('21:30',122,22)]

btr = BufferTableIter(ts1, 2)
for l in btr: print(l)
for l in btr: print(l)
for l in btr: print(l) #任你for多少次

输出结果:

  • 版本2.1:实现重复迭代功能(拆分成两个类)

或者不修改__iter__()函数的返回值,而是将每for一次就实例化一次的这部分功能抽离出来,定义成另一个类,就命名为DBTable。说白了这个类就是专门用来初始化迭代器的:

class DBTable:
    def __init__(self, data, batch_size):
        self.data = data
        self.bs = batch_size
    def __iter__(self):
        print("__iter__ called")
        return BufferTableIter(self)

原来的类BufferTableIter则修改为:

'''类BufferTableIter 2.1 - 实现重复迭代功能'''
class BufferTableIter():
    # 初始化
    def __init__(self, dbTable):
        ############修改部分#############
        self.data = dbTable.data
        self.bs = dbTable.bs
        ##################################
        self.buf = []
        self.idx = 0
        self.offset = 0 

    def readBatch(self):
        if self.idx == len(self.buf):
            slice_size=min(len(self.data)-self.offset,self.bs)
            self.buf=self.data[self.offset:self.offset+slice_size]

            #更新指针
            self.offset += slice_size
            self.idx = 0    

    def hasNext(self):
        self.readBatch()
        return self.idx < len(self.buf)

    def readNext(self):
        self.readBatch()
        if self.idx < len(self.buf):
            line = self.buf[self.idx]
            self.idx += 1
            return line

    def __iter__(self):
        return self

    def __next__(self):
        if self.hasNext():
            return self.readNext()
        else:
            raise StopIteration
#实例化ts1?=?[('01:30',128,19),???????('05:00',124,20),???????('13:00',131,18),???????('20:00',138,24),???????('21:30',122,22)]dbTable?=?DBTable(ts1,?2)btr?=?BufferTableIter(dbTable)

实例化之后进行调用,见下面代码的运行结果可以发现:DBTable实例化出来的对象可以任意多次重复遍历;而BufferTableIter则不行。

#实例化
ts1 = [('01:30',128,19),
       ('05:00',124,20),
       ('13:00',131,18),
       ('20:00',138,24),
       ('21:30',122,22)]

dbTable = DBTable(ts1, 2)
btr = BufferTableIter(dbTable)```

输出结果:

![](https://upload-images.jianshu.io/upload_images/10386940-8155824bfb9a6aa1?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

for l in btr: print(l)
print(‘--------------‘)
for l in btr: print(l)```

输出结果:

  • 版本3.0:实现数据库连接

最后,由于实际中我们的数据是存在数据库中的,所以初始化函数__init__readBatch()函数需要做些修改(python连接数据库请参考上一篇),具体不再赘述,最终的代码如下:

'''类BufferTableIter 3.0 - 实现数据库连接'''
class DBTable:
    def __init__(self, con, sql, batch_size):
        ############修改部分#############
        self.con = con #创建连接
        self.sql = sql #需要执行的sql语句
        ################################
        self.bs = batch_size

    def __iter__(self):
        return BufferTableIter(self)

class BufferTableIter():
    # 初始化
    def __init__(self, dbTable):
        ############修改部分#############
        self.cursor = dbTable.con.cursor(buffered=True) #创建游标
        self.cursor.execute(dbTable.sql) #执行sql语句
        self.readCount = 0
        ################################
        self.bs = dbTable.bs
        self.buf = []
        self.idx = 0 

    def readBatch(self):
        if self.idx == len(self.buf):
            ############修改部分#############
            self.buf = self.cursor.fetchmany(size=self.bs) #从数据库中读取批次数据

            #更新指针
            self.readCount += len(self.buf)
            self.idx = 0
            ################################

    def hasNext(self):
        self.readBatch()
        return self.idx < len(self.buf)

    def readNext(self):
        self.readBatch()
        if self.idx < len(self.buf):
            line = self.buf[self.idx]
            self.idx += 1
            return line
        return None

    def __iter__(self):
        return self

    def __next__(self):
        if self.hasNext():
            return self.readNext()
        raise StopIteration

现在结合上一篇的内容,往数据库中添加一些数据:

'''在mysql数据库中添加测试数据'''

import mysql.connector 

con = mysql.connector.connect(
  host="127.0.0.1",
  user="WXT",
  passwd="12345",
  database="wxt_db",
  use_pure="False"
)

mycursor=con.cursor(buffered=True)

# 创建一个表
mycursor.execute("CREATE TABLE patient (time VARCHAR(255), hr INT, hxpl INT)")

# 往表里插入一些记录
mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('01:30',128,19)")
mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('05:00',124,20)")
mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('13:00',131,18)")
mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('20:00',138,24)")
mycursor.execute("INSERT INTO patient (time, HR, HXPL) VALUES ('21:30',122,22)")

con.commit()
con.close()      

进行测试:

'''连接mysql数据库进行功能测试:分批读取数据'''

import mysql.connector 

con = mysql.connector.connect(
  host="127.0.0.1",
  user="WXT",
  passwd="12345",
  database="wxt_db",
  use_pure="False"
)

# 检查一个表是否存在
def tableExists(mycursor, name):
    stmt="SHOW TABLES LIKE '"+name+"'"
    mycursor.execute(stmt)
    return mycursor.fetchone()

try:
    mycursor=con.cursor(buffered=True)

    if tableExists(mycursor, 'patient'):
        print('process table:', 'patient')
        print("--------")

        #查询表里的记录
        sql = "SELECT * FROM patient"

        dbTbl = DBTable(con,sql,2)
        btr = BufferTableIter(dbTbl)

        for rec in dbTbl:
            print("read record:", rec)

finally:
    con.close() #关闭数据

输出结果:

再补充一个小知识点,这里使用了try..finally结构,这是一种检验和处理异常的机制。通常情况下,如果程序在某个位置出现异常,整个程序会被直接中断,后面的语句不会再执行。

try..finally中,try 语句块里的代码会被监测,不论这部分有没有发生异常,finally 里的语句都会执行,这样就可以对异常做一些收尾工作,比如这里的关闭数据库连接操作。

因为如果前面一旦发生异常,数据库没能够被关闭,会存在一定的危险性。

??tips5:try-finally结果可以对异常进行检测和处理,如果try语句块中出现了异常,finally后面可以做一些必要的清理工作(如关闭文件或断开服务器连接等)

4. 结语

总结一下,本文实现了有序序列的合并、时间序列数据表的对齐、以及对数据库中的数据表进行分批查询,主要使用的Pyhton编程技巧有循环、函数、类和迭代器

但其实还没有完全解决问题,目前只是把数据从数据库给读出来了,还没有对其进行处理,所以之后还会再写后半部分的内容,计划有:

  1. 把从数据库中读取出来的、来自不同数据表的时间序列进行合并对齐
  • 尝试不同的对齐方式,如插值
  1. 对齐后的时间序列做分组(例如每小时,每天)聚合(例如每组做计数,求平均等)
  2. 用生成器机制(yield)对迭代器的功能进行优化。

(注:本文是由团队内部培训的笔记整理而来,如有问题,欢迎交流指正!)

原文地址:https://www.cnblogs.com/dengfaheng/p/10959130.html

时间: 2024-10-10 02:52:40

python时间序列数据的对齐和数据库的分批查询的相关文章

Python 收集Twitter时间序列数据

CODE: #!/usr/bin/python # -*- coding: utf-8 -*- ''' Created on 2014-7-18 @author: guaguastd @name: collect_time_series.py ''' if __name__ == '__main__': # import json import json # import partial from functools import partial # import trend from tren

MySQL实验准备(二)--Python模拟数据(MySQL数据库)

Python模拟数据(MySQL数据库) 数据模拟 目的:模拟多个表的插入和查询数据的模拟,再通过基准测试脚本测试服务器性能和收集数据,仿真模拟. 备注: 如果需要基础的python环境,可以查看<MySQL实验准备(一)--环境准备>文档 实验脚本 通过对一个简单表的脚本插入和查询模拟,后面能 举一反三,完成多张表的多表插入,多线程的多表插入,多线程查询,和多线程的join查询. 数据库的表结构 mysql> show create table zdemo.student; +----

python爬取数据并保存到数据库中(第一次练手完整代码)

1.首先,下载需要的模块requests, BeautifulSoup, datetime, pymysql(注意,因为我用的python3.7,不支持mysqldb了),具体的下载方法有pip下载,或者使用Anaconda版本python的童鞋可以使用conda下载. 2.创建conndb,py,包含数据库的连接断开,增删改查等操作: #!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql def conn_db(): # 连接数

最简洁的Python时间序列可视化:数据科学分析价格趋势,预测价格,探索价格

时间序列数据在数据科学领域无处不在,在量化金融领域也十分常见,可以用于分析价格趋势,预测价格,探索价格行为等. 学会对时间序列数据进行可视化,能够帮助我们更加直观地探索时间序列数据,寻找其潜在的规律. 本文会利用Python中的matplotlib[1]库,并配合实例进行讲解.matplotlib库是?个?于创建出版质量图表的桌?绘图包(2D绘图库),是Python中最基本的可视化工具. [工具]Python 3 [数据]Tushare [注]示例注重的是方法的讲解,请大家灵活掌握. 01 单个

python 读取SQLServer数据插入到MongoDB数据库中

# -*- coding: utf-8 -*-import pyodbcimport osimport csvimport pymongofrom pymongo import ASCENDING, DESCENDINGfrom pymongo import MongoClientimport binascii '''连接mongoDB数据库'''client = MongoClient('10.20.4.79', 27017)#client = MongoClient('10.20.66.10

Python for Infomatics 第14章 数据库和SQL的应用一(译)

14.1 什么是数据库 数据库一种存储结构数据的文件.绝大多数数据库类似字典——映射键和值的关系.最大的区别是数据库是保存在硬盘或其它永久性的存储上,所以在程序结束后它仍然存在.而保存在内存中的字典容量受限于计算机的内存配置,所以数据库可以比字典存储更多的数据. 类似字典,数据库软件在插入和访问数据时非常迅速,即使是庞大的数据.通过创建数据索引,数据库软件维持它的性能,并允许计算机快速跳至一个特定入口. 目前有很多不同目的的数据库系统,包括:Oracle, MySQL, 微软的SQL Serve

数据定义未来——2016数据库技术大会总结

背景 作为国内数据库与大数据领域最大规模的技术盛宴,2016第七届中国数据库技术大会(DTCC)如约于2016年5月12日-14日再度震撼来袭.大会以"数据定义未来"为主题,云集了国内外顶尖专家,共同探讨MySQL.NoSQL.Oracle.缓存技术.云端数据库.智能数据平台.大数据安全.数据治理.大数据和开源.大数据创业.大数据深度学习等领域的前瞻性热点话题与技术.本届大会共设定2个主会场,25个分会场,并将吸引5000多名IT人士参会,为数据库人群.大数据从业人员.广大互联网人士及

Python学习系列(七)( 数据库编程)

一,MySQL-Python插件 Python里操作MySQL数据库,需要Python下安装访问MySQL数据库接口API包即插件,从而使得Python2.7能访问操作MySQL数据库.MySQL软件可以去官网下载:http://www.mysql.com/ 二,访问MySQL数据库 1,连接数据库mysql 基本格式:connect ([host=]'ip',[user=]'user',[passwd=]'password',[db=]'dbname')     2,数据库的基本操作 1)cr

使用Grafana展示时间序列数据

简介 Grafana是一个独立运行的系统,内置了Web服务器.它可以基于仪表盘的方式来展示.分析时间序列数据. Grafana支持多种数据源,例如:Graphite.OpenTSDB.InfluxDB.Elasticsearch.你可以混合使用多种数据源.它对Graphite有以下增强的支持: 点击修改Metrics路径的每一个片断 快速的添加函数,支持点击函数参数以修改之 修改函数顺序 丰富的模板支持 在UI方面,Grafana具有以下特性: 丰富的.基于客户端的图表组件:Bar图.区域图.线