维度表,作为数据仓库里面的概念,是维度属性的集合,比如时间维、地点维;
但这里要讨论流计算中的维度表问题,
流计算中维表问题和数据仓库中有所不同,往往是因为通过agent采集到的数据比较有限,在做数据业务的时候,需要先实时的把这些维度信息给补全;
这个问题其实就是,主数据流和多个静态表或半静态表之间的join问题。
在flink中称为side input问题,https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
解决维表问题考虑到点,
a. 对元数据库的读压力;如果分析程序有1000并发,是否需要读1000次
b. 读维表数据不能拖慢主数据流的throughput,每秒千万条数据量
c. 动态维表更新问题和一致性问题;元数据是不断变化的,如何把更新同步到各个并发上
d. 冷启动问题,如何保证主数据流流过的时候,维表数据已经ready,否则会出现数据无法处理
e. 超大维表数据会导致流量抖动和频繁gc,比如几十万条的实例数据,可能上百兆
下面谈谈我们解决这个问题的思路,
1. 最简单的版本,每个进程都会独立的去从元数据里面读取元数据;
这样的优点是简单,c,d问题天然解决;但只能适用于数据量较小的场景,否则并发太大,a,肯定就无法满足
2. 随着业务量的扩大,处理程序的并发越来越大,1,很快会达到瓶颈
我们就采用新的方案,这个方案是在Jstorm环境实现的,用一个spout读,然后广播给所有的处理进程
这个方案主要解决a,c的问题,
但是也引入了d,e的问题,
解决d,Jstorm支持让某个spout在job启动后等待一段时间,所以可以让主数据流spout等待几分钟再开始读数据,这样保证数据到的时候,维表数据已经ready;这个解法每次重启job都要等好几分钟,体验挺差的,但是勉强可以work
e问题,一个spout广播超大维表到几百并发的线程,首先就是会队列满,因为jstorm发一份数据到所有并发的时候,是需要产生几百份真实数据在队列中的;然后GC也会很严重,因为大量的临时对象会产生释放,在传输和进程cache过程中,会导致业务抖动
这个问题只能增加内存和worker数来解决,否则job有可能会完全hang死
我们也用Chronicle Map(https://github.com/OpenHFT/Chronicle-Map)来尝试解决内存使用和gc的问题
BTW,有同学问,如果让数据和维表数据都 shuffle by key,是不是可以缓解这个问题
如果数据量比较小,可以考虑,但是对于我们的主数据流的数据量,是没法shuffle的,所以需要在每个并发上保留全量的维表信息
2.1 用Flink带替换Jstorm
Flink虽然在window,乱序,一致性等方面做了很大的改进,但是在这个问题上仍然没有很好的解,上面提到的side input也没有实现出来;
并且Flink随着更多的高层的封装,程序员的开发自由度是降低的,和JStorm比,所以如果用Flink解决上面的问题,没有本质变化,可能JStorm更麻烦;
需要用ConnectedStreams去joine数据流和side input流,
对于d问题,没法直接解决
对于e问题,因为flink对内存管理做的比较好,gc问题有所缓解,但是job抖动的问题还是会存在
因为广播这么大的数据,会中断主数据流的数据处理,也会大大增加checkpoint的时间,如下图,可以看到30分钟一次的同步
BTW,Flink保障一致性,提供checkpoint机制,但也增加复杂性,这个地方处理不好会有很多问题
比如,如果在source中同步读数据库数据,如果读库的时间比较长,就会hang住主数据流,因为其他operator都会等它完成checkpoint,写JStorm的程序员需要注意这点,Flink需要更精细的控制,任何operator,任何并发的hang都会导致整个任务hang
我个人尝试使用flink本身的机制,statebackend,rocksdb等来更优雅的解决这个问题,但是没有发现比较好的方法,或者实现过于复杂
3. Redis版本
这其实是把1,2方法做了综合
使用redis来做cache,只用一个job,负责从元数据库同步数据到redis,这样就解决a,c
然后所有的并发都从redis直接查询需要的元数据,这样就解决d
对于b,在并发上做local cache,只有第一次需要真正查询redis,后续定期异步更新就好,不会影响到主数据流
对于e,因为现在不需要一下全量的读取维表数据到内存,用到的时候才去读,分摊了负载,也可以得到缓解
该方案当前线上跑着,还算比较稳定
这个方案最大的缺点是增加依赖,对于需要全球多region大规模部署的应用,增加依赖是成本极高的
同时要额外保障redis和同步job的稳定性
BTW,这里不建议local cache用LRU,因为要考虑到当redis挂掉或同步job挂掉的时候,不能影响主数据流,所以我只会异步的更新local cache,但不去做过期,这样就算redis挂了,也只是影响更新的实例,大大降低故障发生概率
总结,
分享一下自己的一些实战经验,希望可以抛砖引玉,找到更合理,优雅的方案