本文聊下提供数据在线的一些经验
典型场景:数据组的同学每天凌晨在Hive上基于历史数据计算用户行为数据,通过工具将该数据推送到HBase,业务方通过RPC Service获取用户行为数据
整体架构
调度:启动计算/传输任务
监控:监控任务运行(可视化界面),异常报警
质量:对核心数据质做质量检查
传输:将数据从HDFS/HIVE导入HBase/Redis/Memcache/Mysql
RPC Service:通过Service获取数据
JDBC:获取Mysql中的数据
Memcache:作为HBase/Redis的查询缓冲层
业务接入存储选型
一个新增的数据服务,在选取存储类型时,follow以下原则:
1. 优先使用HBase/Redis,HBase/Redis无法支持的查询语义使用Mysql(分页查询,count)
2. 小数据量(内存占用<10G量级)或者对查询响应时间要求很高的应用使用Redis(user profile)
3. 海量数据使用HBase(用户行为流水/消息)
传输设计
参考阿里DataX的设计,实现了点评的异构数据离线传输工具wormhole,支持HBase、Redis、Memcache、Mysql写入
离线数据在计算完成后,通过wormhole将数据从Hive导出到HBase、Redis、Memcache、Mysql
遇到的问题:
1. 数据在写入时影响查询性能
方案:
- 增量导入:在Hive上实现了一个数据对比工具,只导入变更数据到线上服务存储
- 限速导入:导的过程中限速
2. 如何保证数据导入过程数据的可用性
对于Mysql:先将数据写入临时表,再完成切换
对于HBase/Memcache/Redis:使用Update的方式写入,不影响数据可用性
3. 传输任务失败怎么办?
参考之前写的一篇Sqoop的容错方案,思路基本上就以下几种:
对于一个传输工具/平台,传输任务失败不可怕,可怕的地方在于“脏数据”如何处理,3种思路:
1. 临时表:使用临时表缓存数据,然后在一个transaction中将临时表的数据move到目的表
2. 自定义回滚:通过用户自定义的语句/方法,在任务失败后,执行清数据操作
3. 传输任务的幂等性:如果一个任务失败了,产生了脏数据,解决问题后,再跑一次任务,能够最终正确,例如hive写入使用INSERT OVERWRITE
HBase方案设计
HBase集群主要目标包括:
1. 支持海量数据(数十亿级)高速存取
2. 集群高可用
3. 最终一致性
方案:
1. 两套HBaes集群,M-M做Replication
2. 用户通过Service访问
3. 封装客户端,主要工作:易用性、集群切换
示例,一个HBase查询API:
/** * 单个查询:根据表名、key、family、column返回value * * @param tableName * 表名 * @param key * 查询的key * @param family * 列所在的family(规范定义:“dim”) * @param column * 列名 * @return * 返回指定列的value, 指定列不存在返回null * @throws * IOException 远程调用失败或网络异常 */ public String query(String tableName, String key, String family, String column) throws IOException;
经验和问题:
1. 两套集群如何使用?
- 正常情况下,访问一个集群,如果集群出现问题,将请求手动切到另一个集群
- 在做集群变更的时候,backup集群承担了“预发环境”的职责
- 个别高写应用和其他应用分布在不同的集群,在某个集群出问题的时候,切到一个集群,服务降级(响应时间变大)
2. HBase遇到的问题
- 不适合对响应时间要求非常严格的应用
- 在做线上变更(配置修改,扩容等)时,风险很大,谨慎!谨慎!
3. 为什么通过RPC Service来请求
- 提高易用性
- 封装集群切换
- jar包依赖耦合过高
- Hadoop Kerberos安全机制令部署成本高,不利于业务扩张
Redis集群方案
由于需求原因,我们引入HBase比Redis更早,但HBase无法满足随机数据的高速读写需求,故而引入Redis,并做了集群方案
其设计目标:
1. 支持高速随机读写服务
2. 高可用
3. 最终一致性
方案:
- 通过RPC Service访问
- 客户端屏蔽多个Reids Node
- Sentine集群负责主从切换和向客户端汇报当前哪个结点为master结点
- 读:轮询
- 写/删:redis master
- 批量写/删:部分失败则全部失败
Redis部署
- 线上采用96G内存的机器,每台机器3个Redis-Server结点,每个结点内存使用量控制在20G以内,目标是降低Redis Servier结点fork子进程带来的影响
- 考虑到我们的应用场景,线上关闭自动bgrewriteaof,每天凌晨同一台机器上的Redis-Server结点顺序做bgrewriteaof
- 监控:zabbix+logscan
Memcache:
采用了公司现有方案
封装了客户端,客户端采用一致性Hash,多台Memcache机器组成集群,提供超越单台机器内存的存储能力
不保证可用性
模型
在我们的应用中,90%以上的请求为KV型的请求
典型应用:
通过userid查询这个用户的sex和age
针对这类需求,我们统一了HBase/Memcache/Redis的查询模型为,目标是:
数据开发同学在Hive上新增了一个字段,业务方线上可以请求到这个字段,而无需了解下层存储是什么
模型:
TableName | Family | Key | Columns |
TableName:Hive中表名,包括模式名,示例:bi.dprpt_user_city_profile_service
Family:列簇,一个Column Family中可以由任意多个Column组成
Key:业务Key(例如userid,guid)
Columns:Hbase中的qualifiers,在Redis/Memcache中被序列化为Json
在HBase中,上述概念和HBase一致
在Reids/Memcache中:
TableName + Family + Key组织成为物理存储的Key
Columns被序列化为Json
从而统一查询接口:
/** * 单个查询:根据表名、key、family、column list返回value * * @param tableName * 表名 * @param key * 查询的key * @param family * 列所在的family(默认:“dim”) * @param columnList * 列List * @return * 返回指定列(json格式) * @throws * IOException 远程调用失败或网络异常 */ public String query(String tableName, String family, String key, List<String> columnList) throws IOException;
示例,一张Hive中的表:
HIVE > select * from bi.dprpt_user_city_profile_service limit 1;
OK
1_1 2010-09-02 2014-05-02 2010-09-02 2014-05-02 2011-06-01 2013-09-27 2010-06-08 2013-09-16 2012-08-29 2013-07-08 2 101 842
在HBase中保存为:
hbase(main):001:0> get ‘bi.dprpt_user_city_profile_service‘,‘1_1‘
COLUMN CELL
dim:first_app_tg_date timestamp=1400025620039, value=2012-08-29
dim:first_app_visit_date timestamp=1400025620039, value=2010-09-02
dim:first_tg_date timestamp=1400025620039, value=2010-06-08
dim:first_tg_visit_date timestamp=1400025620039, value=2011-06-01
dim:first_visit_date timestamp=1400025620039, value=2010-09-02
dim:last_app_tg_date timestamp=1400025620039, value=2013-07-08
dim:last_app_visit_date timestamp=1400025620039, value=2014-05-02
dim:last_tg_date timestamp=1400025620039, value=2013-09-16
dim:last_tg_visit_date timestamp=1400025620039, value=2013-09-27
dim:last_visit_date timestamp=1400025620039, value=2014-05-02
dim:prefer_tg_cat0 timestamp=1400025620039, value=2
dim:prefer_tg_cat1 timestamp=1400025620039, value=101
dim:prefer_tg_region timestamp=1400025620039, value=842
在Redis/Memcache中保存为(7036447591228069586为tablename+family+key做MurmurHash之后的值):
Redis > get 7036447591228069586
"{"prefer_tg_cat0":"2","last_tg_date":"2013-09-16","last_tg_visit_date":"2013-09-27","last_app_visit_date":"2014-05-02","first_app_tg_date":"2012-08-29","first_visit_date":"2010-09-02","last_app_tg_date":"2013-07-08","prefer_tg_cat1":"101","prefer_tg_region":"842","first_tg_date":"2010-06-08","last_visit_date":"2014-05-02","first_app_visit_date":"2010-09-02","first_tg_visit_date":"2011-06-01"}"
性能
在我们的应用场景中:通过一个key,获取多个column
各个存储响应时间(单位ms):
AVG | 95线 | 99.9线 | |
HBase | 2.2 | 7.8 | 62.6 |
Memcache | 0.7 | 1.0 | 3.6 |
Redis | 0.3 | 0.6 | 1.2 |
集群规模、QPS、应用场景不一样,都会导致上述数据不一样,仅供参考