[Spring cloud 一步步实现广告系统] 14. 全量索引代码实现

上一节我们实现了索引基本操作的类以及索引缓存工具类,本小节我们开始实现加载全量索引数据,在加载全量索引数据之前,我们需要先将数据库中的表数据导出到一份文件中。Let‘s code.

1.首先定义一个常量类,用来存储导出文件存储的目录和文件名称

因为我们导出的文件需要在搜索服务中使用到,因此,我们将文件名 & 目录以及导出对象的信息编写在mscx-ad-commom项目中。

public class FileConstant {
    public static final String DATA_ROOT_DIR = "/Users/xxx/Documents/promotion/data/mysql/";

    //各个表数据的存储文件名
    public static final String AD_PLAN = "ad_plan.data";
    public static final String AD_UNIT = "ad_unit.data";
    public static final String AD_CREATIVE = "ad_creative.data";
    public static final String AD_CREATIVE_RELARION_UNIT = "ad_creative_relation_unit.data";
    public static final String AD_UNIT_HOBBY = "ad_unit_hobby.data";
    public static final String AD_UNIT_DISTRICT = "ad_unit_district.data";
    public static final String AD_UNIT_KEYWORD = "ad_unit_keyword.data";
}

2.定义索引对象导出的字段信息,依然用Ad_Plan为例。

/**
 * AdPlanTable for 需要导出的表字段信息 => 是搜索索引字段一一对应
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AdPlanTable {
    private Long planId;
    private Long userId;
    private Integer planStatus;
    private Date startDate;
    private Date endDate;
}

3.导出文件服务实现

同样,最好的实现方式就是将导出服务作为一个子工程来独立运行,我这里直接实现在了mscx-ad-db项目中

  • 定义一个空接口,为了符合我们的编码规范
/**
 * IExportDataService for 导出数据库广告索引初始化数据
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
public interface IExportDataService {
}
  • 实现service
@Slf4j
@Service
public class ExportDataServiceImpl implements IExportDataService {

    @Autowired
    private AdPlanRepository planRepository;

    /**
     * 导出 {@code AdPlan} from DB to File
     *
     * @param fileName 文件名称
     */
    public void exportAdPlanTable(String fileName) {
        List<AdPlan> planList = planRepository.findAllByPlanStatus(CommonStatus.VALID.getStatus());
        if (CollectionUtils.isEmpty(planList)) {
            return;
        }

        List<AdPlanTable> planTables = new ArrayList<>();
        planList.forEach(item -> planTables.add(
                new AdPlanTable(
                        item.getPlanId(),
                        item.getUserId(),
                        item.getPlanStatus(),
                        item.getStartDate(),
                        item.getEndDate()
                )
        ));

        //将数据写入文件
        Path path = Paths.get(fileName);
        try (BufferedWriter writer = Files.newBufferedWriter(path)) {
            for (AdPlanTable adPlanTable : planTables) {
                writer.write(JSON.toJSONString(adPlanTable));
                writer.newLine();
            }
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
            log.error("export AdPlanTable Exception!");
        }
    }
}
  • 实现Controller,提供操作入口
@Slf4j
@Controller
@RequestMapping("/export")
public class ExportDataController {
    private final ExportDataServiceImpl exportDataService;

    @Autowired
    public ExportDataController(ExportDataServiceImpl exportDataService) {
        this.exportDataService = exportDataService;
    }

    @GetMapping("/export-plan")
    public CommonResponse exportAdPlans() {

        exportDataService.exportAdPlanTable(String.format("%s%s", FileConstant.DATA_ROOT_DIR, FileConstant.AD_PLAN));
        return new CommonResponse();
    }
}
  • 结果文件内容如下,每一行都代表了一个推广计划
{"endDate":1561438800000,"planId":10,"planStatus":1,"startDate":1561438800000,"userId":10}
{"endDate":1561438800000,"planId":11,"planStatus":1,"startDate":1561438800000,"userId":10}
根据文件内容构建索引

我们在之前编写索引服务的时候,创建了一些索引需要使用的实体对象类,比如构建推广计划索引的时候,需要使用到的实体对象com.sxzhongf.ad.index.adplan.AdPlanIndexObject,可是呢,我们在上一节实现索引导出的时候,实体对象又是common 包中的com.sxzhongf.ad.common.export.table.AdPlanTable,读取出来文件中的数据只能反序列化为JSON.parseObject(p, AdPlanTable.class),我们需要将2个对象做相互映射才能创建索引信息。

1.首先我们定义一个操作类型枚举,代表我们每一次的操作类型(也需要对应到后期binlog监听的操作类型

public enum OperationTypeEnum {
    ADD,
    UPDATE,
    DELETE,
    OTHER;

    public static OperationTypeEnum convert(EventType type) {
        switch (type) {
            case EXT_WRITE_ROWS:
                return ADD;
            case EXT_UPDATE_ROWS:
                return UPDATE;
            case EXT_DELETE_ROWS:
                return DELETE;
            default:
                return OTHER;
        }
    }
}

2.因为全量索引的加载和增量索引加载的本质是一样的,全量索引其实就是一种特殊的增量索引,为了代码的可复用,我们创建统一的类来操作索引。

/**
 * AdLevelDataHandler for 通用处理索引类
 * 1. 索引之间存在层级划分,也就是相互之间拥有依赖关系的划分
 * 2. 加载全量索引其实是增量索引 "添加"的一种特殊实现
 *
 * @author <a href="mailto:[email protected]">Isaac.Zhang | 若初</a>
 */
@Slf4j
public class AdLevelDataHandler {

    /**
     * 实现广告推广计划的第二层级索引实现。
     * (第一级为用户层级,但是用户层级不参与索引,所以从level 2开始)
     * 第二层级的索引是表示 不依赖于其他索引,但是可被其他索引所依赖
     */
    public static void handleLevel2Index(AdPlanTable adPlanTable, OperationTypeEnum type) {
        // 对象转换
        AdPlanIndexObject planIndexObject = new AdPlanIndexObject(
                adPlanTable.getPlanId(),
                adPlanTable.getUserId(),
                adPlanTable.getPlanStatus(),
                adPlanTable.getStartDate(),
                adPlanTable.getEndDate()
        );

        //调用通用方法处理,使用IndexDataTableUtils#of来获取索引的实现类bean
        handleBinlogEvent(
                    // 在前一节我们实现了一个索引工具类,来获取注入的bean对象
                IndexDataTableUtils.of(AdPlanIndexAwareImpl.class),
                planIndexObject.getPlanId(),
                planIndexObject,
                type
        );
    }

    /**
     * 处理全量索引和增量索引的通用处理方式
     * K,V代表索引的键和值
     *
     * @param index 索引实现代理类父级
     * @param key   键
     * @param value 值
     * @param type  操作类型
     */
    private static <K, V> void handleBinlogEvent(IIndexAware<K, V> index, K key, V value, OperationTypeEnum type) {
        switch (type) {
            case ADD:
                index.add(key, value);
                break;
            case UPDATE:
                index.update(key, value);
                break;
            case DELETE:
                index.delete(key, value);
                break;
            default:
                break;
        }
    }
}

3.读取文件实现全量索引加载。

因为我们文件加载之前需要依赖另一个组件,也就是我们的索引工具类,需要添加上@DependsOn("indexDataTableUtils"),全量索引在系统启动的时候就需要加载,我们需要添加@PostConstruct来实现初始化加载,被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器调用一次。

@Component
@DependsOn("indexDataTableUtils")
public class IndexFileLoader {

    /**
     * 服务启动时,执行全量索引加载
     */
    @PostConstruct
    public void init() {
        //加载 推广计划
        List<String> adPlanStrings = loadExportedData(String.format("%s%s",
                FileConstant.DATA_ROOT_DIR, FileConstant.AD_PLAN
        ));
        adPlanStrings.forEach(p -> AdLevelDataHandler.handleLevel2Index(
                JSON.parseObject(p, AdPlanTable.class), OperationTypeEnum.ADD
        ));
    }

    /**
     * <h3>读取全量索引加载需要的文件</h3>
     *
     * @param fileName 文件名称
     * @return 文件行数据
     */
    private List<String> loadExportedData(String fileName) {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(fileName))) {
            return reader.lines().collect(Collectors.toList());
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage());
        }
    }
}

Tips

在实现初始化加载全量索引的过程中,一定要保证数据加载的顺序问题,因为不同的数据有可能存在着相互依赖的关联关系,一旦顺序写错,会造成程序报错问题。

原文地址:https://blog.51cto.com/1917331/2427900

时间: 2024-10-11 17:35:49

[Spring cloud 一步步实现广告系统] 14. 全量索引代码实现的相关文章

[Spring cloud 一步步实现广告系统] 11. 使用Feign实现微服务调用

上一节我们使用了Ribbon(基于Http/Tcp)进行微服务的调用,Ribbon的调用比较简单,通过Ribbon组件对请求的服务进行拦截,通过Eureka Server 获取到服务实例的IP:Port,然后再去调用API.本节课我们使用更简单的方式来实现,使用声明式的Web服务客户端Feign,我们只需要使用Feign来声明接口,利用注解来进行配置就可以使用了,是不是很简单?实际工作中,我们也只会用到Feign来进行服务之间的调用(大多数).接下来,我们来实例操作一把. 为了代码的重用性,我们

[Spring cloud 一步步实现广告系统] 22. 广告系统回顾总结

到目前为止,我们整个初级广告检索系统就初步开发完成了,我们来整体回顾一下我们的广告系统. 整个广告系统编码结构如下: 1.mscx-ad 父模块 主要是为了方便我们项目的统一管理 2.mscx-ad-db 这个模块主要有2个作用,本身只应该作为数据库脚本管理package来使用,但是我们在生成索引文件的过程中,为了方便,我就直接将导出全量索引的json文件生成也写在了该项目中. 主要目的还是通过flyway进行数据库脚本的管理. 3.mscx-ad-common 这个主要是一些通用工具类的存放

[Spring cloud 一步步实现广告系统] 12. 广告索引介绍

索引设计介绍 在我们广告系统中,为了我们能更快的拿到我们想要的广告数据,我们需要对广告数据添加类似于数据库index一样的索引结构,分两大类:正向索引和倒排索引. 正向索引 通过唯一键/主键生成与对象的映射关系. 比如,我们从数据库中查询数据的时候,根据数据主键ID查询当前记录,其实就是一个正向索引的过程. 根据这个描述,很明显,我们的正向索引适用于推广计划,推广单元 和 创意这几张表的数据上,因为广告检索的请求信息,不可能是请求具体的计划或推广单元,它的检索请求一定是限制条件. 倒排索引 也叫

[Spring cloud 一步步实现广告系统] 配置项目结构 &amp; 实现Eureka服务

父项目管理 首先,我们在创建投放系统之前,先看一下我们的工程结构: mscx-ad-sponsor就是我们的广告投放系统.如上结构,我们需要首先创建一个Parent Project mscx-ad 来编写父项目的pom,来管理我们的统一依赖信息. <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xm

[Spring cloud 一步步实现广告系统] 业务架构分析

什么是广告系统? 主要包含: 广告主投放广告的<广告投放系统> 媒体方(广告展示媒介-<地铁广告屏幕>)检索广告用的<广告检索系统> 广告计费系统(按次,曝光量等等) 报表系统 Etc. 使用技能栈 JDK1.8 MySQL 8+ Maven 3+ Spring cloud Greenwich.SR2 Eureka Zuul / gateway Feign ... Spring boot 2.1.5 Kafka 2.2.0 MySQL Binlog 项目结构 项目架构

[Spring cloud 一步步实现广告系统] 7. 中期总结回顾

在前面的过程中,我们创建了4个project: 服务发现 我们使用Eureka 作为服务发现组件,学习了Eureka Server,Eureka Client的使用. Eureka Server 加依赖 <dependency> <groupId>org.springframework.cloud</groupId> <!--<artifactId>spring-cloud-netflix-eureka-server</artifactId>

[Spring cloud 一步步实现广告系统] 21. 系统错误汇总

广告系统学习过程中问题答疑 博客园 Eureka集群启动报错 Answer 因为Eureka在集群启动过程中,会连接集群中其他的机器进行数据同步,在这个过程中,如果别的服务还没有启动完成,就会出现Connection refused: connecterror,当其他节点启动完成之后,报错就会消失. AdSearch 服务启动报错 2019-08-16 10:27:57.038 ERROR 73180 --- [ main] o.s.boot.SpringApplication : Applic

[Spring cloud 一步步实现广告系统] 6. Service实现&amp;Zuul配置&amp;Test

DAO层设计实现 这里我们使用Spring DATA JPA来实现数据库操作,当然大家也可以使用Mybatis,都是一样的,我们依然以用户表操作为例: /** * AdUserRepository for 用户数据库操作接口 * 继承自JpaRepository<AdUser, Long>,第一个参数AdUser代表当前要操作的实体类的class定义,第二个参数Long表示该类的主键类型 * * @author <a href="mailto:[email protected]

[Spring cloud 一步步实现广告系统] 3. 网关路由

Zuul(Router and Filter) WIKI: 传送门 作用 认证,鉴权(Authentication/Security) 预判(Insights) 压力测试(Stress Testing) 灰度/金丝雀测试(Canary Testing) 动态路由(Dynamic Routing) 服务迁移(Service Migration) 降低负载(Load Shedding) 静态响应处理(Static Response handling) 主动/主动交换管理(Active/Active