canal+kafka订阅Mysql binlog将数据异构到elasticsearch(或其他存储方式)

canal本质就是"冒充"从库,通过订阅mysql bin-log来获取数据库的更改信息。

mysql配置(my.cnf)

mysql需要配置my.cnf开启bin-log日志并且将bin-log日志格式设置为row, 同时为了防止bin-log日志占用过多磁盘,可以设置一下过期时间,

[mysqld]
log-bin=mysql-bin # 打开binlog
binlog-format=ROW # ROW格式
server_id=1 # mysql Replication 需要设置 在mysql集群里唯一

expire_logs_days=7 # binlog文件保存7天
max_binlog_size = 500m # 每个binlog日志文件大小 

canal配置

除了kafka之外,canal还支持将数据库修改的消息投递到rocketMQ, 或者不经过消息队列直接投递到canal的客户端,然后再在客户端实现自己的代码(如写入其他存储/其他消息队列) ,但是只能选其一。而如果选择canal客户端的方式, 一个canal server也只能将消息投递到一个canal client。

但是可以开启多个canal服务端和客户端(同一个实例,即对mysql来说只算是一个从库), 他们通过zookeeper保证只有一个服务端和客户端是有效的,其他只是作为HA的冗余。

然后需要修改canal目录下(以下为近最小配置)

conf/example/instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 1234

# 数据库address
canal.instance.master.address = 127.0.0.1:3306

# 数据库账号密码
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal

# 需要订阅的数据库.表名 默认全部
canal.instance.filter.regex = .\*\\\\..\*  # 去掉转义符其实就是 .*\..*

# topic名 固定
canal.mq.topic=canal

# 动态topic
# canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*

# 库名.表名: 唯一主键,多个表之间用逗号分隔
# canal.mq.partitionHash=mytest.person:id,mytest.role:id

其中动态topic 和 主键hash看上去有点难理解,去看其他人的博客找到的解释和例子如下

动态topic

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号分隔

例子1:test.test 指定匹配的单表,发送到以 test_test为名字的topic上
例子2:.…* 匹配所有表,每个表都会发送到各自表名的topic上
例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
例子4:test.* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
例子5:test,test1.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值
支持指定topic名称匹配, 配置格式:topicName:schema 或 schema.table,多个配置之间使用逗号分隔, 多组之间使用 ; 分隔

例子:test:test,test1.test1;test2:test2,test3.test1 针对匹配的表会发送到指定的topic上
————————————————
版权声明:本文为CSDN博主「BillowX_」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_35852328/article/details/87600871

主键

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

例子1:test.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
例子2:.…:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
例子3:.…
:pkpkpk 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
例子4: 匹配规则啥都不写,则默认发到0这个partition上
例子5:.…* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
例子6: test.test:id,.…* , 针对test的表按照id散列,其余的表按照table散列
注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)
————————————————
版权声明:本文为CSDN博主「BillowX_」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_35852328/article/details/87600871

最后实现消费kafka上canal topic上消息的代码

这里以go为例,可以写入到elasticsearch/redis/其他

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "github.com/Shopify/sarama"
    "github.com/elastic/go-elasticsearch/esapi"
    "github.com/elastic/go-elasticsearch/v6"
    "os"
)

var esClient *elasticsearch.Client

func init() {
    var err error
    config := elasticsearch.Config{}
    config.Addresses = []string{"http://127.0.0.1:9200"}
    esClient, err = elasticsearch.NewClient(config)
    checkErr(err)
}

type Msg struct {
    Data []struct {
        Id string `json:"id"`
        A  string `json:"a"`
    } `json:"data"`
    Type     string `json:"type"`
    DataBase string `json:"database"`
    Table    string `json:"table"`
}

func checkErr(err error) {
    if err != nil {
        fmt.Println(err)
        os.Exit(-1)
    }
}

type Consumer struct{}

func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
    // fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
        msg := &Msg{}
        err := json.Unmarshal(message.Value, msg)
        checkErr(err)
        if msg.DataBase == "test" && msg.Table == "tbltest" {
            if msg.Type == "INSERT" {
                for k, _ := range msg.Data {
                    // 写elasticsearch 逻辑
                    body := map[string]interface{}{
                        "id": msg.Data[k].Id,
                        "a":  msg.Data[k].A,
                    }
                    jsonBody, _ := json.Marshal(body)
                    req := esapi.IndexRequest{
                        Index:      msg.DataBase,
                        DocumentID: msg.Table + "_" + msg.Data[k].Id,
                        Body:       bytes.NewReader(jsonBody),
                    }
                    res, err := req.Do(context.Background(), esClient)
                    checkErr(err)
                    fmt.Println(res.String())
                    res.Body.Close()
                    session.MarkMessage(message, "")
                }
            }
        }
    }
    return nil
}

func main() {
    consumer := &Consumer{}

    config := sarama.NewConfig()
    config.Version = sarama.MaxVersion
    client, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "tg", config)
    checkErr(err)
    ctx := context.Background()

    client.Consume(ctx, []string{"canal"}, consumer)
}

原文地址:https://www.cnblogs.com/Me1onRind/p/11565501.html

时间: 2024-11-01 17:12:05

canal+kafka订阅Mysql binlog将数据异构到elasticsearch(或其他存储方式)的相关文章

Canal+Kafka实现MySql与Redis数据一致性

在生产环境中,经常会遇到MySql与Redis数据不一致的问题.那么如何能够保证MySql与Redis数据一致性的问题呢?话不多说,咱们直接上解决方案. 如果对Canal还不太了解的可以先去看一下官方文档:https://github.com/alibaba/canal 首先,咱们得先开启MySql的允许基于BinLog文件主从复制.因为Canal的核心原理也是相当于把自己当成MySql的一个从节点,然后去订阅主节点的BinLog日志. 开启BinLog文件配置 1. 配置MySQL的  my.

[转载] 利用flume+kafka+storm+mysql构建大数据实时系统

原文: http://mp.weixin.qq.com/s?__biz=MjM5NzAyNTE0Ng==&mid=205526269&idx=1&sn=6300502dad3e41a36f9bde8e0ba2284d&key=c468684b929d2be22eb8e183b6f92c75565b8179a9a179662ceb350cf82755209a424771bbc05810db9b7203a62c7a26&ascene=0&uin=Mjk1ODMy

利用flume+kafka+storm+mysql构建大数据实时系统

架构图 数据流向图 1.Flume 的一些核心概念: 2.数据流模型 Flume以agent为最小的独立运行单位.一个agent就是一个JVM.单agent由Source.Sink和Channel三大组件构成,如下图: Flume的数据流由事件(Event)贯穿始终.事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source,比如上图中的Web Server生成.当Source捕获事件后会进行特定的格式化,然后Source会把事件

MySql binlog恢复数据

1. 直接导入数据库 mysqlbinlog --database=testdb mysql-bin.000001 | mysql -uroot -f 2. 导出成SQL文 (1) 从binlog输出为SQL mysqlbinlog -vv --database=testdb --base64-output=decode-rows mysql-bin.000001 > 0001.sql grep "###" 0001.sql > 0001_#.sql 导出的SQL文如下格式

--------------------------------------------- 第二讲-------- 第一节------数据比较多的问题的存储方式

下面说线性结构,线性结构是数据结构中最基础最简单的一种结构类型 其中典型的是线性表 线性表:举一个列子 下面有一个一元多项式F(x)=a0+a1*x+a2*x+~~~~~~~+an*x; 请你思考并给出,你所能想到的几种储存方式. 1:   用一个数组将其系数储存起来,然后用for循环这样一个一个相加. ------弊端是   1:时间复杂度比较高,机器做了许多的无用功,例如当一元多项式为2*x+3*x^2000.这样就做了许许多多的无用功----------------------下面给出对于

Mysql Binlog 三种格式介绍及分析

一.Mysql Binlog格式介绍       Mysql binlog日志有三种格式,分别为Statement,MiXED,以及ROW! 1.Statement:每一条会修改数据的sql都会记录在binlog中. 优点:不需要记录每一行的变化,减少了binlog日志量,节约了IO,提高性能.(相比row能节约多少性能与日志量,这个取决于应用的SQL情况,正常同一条记录修改或者插入row格式所产生的日志量还小于Statement产生的日志量,但是考虑到如果带条件的update操作,以及整表删除

缓存一致性和跨服务器查询的数据异构解决方案canal

当你的项目数据量上去了之后,通常会遇到两种情况,第一种情况应是最大可能的使用cache来对抗上层的高并发,第二种情况同样也是需要使用分库 分表对抗上层的高并发...逼逼逼起来容易,做起来并不那么乐观,由此引入的问题,不见得你有好的解决方案,下面就具体分享下. 一:尽可能的使用Cache 比如在我们的千人千面系统中,会针对商品,订单等维度为某一个商家店铺自动化建立大约400个数据模型,然后买家在淘宝下订单之后,淘宝会将订单推 送过来,订单会在400个模型中兜一圈,从而推送更贴切符合该买家行为习惯的

Canal——增量同步MySQL数据到ES

1.准备 1.1.组件 JDK:1.8版本及以上: ElasticSearch:6.x版本,目前貌似不支持7.x版本:     Canal.deployer:1.1.4 Canal.Adapter:1.1.4 1.1.配置 需要先开启MySQL的 binlog 写入功能,配置 binlog-format 为 ROW 模式 找到my.cnf文件,我的目录是/etc/my.cnf,添加以下配置: log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择

利用Canal解析mysql binlog日志

一.安装包下载(canal.deployer-x.x.x.tar.gz  官方建议使用1.0.22版本) https://github.com/alibaba/canal/releases 二.解压文件 tar -zxvf canal.deployer-1.0.22.tar.gz -C /app/canal/ 三.修改canal配置文件 vim $CANAL_HOME/conf/canal.properties vim $CANAL_HOME/conf/example/instance.prop