[Spring cloud 一步步实现广告系统] 16. 增量投送到kafka

实现增量数据索引

上一节中,我们为实现增量索引的加载做了充足的准备,使用到mysql-binlog-connector-java 开源组件来实现MySQL 的binlog监听,关于binlog的相关知识,大家可以自行网络查阅。或者可以mailto:[email protected]

本节我们将根据binlog 的数据对象,来实现增量数据的处理,我们构建广告的增量数据,其实说白了就是为了在后期能把广告投放到索引服务,实现增量数据到增量索引的生成。Let‘s code.

  • 定义一个投递增量数据的接口(接收参数为我们上一节定义的binlog日志的转换对象)
/**
 * ISender for 投递增量数据 方法定义接口
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
public interface ISender {

    void sender(MysqlRowData rowData);
}
  • 创建增量索引监听器
/**
 * IncrementListener for 增量数据实现监听
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 * @since 2019/6/27
 */
@Slf4j
@Component
public class IncrementListener implements Ilistener {

    private final AggregationListener aggregationListener;

    @Autowired
    public IncrementListener(AggregationListener aggregationListener) {
        this.aggregationListener = aggregationListener;
    }

    //根据名称选择要注入的投递方式
    @Resource(name = "indexSender")
    private ISender sender;

    /**
     * 标注为 {@link PostConstruct},
     * 即表示在服务启动,Bean完成初始化之后,立刻初始化
     */
    @Override
    @PostConstruct
    public void register() {
        log.info("IncrementListener register db and table info.");
        Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this));
    }

    @Override
    public void onEvent(BinlogRowData eventData) {
        TableTemplate table = eventData.getTableTemplate();
        EventType eventType = eventData.getEventType();

        //包装成最后需要投递的数据
        MysqlRowData rowData = new MysqlRowData();
        rowData.setTableName(table.getTableName());
        rowData.setLevel(eventData.getTableTemplate().getLevel());
        //将EventType转为OperationTypeEnum
        OperationTypeEnum operationType = OperationTypeEnum.convert(eventType);
        rowData.setOperationTypeEnum(operationType);

        //获取模版中该操作对应的字段列表
        List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType);
        if (null == fieldList) {
            log.warn("{} not support for {}.", operationType, table.getTableName());
            return;
        }

        for (Map<String, String> afterMap : eventData.getAfter()) {
            Map<String, String> _afterMap = new HashMap<>();
            for (Map.Entry<String, String> entry : afterMap.entrySet()) {
                String colName = entry.getKey();
                String colValue = entry.getValue();

                _afterMap.put(colName, colValue);
            }

            rowData.getFieldValueMap().add(_afterMap);
        }
        sender.sender(rowData);
    }
}
开启binlog监听
  • 首先来配置监听binlog的数据库连接信息
adconf:
  mysql:
    host: 127.0.0.1
    port: 3306
    username: root
    password: 12345678
    binlogName: ""
    position: -1 # 从当前位置开始监听

编写配置类:

/**
 * BinlogConfig for 定义监听Binlog的配置信息
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
@Component
@ConfigurationProperties(prefix = "adconf.mysql")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinlogConfig {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String binlogName;
    private Long position;
}

在我们实现 监听binlog那节,我们实现了一个自定义client CustomBinlogClient,需要实现binlog的监听,这个监听的客户端就必须是一个独立运行的线程,并且要在程序启动的时候进行监听,我们来实现运行当前client的方式,这里我们会使用到一个新的Runnerorg.springframework.boot.CommandLineRunner,let‘s code.

@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {

    @Autowired
    private CustomBinlogClient binlogClient;

    @Override
    public void run(String... args) throws Exception {
        log.info("BinlogRunner is running...");
        binlogClient.connect();
    }
}
增量数据投递

在binlog监听的过程中,我们看到针对于int, String 这类数据字段,mysql的记录是没有问题的,但是针对于时间类型,它被格式化成了字符串类型:Fri Jun 21 15:07:53 CST 2019

--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
[10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
    {before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}

对于这个时间格式,我们需要关注2点信息:

  • CST,这个时间格式会比我们的时间+ 8h(中国标准时间 China Standard Time UT+8:00)
  • 需要对这个日期进行解释处理

当然,我们也可以通过设置mysql的日期格式来改变该行为,在此,我们通过编码来解析该时间格式:

  /**
   * Thu Jun 27 08:00:00 CST 2019
   */
  public static Date parseBinlogString2Date(String dateString) {
      try {
          DateFormat dateFormat = new SimpleDateFormat(
                  "EEE MMM dd HH:mm:ss zzz yyyy",
                  Locale.US
          );
          return DateUtils.addHours(dateFormat.parse(dateString), -8);

      } catch (ParseException ex) {
          log.error("parseString2Date error:{}", dateString);
          return null;
      }
  }

因为我们在定义索引的时候,是根据表之间的层级关系(Level)来设定的,根据代码规范,不允许出现Magic Number, 因此我们定义一个数据层级枚举,来表达数据层级。

/**
 * AdDataLevel for 广告数据层级
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
@Getter
public enum AdDataLevel {

    LEVEL2("2", "level 2"),
    LEVEL3("3", "level 3"),
    LEVEL4("4", "level 4");

    private String level;
    private String desc;

    AdDataLevel(String level, String desc) {
        this.level = level;
        this.desc = desc;
    }
}
实现数据投递

因为增量数据可以投递到不同的位置以及用途,我们之前实现了一个投递接口com.sxzhongf.ad.sender.ISender,接下来我们实现一个投递类:

@Slf4j
@Component("indexSender")
public class IndexSender implements ISender {

    /**
     * 根据广告级别,投递Binlog数据
     */
    @Override
    public void sender(MysqlRowData rowData) {
        if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
            Level2RowData(rowData);
        } else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
            Level3RowData(rowData);
        } else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
            Level4RowData(rowData);
        } else {
            log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));
        }
    }

    private void Level2RowData(MysqlRowData rowData) {

        if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
            List<AdPlanTable> planTables = new ArrayList<>();

            for (Map<String, String> fieldValueMap : rowData.getFieldValueMap()) {
                AdPlanTable planTable = new AdPlanTable();
                //Map的第二种循环方式
                fieldValueMap.forEach((k, v) -> {
                    switch (k) {
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
                            planTable.setPlanId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
                            planTable.setUserId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
                            planTable.setPlanStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
                            planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
                            planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                    }
                });
                planTables.add(planTable);
            }

            //投递推广计划
            planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
        } else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
            List<AdCreativeTable> creativeTables = new LinkedList<>();

            rowData.getFieldValueMap().forEach(afterMap -> {
                AdCreativeTable creativeTable = new AdCreativeTable();
                afterMap.forEach((k, v) -> {
                    switch (k) {
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
                            creativeTable.setAdId(Long.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
                            creativeTable.setType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
                            creativeTable.setMaterialType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
                            creativeTable.setHeight(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
                            creativeTable.setWidth(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
                            creativeTable.setAuditStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
                            creativeTable.setAdUrl(v);
                            break;
                    }
                });
                creativeTables.add(creativeTable);
            });

            //投递广告创意
            creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
        }
    }

    private void Level3RowData(MysqlRowData rowData) {
       ...
    }

    /**
     * 处理4级广告
     */
    private void Level4RowData(MysqlRowData rowData) {
        ...
    }
}
投放增量数据到MQ(kafka)

为了我们的数据投放更加灵活,方便数据统计,分析等系统的需求,我们来实现一个投放到消息中的接口,其他服务可以订阅当前MQ 的TOPIC来实现数据订阅。

配置文件中配置TOPIC
adconf:
  kafka:
    topic: ad-search-mysql-data

--------------------------------------
/**
 * KafkaSender for 投递Binlog增量数据到kafka消息队列
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 * @since 2019/7/1
 */
@Component(value = "kafkaSender")
public class KafkaSender implements ISender {

    @Value("${adconf.kafka.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 发送数据到kafka队列
     */
    @Override
    public void sender(MysqlRowData rowData) {
        kafkaTemplate.send(
                topic, JSON.toJSONString(rowData)
        );
    }

    /**
     * 测试消费kafka消息
     */
    @KafkaListener(topics = {"ad-search-mysql-data"}, groupId = "ad-search")
    public void processMysqlRowData(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMsg = Optional.ofNullable(record.value());
        if (kafkaMsg.isPresent()) {
            Object message = kafkaMsg.get();
            MysqlRowData rowData = JSON.parseObject(
                    message.toString(),
                    MysqlRowData.class
            );
            System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));
            //sender.sender();
        }

    }
}

原文地址:https://blog.51cto.com/1917331/2428435

时间: 2024-10-08 22:34:01

[Spring cloud 一步步实现广告系统] 16. 增量投送到kafka的相关文章

[Spring cloud 一步步实现广告系统] 11. 使用Feign实现微服务调用

上一节我们使用了Ribbon(基于Http/Tcp)进行微服务的调用,Ribbon的调用比较简单,通过Ribbon组件对请求的服务进行拦截,通过Eureka Server 获取到服务实例的IP:Port,然后再去调用API.本节课我们使用更简单的方式来实现,使用声明式的Web服务客户端Feign,我们只需要使用Feign来声明接口,利用注解来进行配置就可以使用了,是不是很简单?实际工作中,我们也只会用到Feign来进行服务之间的调用(大多数).接下来,我们来实例操作一把. 为了代码的重用性,我们

[Spring cloud 一步步实现广告系统] 配置项目结构 &amp; 实现Eureka服务

父项目管理 首先,我们在创建投放系统之前,先看一下我们的工程结构: mscx-ad-sponsor就是我们的广告投放系统.如上结构,我们需要首先创建一个Parent Project mscx-ad 来编写父项目的pom,来管理我们的统一依赖信息. <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xm

[Spring cloud 一步步实现广告系统] 业务架构分析

什么是广告系统? 主要包含: 广告主投放广告的<广告投放系统> 媒体方(广告展示媒介-<地铁广告屏幕>)检索广告用的<广告检索系统> 广告计费系统(按次,曝光量等等) 报表系统 Etc. 使用技能栈 JDK1.8 MySQL 8+ Maven 3+ Spring cloud Greenwich.SR2 Eureka Zuul / gateway Feign ... Spring boot 2.1.5 Kafka 2.2.0 MySQL Binlog 项目结构 项目架构

[Spring cloud 一步步实现广告系统] 7. 中期总结回顾

在前面的过程中,我们创建了4个project: 服务发现 我们使用Eureka 作为服务发现组件,学习了Eureka Server,Eureka Client的使用. Eureka Server 加依赖 <dependency> <groupId>org.springframework.cloud</groupId> <!--<artifactId>spring-cloud-netflix-eureka-server</artifactId>

[Spring cloud 一步步实现广告系统] 21. 系统错误汇总

广告系统学习过程中问题答疑 博客园 Eureka集群启动报错 Answer 因为Eureka在集群启动过程中,会连接集群中其他的机器进行数据同步,在这个过程中,如果别的服务还没有启动完成,就会出现Connection refused: connecterror,当其他节点启动完成之后,报错就会消失. AdSearch 服务启动报错 2019-08-16 10:27:57.038 ERROR 73180 --- [ main] o.s.boot.SpringApplication : Applic

[Spring cloud 一步步实现广告系统] 22. 广告系统回顾总结

到目前为止,我们整个初级广告检索系统就初步开发完成了,我们来整体回顾一下我们的广告系统. 整个广告系统编码结构如下: 1.mscx-ad 父模块 主要是为了方便我们项目的统一管理 2.mscx-ad-db 这个模块主要有2个作用,本身只应该作为数据库脚本管理package来使用,但是我们在生成索引文件的过程中,为了方便,我就直接将导出全量索引的json文件生成也写在了该项目中. 主要目的还是通过flyway进行数据库脚本的管理. 3.mscx-ad-common 这个主要是一些通用工具类的存放

[Spring cloud 一步步实现广告系统] 12. 广告索引介绍

索引设计介绍 在我们广告系统中,为了我们能更快的拿到我们想要的广告数据,我们需要对广告数据添加类似于数据库index一样的索引结构,分两大类:正向索引和倒排索引. 正向索引 通过唯一键/主键生成与对象的映射关系. 比如,我们从数据库中查询数据的时候,根据数据主键ID查询当前记录,其实就是一个正向索引的过程. 根据这个描述,很明显,我们的正向索引适用于推广计划,推广单元 和 创意这几张表的数据上,因为广告检索的请求信息,不可能是请求具体的计划或推广单元,它的检索请求一定是限制条件. 倒排索引 也叫

[Spring cloud 一步步实现广告系统] 6. Service实现&amp;Zuul配置&amp;Test

DAO层设计实现 这里我们使用Spring DATA JPA来实现数据库操作,当然大家也可以使用Mybatis,都是一样的,我们依然以用户表操作为例: /** * AdUserRepository for 用户数据库操作接口 * 继承自JpaRepository<AdUser, Long>,第一个参数AdUser代表当前要操作的实体类的class定义,第二个参数Long表示该类的主键类型 * * @author <a href="mailto:[email protected]

[Spring cloud 一步步实现广告系统] 15. 使用开源组件监听Binlog 实现增量索引准备

MySQL Binlog简介 什么是binlog? 一个二进制日志,用来记录对数据发生或潜在发生更改的SQL语句,并以而进行的形式保存在磁盘中. binlog 的作用? 最主要有3个用途: 数据复制(主从同步) Mysql 的Master-Slave协议,让Slave可以通过监听binlog实现数据复制,达到数据一致性目的 数据恢复 通过mysqlbinlog工具恢复数据 增量备份 Binlog 变量 log_bin (Binlog 开关,使用show variables like 'log_b