Canal+Kafka实现MySql与Redis数据一致性

  在生产环境中,经常会遇到MySql与Redis数据不一致的问题。那么如何能够保证MySql与Redis数据一致性的问题呢?话不多说,咱们直接上解决方案。

  如果对Canal还不太了解的可以先去看一下官方文档:https://github.com/alibaba/canal

  首先,咱们得先开启MySql的允许基于BinLog文件主从复制。因为Canal的核心原理也是相当于把自己当成MySql的一个从节点,然后去订阅主节点的BinLog日志。

  开启BinLog文件配置

  1. 配置MySQL的  my.ini/my.cnf  开启允许基于binlog文件主从同步

log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

  配置该文件后,重启mysql服务器即可

  show variables like ‘log_bin‘;//查询MySql是否开启了log_bin.没有开启log_bin的值是OFF,开启之后是ON

  2. 添加cannl的账号或者直接使用自己的root账号。添加完后一定要检查mysql  user 权限为y(SELECT* from `user` where user=‘canal‘)

drop user ‘canal‘@‘%‘;
CREATE USER ‘canal‘@‘%‘ IDENTIFIED BY ‘canal‘;
grant all privileges on *.* to ‘canal‘@‘%‘ identified by ‘canal‘;
flush privileges;

  整合Kafka

  1. 由于Kafka依赖Zookeeper,先安装zookeeper

   zoo_sample.cfg  修改为 zoo.cfg

   修改 zoo.cfg 中的 dataDir=E:\zkkafka\zookeeper-3.4.14\data

   新增环境变量:

   ZOOKEEPER_HOME: E:\zkkafka\zookeeper-3.4.14  (zookeeper目录)

   Path: 在现有的值后面添加 ";%ZOOKEEPER_HOME%\bin;"

   运行zk  zkServer.cmd。

   针对闪退,可按照以下步骤进行解决(参考:https://blog.csdn.net/pangdongh/article/details/90208230):

   1 、编辑zkServer.cmd文件末尾添加pause 。这样运行出错就不会退出,会提示错误信息,方便找到原因。

   2.如果报错内容为:-Dzookeeper.log.dir=xxx"‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件的解决。则建议修改zkServer.cmd文件:

@echo off
REM Licensed to the Apache Software Foundation (ASF) under one or more
REM contributor license agreements.  See the NOTICE file distributed with
REM this work for additional information regarding copyright ownership.
REM The ASF licenses this file to You under the Apache License, Version 2.0
REM (the "License"); you may not use this file except in compliance with
REM the License.  You may obtain a copy of the License at
REM
REM     http://www.apache.org/licenses/LICENSE-2.0
REM
REM Unless required by applicable law or agreed to in writing, software
REM distributed under the License is distributed on an "AS IS" BASIS,
REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
REM See the License for the specific language governing permissions and
REM limitations under the License.

setlocal
call "%~dp0zkEnv.cmd"

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
echo on
java "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*

endlocal
pause

  2. 安装kafka

  解压 kafka_2.13-2.4.0 改名为 kafka

  修改 server.properties中的配置

  log.dirs=E:\zkkafka\kafka\logs

  启动Kafka:

  Cmd  进入到该目录:

  cd   E:\zkkafka\kafka

  .\bin\windows\kafka-server-start.bat .\config\server.properties

  如果启动报系统找不到指定的路径,进入kafka目录kafka\bin\windows\kafka-run-class.bat,将set JAVA="%JAVA_HOME%/bin/java"改为java环境安装的绝对路径

  例如:set JAVA="D:\LI\JDK\jdk1.8.0_152\bin\java"

Canal配置更改

  1.修改 example/instance.properties

  canal.mq.topic=maikt-topic

  2.修改 canal.properties

  # tcp, kafka, RocketMQ

  canal.serverMode = kafka

  canal.mq.servers = 127.0.0.1:9092

  3.启动startup.bat  查看 \logs\example example.log日志文件是否有 start successful....

SpringBoot项目整合kafka
  maven依赖
      <!-- springBoot集成kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- SpringBoot整合Web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

  application.yml

# kafka
spring:
  kafka:
    # kafka服务器地址(可以多个)
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: kafka2
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288
  redis:
    host: 1.1.1.1
#    password:
    port: 6379
    database: 10
    password: 123456

  Redis工具类

@Component
public class RedisUtils {

    /**
     * 获取我们的redis模版
     */
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    public void setString(String key, String value) {
        setString(key, value, null);
    }

    public void setString(String key, String value, Long timeOut) {
        stringRedisTemplate.opsForValue().set(key, value);
        if (timeOut != null) {
            stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);
        }
    }

    public String getString(String key) {
        return stringRedisTemplate.opsForValue().get(key);
    }

    /**
     * redis当成数据库中
     * <p>
     * 注意事项:对我们的redis的key设置一个有效期
     */

    public boolean deleteKey(String key) {
        return stringRedisTemplate.delete(key);
    }

}

  Kafka主题监听方法(往redis同步数据的代码可以根据自己的需求去完善,本代码只是做测试用)

 @KafkaListener(topics = "maikt-topic")
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("topic名称:" + consumer.topic() + ",key:" +
                consumer.key() + "," +
                "分区位置:" + consumer.partition()
                + ", 下标" + consumer.offset() + "," + consumer.value());

        String json = (String) consumer.value();
        JSONObject jsonObject = JSONObject.parseObject(json);
        String sqlType = jsonObject.getString("type");
        JSONArray data = jsonObject.getJSONArray("data");
        if(data!=null)
        {
            JSONObject userObject = data.getJSONObject(0);
            String id = userObject.getString("id");
            String database = jsonObject.getString("database");
            String table = jsonObject.getString("table");
            String key = database + "_" + table + "_" + id;
            if ("UPDATE".equals(sqlType) || "INSERT".equals(sqlType)) {
                redisUtils.setString(key, userObject.toJSONString());
                return;
            }
            if ("DELETE".equals(sqlType)) {
                redisUtils.deleteKey(key);
            }
        }

    }

第一次写文章,如果有不足的地方,欢迎各位大佬指正。  

来源于:蚂蚁课堂

原文地址:https://www.cnblogs.com/nanyuchen/p/12395113.html

时间: 2024-10-21 03:31:30

Canal+Kafka实现MySql与Redis数据一致性的相关文章

canal+kafka订阅Mysql binlog将数据异构到elasticsearch(或其他存储方式)

canal本质就是"冒充"从库,通过订阅mysql bin-log来获取数据库的更改信息. mysql配置(my.cnf) mysql需要配置my.cnf开启bin-log日志并且将bin-log日志格式设置为row, 同时为了防止bin-log日志占用过多磁盘,可以设置一下过期时间, [mysqld] log-bin=mysql-bin # 打开binlog binlog-format=ROW # ROW格式 server_id=1 # mysql Replication 需要设置

mysql到redis的复制

系统开发中时常会需要缓存来提升并发读的能力,这时可以通过mysql的UDF和hiredis来进行同步 前题:安装了mysql5.6和client ,开发环境window7 32.vs2013 1.安装redis 和 hiredis 下载地址 GitHub上的MSOpenTech/redis项目地址 具体编译安装方法可以查看 http://www.cnblogs.com/raker/p/4368741.html 2.下载安装mysql2redis https://github.com/dawnbr

Canal——增量同步MySQL数据到ES

1.准备 1.1.组件 JDK:1.8版本及以上: ElasticSearch:6.x版本,目前貌似不支持7.x版本:     Canal.deployer:1.1.4 Canal.Adapter:1.1.4 1.1.配置 需要先开启MySQL的 binlog 写入功能,配置 binlog-format 为 ROW 模式 找到my.cnf文件,我的目录是/etc/my.cnf,添加以下配置: log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择

一步完成 MySQL 向 Redis 迁移

从mysql搬一个大表到redis中,你会发现在提取.转换或是载入一行数据时,速度慢的让你难以忍受.这里我就要告诉一个让你解脱的小技巧.使用“管道输出”的方式把mysql命令行产生的内容直接传递给redis-cli,以绕过“中间件”的方式使两者在进行数据操作时达到最佳速度. 一个约八百万行数据的mysql表,原本导入到redis中需要90分钟,使用这个方法后,只需要两分钟.不管你信不信,反正我是信了. zzxworld zzxworld 翻译于 3年前 14人顶 顶 翻译的不错哦! 其它翻译版本

webmagic自定义存储(mysql、redis存储)

在很多时候,我们使用webmagic爬取网站的时候,爬取的数据希望存储在mysql.redis中.因此需要对其扩展,实行自定义PipeLine.首先我们了解一下webmagic 的四个基本组件 一. WebMagic的四个组件 1.Downloader Downloader负责从互联网上下载页面,以便后续处理.WebMagic默认使用了HttpClient作为下载工具. 2.PageProcessor PageProcessor负责解析页面,抽取有用信息,以及发现新的链接.WebMagic使用J

线上项目mysql、redis平滑迁移方案及步骤

1.清晰系统内网及公网可达,CVM配置 2.迁移完整数据,项目部署,测试网络环境. redis:复制rdb文件mysql:xtrabackup备份3.确保项目正常运行,网络正常访问.项目对外接口及账户中心访问可达.4.初始化redis,mysql.5.配置网络环境,同步mysql 1.主库创建同步账号,配置腾讯云mysql为从并可写.配置log-bin 2.主库xtrabackup备份,设置从库导入.获取同步点,启动从库(可写),校验状态.6.配置网络环境,同步redis 1.配置腾讯云redi

实现mysql和redis之间的触发数据同步——mysql 触发器+gearman+php.worker

上回一次我们已经实现了 redis 作为 mysql 的缓存服务器,但是如果更新了 mysql,redis 中仍然会有对应的 KEY,数据就不会更新,此时就会出现 mysql 和 redis 数据不一致的情 况. 详情请见        基于redis缓存数据库实现lnmp架构高速访问 所以接下来就要通过 mysql 触发器将改变的数据同步到 redis 中. 因为mysql和redis数据格式不同,不能实现直接同步,所以 将MySQL数据首先放入Gearman中,然后通过一个自己编写的PHP

linux安装和配置 mysql、redis 过程中遇到的问题记录(转)

章节目录 mysql redis linux下部署mysql和redis网上的教程很多,这里记录一下我部署.配置的过程中遇到的一些问题和解决办法. mysql ①安装完成后启动的时候报错 Starting MySQL.The server quit without updating PID file 干掉/etc/my.cnf 就好了 原因是: mysql_safe 下指定的  /var/run/mariadb 和 /var/log/mariadb 不存在 也可以选择新建这两个目录 ②安装好之后

一步完成MySQL向Redis迁移

在把一个大表从 MySQL 迁移到 Redis 时,你可能会发现,每次提取.转换.导入一条数据是让人难以忍受的慢!这里有一个技巧,你可以通过使用管道把 MySQL 的输出直接输入到 redis-cli输入端,这可以使两个数据库都能以他们的最顶级速度来运行. 使用了这个技术,我把 800 万条 MySQL 数据导入到 Redis 的时间从 90 分钟缩短到了两分钟. Mysql到Redis的数据协议 redis-cli命令行工具有一个批量插入模式,是专门为批量执行命令设计的.这第一步就是把Mysq