spring cloud集成canal

前提

win运行canal

加入canal依赖

1 <dependency>
2     <groupId>com.alibaba.otter</groupId>
3     <artifactId>canal.client</artifactId>
4     <version>1.1.3</version>
5 </dependency>

把ip、端口、监听表名做成配置文件

代码实现

  1 package com.frame.modules.dabis.archives.thread;
  2
  3 import com.alibaba.fastjson.JSONObject;
  4 import com.alibaba.otter.canal.client.CanalConnector;
  5 import com.alibaba.otter.canal.client.CanalConnectors;
  6 import com.alibaba.otter.canal.protocol.CanalEntry;
  7 import com.alibaba.otter.canal.protocol.Message;
  8 import com.frame.solr.em.SolrCode;
  9 import com.frame.utils.PropertiesLoader;
 10 import org.apache.commons.logging.Log;
 11 import org.apache.commons.logging.LogFactory;
 12
 13 import java.net.InetSocketAddress;
 14 import java.util.HashMap;
 15 import java.util.List;
 16 import java.util.Map;
 17
 18 /**
 19  * @author liwei
 20  * @date 2019/8/2 14:39
 21  * @desc Created with IntelliJ IDEA.
 22  */
 23 public class CanalThread implements Runnable {
 24
 25     Log log = LogFactory.getLog(CanalThread.class);
 26
 27     private String solrName = SolrCode.ARCHIVES.getValue();
 28
 29
 30     @Override
 31     public void run() {
 32         PropertiesLoader loader = new PropertiesLoader("solrConfig.properties");
 33         listener(loader.getProperty("canalHost"), loader.getProperty("canalPort"), loader.getProperty("canalTable"));
 34     }
 35
 36
 37     public void listener(String canalHost, String canalPort, String table) {
 38         // 创建链接
 39         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, Integer.valueOf(canalPort)), "example", "", "");
 40         int batchSize = 1000;
 41         try {
 42             // 连接
 43             connector.connect();
 44             // 监听表
 45             connector.subscribe(table);
 46             connector.rollback();
 47             // 一直循环监听
 48             while (true) {
 49                 // 获取指定数量的数据
 50                 Message message = connector.getWithoutAck(batchSize);
 51                 long batchId = message.getId();
 52                 if(-1 != batchId && 0 != message.getEntries().size()) {
 53                     printEntry(message.getEntries());
 54                 }
 55                 // 提交确认
 56                 connector.ack(batchId);
 57             }
 58         } finally {
 59             connector.disconnect();
 60         }
 61     }
 62
 63     /**
 64      * 打印具体变化
 65      * @param entrys
 66      */
 67     private void printEntry(List<CanalEntry.Entry> entrys) {
 68         for (CanalEntry.Entry entry : entrys) {
 69             if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType()) || CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
 70                 continue;
 71             }
 72
 73             CanalEntry.RowChange rowChage = null;
 74             try {
 75                 rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
 76             } catch (Exception e) {
 77                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
 78                         e);
 79             }
 80
 81             CanalEntry.EventType eventType = rowChage.getEventType();
 82             System.out.println(String.format("================> binlog[%s:%s] , 数据库:%s,表名%s , 类型: %s",
 83                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
 84                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
 85                     eventType));
 86
 87             for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
 88                 if (eventType == CanalEntry.EventType.DELETE) {
 89                     printColumn(rowData.getBeforeColumnsList());
 90                 } else if (eventType == CanalEntry.EventType.INSERT) {
 91                     printColumn(rowData.getAfterColumnsList());
 92                 } else {
 93                     System.out.println("-------修改之前");
 94                     printColumn(rowData.getBeforeColumnsList());
 95                     System.out.println("-------修改之后");
 96                     printColumn(rowData.getAfterColumnsList());
 97                 }
 98             }
 99         }
100     }
101
102     private void printColumn(List<CanalEntry.Column> columns) {
103         Map<String,Object> aaMap = new HashMap<>();
104         for (CanalEntry.Column column : columns) {
105             aaMap.put(column.getName(), column.getValue());
106         }
107         System.out.println( new JSONObject(aaMap).toJSONString());
108     }
109 }

新增

修改

删除

注意:拿到的值都是字符串,建议拿到id反查数据库,拿到对象再同步到自己的缓存。

原文地址:https://www.cnblogs.com/xiaostudy/p/11569750.html

时间: 2024-08-10 11:36:27

spring cloud集成canal的相关文章

Spring Cloud微服务分布式云架构 - spring cloud集成项目

摘要: Spring Cloud集成项目有很多,下面我们列举一下和Spring Cloud相关的优秀项目,我们的企业架构中用到了很多的优秀项目,说白了,也是站在巨人的肩膀上去整合的.在学习Spring Cloud之前大家必须了解一下相关项目,希望可以帮助到大家. Spring Cloud集成项目有很多,下面我们列举一下和Spring Cloud相关的优秀项目,我们的企业架构中用到了很多的优秀项目,说白了,也是站在巨人的肩膀上去整合的.在学习Spring Cloud之前大家必须了解一下相关项目,希

spring cloud微服务分布式云架构 - Spring Cloud集成项目简介

Spring Cloud集成项目有很多,下面我们列举一下和Spring Cloud相关的优秀项目,我们的企业架构中用到了很多的优秀项目,说白了,也是站在巨人的肩膀上去整合的.在学习Spring Cloud之前大家必须了解一下相关项目,希望可以帮助到大家. Spring Cloud Config 配置管理工具包,让你可以把配置放到远程服务器,集中化管理集群配置,目前支持本地存储.Git以及Subversion. Spring Cloud Bus ?事件.消息总线,用于在集群(例如,配置变化事件)中

(三)spring cloud微服务分布式云架构 - Spring Cloud集成项目简介

Spring Cloud集成项目有很多,下面我们列举一下和Spring Cloud相关的优秀项目,我们的企业架构中用到了很多的优秀项目,说白了,也是站在巨人的肩膀上去整合的.在学习Spring Cloud之前大家必须了解一下相关项目,希望可以帮助到大家. Spring Cloud Config 配置管理工具包,让你可以把配置放到远程服务器,集中化管理集群配置,目前支持本地存储.Git以及Subversion. Spring Cloud Bus ?事件.消息总线,用于在集群(例如,配置变化事件)中

Spring Cloud集成相关优质项目推荐

Spring Cloud Config 配置管理工具包,让你可以把配置放到远程服务器,集中化管理集群配置,目前支持本地存储.Git以及Subversion. Spring Cloud Bus 事件.消息总线,用于在集群(例如,配置变化事件)中传播状态变化,可与Spring Cloud Config联合实现热部署. Eureka 云端服务发现,一个基于 REST 的服务,用于定位服务,以实现云端中间层服务发现和故障转移. Hystrix 熔断器,容错管理工具,旨在通过熔断机制控制服务和第三方库的节

Spring Cloud 集成 RabbitMQ

同步 or 异步 前言:我们现在有一个用微服务架构模式开发的系统,系统里有一个商品服务和订单服务,且它们都是同步通信的. 目前我们商品服务和订单服务之间的通信方式是同步的,当业务扩大之后,如果还继续使用同步的方式进行服务之间的通信,会使得服务之间的耦合增大.例如我们登录操作可能需要同步调用用户服务.积分服务.短信服务等等,而服务之间可能又依赖别的服务,那么这样一个登录过程就会耗费不少的时间,以致用户的体验降低. 那我们在微服务架构下要如何对服务之间的通信进行解耦呢?这就需要使用到消息中间件了,消

spring cloud微服务分布式云架构集成项目

Spring Cloud集成项目有很多,下面我们列举一下和Spring Cloud相关的优秀项目,我们的企业架构中用到了很多的优秀项目,说白了,也是站在巨人的肩膀上去整合的.在学习Spring Cloud之前大家必须了解一下相关项目,希望可以帮助到大家. Spring Cloud Config 配置管理工具包,让你可以把配置放到远程服务器,集中化管理集群配置,目前支持本地存储.Git以及Subversion. Spring Cloud Bus 事件.消息总线,用于在集群(例如,配置变化事件)中传

Spring Cloud微服务分布式云架构-集成项目简介

Spring Cloud集成项目有很多,下面我们列举一下和Spring Cloud相关的优秀项目,我们的企业架构中用到了很多的优秀项目,说白了,也是站在巨人的肩膀上去整合的.在学习Spring Cloud之前大家必须了解一下相关项目,希望可以帮助到大家. Spring Cloud Config 配置管理工具包,让你可以把配置放到远程服务器,集中化管理集群配置,目前支持本地存储.Git以及Subversion. Spring Cloud Bus ?事件.消息总线,用于在集群(例如,配置变化事件)中

Spring Cloud微服务分布式云架构-集成项目

Spring Cloud集成项目有很多,下面我们列举一下和Spring Cloud相关的优秀项目,我们的企业架构中用到了很多的优秀项目,说白了,也是站在巨人的肩膀上去整合的.在学习Spring Cloud之前大家必须了解一下相关项目,希望可以帮助到大家. Spring Cloud Config 配置管理工具包,让你可以把配置放到远程服务器,集中化管理集群配置,目前支持本地存储.Git以及Subversion. Spring Cloud Bus ?事件.消息总线,用于在集群(例如,配置变化事件)中

Spring Cloud官方文档中文版-声明式Rest客户端:Feign

官方文档地址为:http://cloud.spring.io/spring-cloud-static/Dalston.SR2/#spring-cloud-feign 文中例子我做了一些测试在:http://git.oschina.net/dreamingodd/spring-cloud-preparation Declarative REST Client: Feign 声明式Rest客户端:Feign Feign is a declarative web service client. It