一般来说,ETL实现增量更新的方式有两种,第一种:记录字段的最大值,如果数据源中存在持续增加的数据列,记录上次处理的数据集中,该列的最大值;第二种是,保存HashValue,快速检查所有数据,发现异动的数据之后,只同步更新被修改的数据。
1,记录字段的最大值,使用DateTime记录源数据上次更新的时间,或使用TimeStamp或RowVersion记录上次更新的数值
一般情况下,DBA在设计表时,都会在表中创建LastUpdatedTime字段,数据类型是DateTime 或 DateTime2(7),用于记录数据更新的时间。使用DateTime记录上次更新的时间,要求对数据源进行Update或Insert操作时,都要更新LastUpdatedTime字段为当前的时间值。
如果创建TimeStamp或RowVersion类型的字段,那么对数据进行Update或Insert修改时,系统会负责维护该字段,保证其值持续增加。
使用这种方式的特点是,对Table中的任意一个column进行Update,都会更新参照值。如果源系统有部分column没有导入ETL,而源系统却频繁更新这些column,那么会导致ETL每次加载的数据,并不是“更新”的数据。
2,保存HashValue,使用CheckSum或Binary_checksum获取HashValue,对更新的数据进行同步操作
如果Table中没有持续增加的Column,那么记录字段最后一次更新的值,将是不可行的。此时,可以使用CheckSum或Binary_checksum等Hash函数快速检查源数据,并将HashValue保存到一个HashTable中(该HashTable有三列:业务键,HashValue,SyncStatus)。如果一个数据行的HashValue 和 HashTable中记录的HashValue 有出入,说明该数据行发生变化。
使用这种方式的特点是:每次都要对数据源进行全部扫描,获取HashValue;比较HashValue,只对异动的数据进行数据同步。由于对源数据进行全部的扫描 和 Hash隐射都会消耗时间,在使用这种方式之前,需要多做测试,监控IO,CPU 和 内存的使用情况。
注意:只对ETL需要的Columns进行Hash 映射,保存HashValue。即使源Table的其他columns发生update,ETL也不会认定该数据行发生变化。
3,使用HashValue,将源数据分批导入ETL进行处理
如果对源数据的处理非常耗时,那么可以将源数据分批导入ETL,增加数据处理的并行度,以减少ETL的执行时间。
HashTable有三列:业务键,HashValue,SyncStatus,用以记录上次更新时源数据的HashValue
Step1,扫描源数据,获取HashValue,对于同一个数据行,如果当前的HashValue跟HashTable中记录的HashValue不同,那么设置SyncStatus=0;ETL要处理的全部数据,其SyncStatus=0。
Step2,将处理流程复制为N个Batch,将HashValue/N=0,1,2,,,N-1的数据分别引入对应的N个Batch中,这样,N个batch会同时处理不同的数据。在每个Batch处理完成之后,都会设置SyncStatus=1;
Optional Step3,在一个Batch中,可以使用一个循环,每次只处理 M条数据,并将处理过的数据的SyncStatus设置为1。例如,在10号Batch中,一次循环只处理200条数据,那么这200条数据的特点是:SyncStatus=0,HashValue/N=9。
使用这种设计的场景是:处理数据的Task非常耗时,由于Task是同步的,一个Task在将数据处理完成之前,下游Task必须等待,因此,可以将数据集分为多个不重复的子集同时处理,各个Task都在执行数据处理,提高数据处理的并发度,减少ETL的整体时间。