MongoDB -> kafka 高性能实时同步(采集)mongodb数据到kafka解决方案

写这篇博客的目的

让更多的人了解 阿里开源的MongoShake可以很好满足mongodb到kafka高性能高可用实时同步需求(项目地址:https://github.com/alibaba/MongoShake,下载地址:https://github.com/alibaba/MongoShake/releases)。至此博客就结束了,你可以愉快地啃这个项目了。还是一起来看一下官方的描述:

MongoShake is a universal data replication platform based on MongoDB‘s oplog. Redundant replication and active-active replication are two most important functions. 基于mongodb oplog的集群复制工具,可以满足迁移和同步的需求,进一步实现灾备和多活功能。

没有标题的标题

哈哈,有兴趣听我啰嗦的可以往下。最近,有个实时增量采集mongodb数据(数据量在每天10亿条左右)的需求,需要先调研一下解决方案。我分别百度、google了mongodb kafka sync 同步 采集 实时等 关键词,写这篇博客的时候排在最前面的当属kafka-connect(官方有实现https://github.com/mongodb/mongo-kafka,其实也有非官方的实现)那一套方案,我对kafka-connect相对熟悉一点(不熟悉的话估计编译部署都要花好一段时间),没测之前就感觉可能不满足我的采集性能需求,测下来果然也是不满足需求。后来,也看到了https://github.com/rwynn/route81,编译部署也较为麻烦,同样不满足采集性能需求。我搜索东西的时候一般情况下不会往下翻太多,没找到所需的,大多会尝试换关键词(包括中英文)搜搜,这次可能也提醒我下次要多往下找找,说不定有些好东西未必排在最前面几个

之后在github上搜in:readme mongodb kafka sync,让我眼前一亮。

点进去快速读了一下readme,正是我想要的(后面自己实际测下来确实高性能、高可用,满足我的需求),官方也提供了MongoShake的性能测试报告

这篇博客不讲(也很大可能是笔者技术太渣,无法参透领会(●′ω`●))MongoShake的架构、原理、实现,如何高性能的,如何高可用的等等。就一个目的,希望其他朋友在搜索实时同步mongodb数据时候,MongoShake的解决方案可以排在最前面(实力所归,谁用谁知道,独乐乐不如众乐乐,故作此博客),避免走弯路、绕路。

初次使用MongoShake值得注意的地方

数据处理流程

v2.2.1之前的MongoShake版本处理数据的流程:

MongoDB(数据源端,待同步的数据)
-->MongoShake(对应的是collector.linux进程,作用是采集)
-->Kafka(raw格式,未解析的带有header+body的数据)
-->receiver(对应的是receiver.linux进程,作用是解析,这样下游组件就能拿到比如解析好的一条一条的json格式的数据)
-->下游组件(拿到mongodb中的数据用于自己的业务处理)

v2.2.1之前MongoShake的版本解析入kafka,需要分别启collector.linux和receiver.linux进程,而且receiver.linux需要自己根据你的业务逻辑填充完整,然后编译出来,默认只是把解析出来的数据打个log而已

src/mongoshake/receiver/replayer.go中的代码如图:

详情见:https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-connect-to-different-tunnel-except-direct

v2.2.1版本MongoShake的collector.conf有一个配置项tunnel.message

# the message format in the tunnel, used when tunnel is kafka.
# "raw": batched raw data format which has good performance but encoded so that users
# should parse it by receiver.
# "json": single oplog format by json.
# "bson": single oplog format by bson.
# 通道数据的类型,只用于kafka和file通道类型。
# raw是默认的类型,其采用聚合的模式进行写入和
# 读取,但是由于携带了一些控制信息,所以需要专门用receiver进行解析。
# json以json的格式写入kafka,便于用户直接读取。
# bson以bson二进制的格式写入kafka。
tunnel.message = json
  • 如果选择的raw格式,那么数据处理流程和上面之前的一致(MongoDB->MongoShake->Kafka->receiver->下游组件)
  • 如果选择的是jsonbson,处理流程为MongoDB->MongoShake->Kafka->下游组件

v2.2.1版本设置为json处理的优点就是把以前需要由receiver对接的格式,改为直接对接,从而少了一个receiver,也不需要用户额外开发,降低开源用户的使用成本。

简单总结一下就是:
raw格式能够最大程度的提高性能,但是需要用户有额外部署receiver的成本。json和bson格式能够降低用户部署成本,直接对接kafka即可消费,相对于raw来说,带来的性能损耗对于大部分用户是能够接受的。

高可用部署方案

我用的是v2.2.1版本,高可用部署非常简单。collector.conf开启master的选举即可:

# high availability option.
# enable master election if set true. only one mongoshake can become master
# and do sync, the others will wait and at most one of them become master once
# previous master die. The master information stores in the `mongoshake` db in the source
# database by default.
# 如果开启主备mongoshake拉取同一个源端,此参数需要开启。
master_quorum = true

# checkpoint存储的地址,database表示存储到MongoDB中,api表示提供http的接口写入checkpoint。
context.storage = database

同时我checkpoint的存储地址默认用的是database,会默认存储在mongoshake这个db中。我们可以查询到checkpoint记录的一些信息。

rs0:PRIMARY> use mongoshake
switched to db mongoshake
rs0:PRIMARY> show collections;
ckpt_default
ckpt_default_oplog
election
rs0:PRIMARY> db.election.find()
{ "_id" : ObjectId("5204af979955496907000001"), "pid" : 6545, "host" : "192.168.31.175", "heartbeat" : NumberLong(1582045562) }

我在192.168.31.174,192.168.31.175,192.168.31.176上总共启了3个MongoShake实例,可以看到现在工作的是192.168.31.175机器上进程。自测过程,高速往mongodb写入数据,手动kill掉192.168.31.175上的collector进程,等192.168.31.174成为master之后,我又手动kill掉它,最终只保留192.168.31.176上的进程工作,最后统计数据发现,有重采数据现象,猜测有实例还没来得及checkpoint就被kill掉了。

原文地址:https://www.cnblogs.com/itwild/p/12329601.html

时间: 2024-08-19 20:19:04

MongoDB -> kafka 高性能实时同步(采集)mongodb数据到kafka解决方案的相关文章

Postgresql 远程同步(非实时同步,小数据量)

源端要开通目标的相关访问权限 目标端: 1.建立远程表的视图 create view v_bill_tbl_version_update_control_info as SELECT * FROM dblink('hostaddr=10.10.10.8 port=4321 dbname=postgres user=postgres password=postgres', 'SELECT id,appid,ratio,status,create_time,char_package_name,ver

太平洋保险家园大数据项目DSG应用(30多个Oracle等实时同步到KAFKA)

太平洋保险集团"家园项目"大数据平台DSG应用(oracle&kafka) 项目背景根据太平洋保险集团的IT建设规划,在2017年年底,需要完成"一个太保,共同的家园"项目(简称家园项目),旨在给客户提供更加便携.全面的服务,通过一个家园平台,就能够完成所有的服务.众所周知,太平洋保险的业务范围非常广泛,囊括了产险.寿险.车险等业务,同时,一个险种又由多个系统共同提供服务.现在要在一个平台上完成这些服务,数据的汇聚.集中.转换就成了整个项目的核心与难点. 项

全网备份+NFS存储+单点实时同步.docx

目  录 老男孩教育 linux 运维就业班第八和九关(周)课后学习效果能力上机大考察... 1 1.1 逻辑图... 1 1.2 50 台集群服务器全网数据备份解决方案... 1 1.2.1 一.搭建rsync备份服务器... 3 1.2.2 二.web01本地测试... 7 1.2.3 web01本地脚本... 7 1.2.4 本地定时任务... 8 1.2.5 backup服务器上脚本... 8 1.2.6 backup定时任务... 8 1.3 网站集群后端 NFS 共享存储搭建及优化解

Flume和Kafka完成实时数据的采集

Flume和Kafka完成实时数据的采集 写在前面 Flume和Kafka在生产环境中,一般都是结合起来使用的.可以使用它们两者结合起来收集实时产生日志信息,这一点是很重要的.如果,你不了解flume和kafka,你可以先查看我写的关于那两部分的知识.再来学习,这部分的操作,也是可以的. 实时数据的采集,就面临一个问题.我们的实时数据源,怎么产生呢?因为我们可能想直接获取实时的数据流不是那么的方便.我前面写过一篇文章,关于实时数据流的python产生器,文章地址:http://blog.csdn

使用maxwell实时同步mysql数据到kafka

一.软件环境: 操作系统:CentOS release 6.5 (Final) java版本: jdk1.8 zookeeper版本: zookeeper-3.4.11 kafka 版本: kafka_2.11-1.1.0.tgz maxwell版本:maxwell-1.16.0.tar.gz 注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口 二.环境部署 1.安装jdk export JAVA_HOME=/usr/java/jdk1.8.0_181 export P

(转)HubbleDotNet+Mongodb 构建高性能搜索引擎--概述

HubbleDotNet 从 1.2.3 版本以后开始在官方代码中支持和 Mongodb 对接,Mongodb 是10gen 公司开发的 no-sql 数据库,其读写性能比传统关系数据库要快很多,而且可以非常方便的分布式部署.HubbleDotNet 通过支持 Mongodb 也使其本身同时具备了 no-sql 的解决方案.本文将重点概述Hubble+Mongodb 的功能以及和hubble+sql , lucene.net 的一些性能测试对比. Mongodb 的安装 在开始本文之前,我们简单

利用python测试mongodb副本集数据同步延迟

本文主要介绍python如何连接mongodb副本集及读写分离配置,mongodb副本集数据同步延迟测试. 一.python连接mongodb副本集 1.连接副本集 在新版的驱动中直接使用MongoClient连接 如下: from pymongo import MongoClient conn = MongoClient(['192.168.3.11:27017', '192.168.3.12:27017', '192.168.3.13:27017']) 2.读写分离配置 from pymon

MongoDB数据库的主从配对,附数据迁移示例

数据中心在运行中有可能遇到各种硬件.电力.网络故障等问题,需要设计良好的系统来隔离,尽量减少对上层应用的影响,持续对外提供服务:一旦发生业务中断,也应尽快恢复.通过主从备份设计,在主应用系统发生故障时,备机能够及时接管应用,保持业务的连续性. 以如下这个基本的应用架构为例,数据库存在单点隐患,可以考虑利用MongoDB对主从的支持特性设计成Master-Slave部署,完成架构优化.(MongoDB的主从同步通过内网能达到最佳效率,如果只能通过公网进行主从同步,将会影响效率.) 可以直接使用Mo

OGG 从Oracle备库同步数据至kafka

OGG 从Oracle备库同步数据至kafka Table of Contents 1. 目的 2. 环境及规划 3. 安装配置JDK 3.1. 安装jdk 3.2. 配置环境变量 4. 安装Dataguard 4.1. 安装备库软件 4.2. 配置dataguard 4.2.1. 主库 4.2.2. 备库 4.3. 完成操作 4.4. 启动实时复制 5. zookeeper集群 5.1. 上传并解压 5.2. 配置 5.3. 创建myid文件 5.4. 配置环境变量 5.5. 启动和查看服务