040 RabbitMq及数据同步02

1.Spring AMQP

(1)简介

Spring有很多不同的项目,其中就有对AMQP的支持:

Spring AMQP的页面:http://spring.io/projects/spring-amqp

注意:Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

(2)依赖和配置

添加AMQP的启动器:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml中添加RabbitMQ地址:

spring:
  rabbitmq:
    host: 127.0.0.1
    username: guest
    password: guest
    virtual-host: /

(3)监听者

在SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,只要通过简单的注解,就可以成为一个消费者。

@Component
public class Listener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "spring.test.queue", durable = "true"),
            exchange = @Exchange(
                    value = "spring.test.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"#.#"}))
    public void listen(String msg){
        System.out.println("接收到消息:" + msg);
    }
}
  • @Componet:类上的注解,注册到Spring容器
  • @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
    • bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:

      • value:这个消费者关联的队列。值是@Queue,代表一个队列
      • exchange:队列所绑定的交换机,值是@Exchange类型
      • key:队列和交换机绑定的RoutingKey

类似listen这样的方法在一个类中可以写多个,就代表多个消费者。

(4)AmqpTemplate

Spring最擅长的事情就是封装,把他人的框架进行封装和整合。

Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:

红框圈起来的是比较常用的3个方法,分别是:

  • 指定交换机、RoutingKey和消息体
  • 指定消息
  • 指定RoutingKey和消息,会向默认的交换机发送消息

(5)测试代码

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqDemo {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void testSend() throws InterruptedException {
        String msg = "hello, Spring boot amqp";
        this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
        // 等待10秒后再结束
        Thread.sleep(10000);
    }
}

运行后查看日志:

2.搜索服务、商品静态页的数据同步

(1)思路分析

<1>发送方:商品微服务

  • 什么时候发?

    当商品服务对商品进行写操作:增、删、改的时候,需要发送一条消息,通知其它服务。

  • 发送什么内容?

    对商品的增删改时其它服务可能需要新的商品数据,但是如果消息内容中包含全部商品信息,数据量太大,而且并不是每个服务都需要全部的信息。因此我们只发送商品id,其它服务可以根据id查询自己需要的信息。

<2>接收方:搜索微服务、静态页微服务

接收消息后如何处理?

  • 搜索微服务:

    • 增/改:添加新的数据到索引库
    • 删:删除索引库数据
  • 静态页微服务:
    • 增/改:创建新的静态页
    • 删:删除原来的静态页

(2)商品服务发送消息

我们先在商品微服务leyou-item-service中实现发送消息

<1>引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<2>配置文件

我们在application.yml中添加一些有关RabbitMQ的配置:

spring:
  rabbitmq:
      host: 127.0.0.1
      username: guest
      password: guest
      virtual-host: /
      template:
        exchange: leyou.item.exchange
      publisher-confirms: true
  • template:有关AmqpTemplate的配置

    • exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
  • publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

<3>改造GoodsService

在GoodsService中封装一个发送消息到mq的方法:(需要注入AmqpTemplate模板)


@Autowiredprivate AmqpTemplate amqpTemplate;
/**
     * 利用rabbitmq发送消息
     * @param id
     * @param type
     */
    private void sendMessage(Long id, String type){
        // 发送消息
        try {
            this.amqpTemplate.convertAndSend("item." + type, id);
        } catch (Exception e) {
            //logger.error("{}商品消息发送异常,商品id:{}", type, id, e);
        }
    }

这里没有指定交换机,因此默认发送到了配置中的:leyou.item.exchange

注意:这里要把所有异常都try起来,不能让消息的发送影响到正常的业务逻辑

然后在新增的时候调用:

/**
     * 新增商品
     * @param spuBo
     */
    @Override
    @Transactional  //添加事务
    public void saveGoods(SpuBo spuBo) {
        // 01 新增spu
        // 设置默认字段
        spuBo.setId(null);
        spuBo.setSaleable(true);  //设置是否可售
        spuBo.setValid(true);
        spuBo.setCreateTime(new Date());  //设置创建时间
        spuBo.setLastUpdateTime(spuBo.getCreateTime()); //设置更新时间
        this.spuMapper.insertSelective(spuBo);

        // 02 新增spuDetail
        SpuDetail spuDetail = spuBo.getSpuDetail();
        spuDetail.setSpuId(spuBo.getId());
        this.spuDetailMapper.insertSelective(spuDetail);

        saveSkuAndStock(spuBo);

        //发送rabbitmq消息
        sendMessage(spuBo.getId(),"insert");
    }

修改的时候调用:

/**
     * 更新商品
     * @param spu
     */
    @Override
    @Transactional
    public void updateGoods(SpuBo spu) {
        // 查询以前sku
        Sku sku=new Sku();
        sku.setSpuId(spu.getId());
        List<Sku> skus = this.skuMapper.select(sku);

        // 如果以前存在,则删除
        if(!CollectionUtils.isEmpty(skus)) {
            List<Long> ids = skus.stream().map(s -> s.getId()).collect(Collectors.toList());
            // 删除以前库存
            Example example = new Example(Stock.class);
            example.createCriteria().andIn("skuId", ids);
            this.stockMapper.deleteByExample(example);

            // 删除以前的sku
            Sku record = new Sku();
            record.setSpuId(spu.getId());
            this.skuMapper.delete(record);

        }
        // 新增sku和库存
        saveSkuAndStock(spu);

        // 更新spu
        spu.setLastUpdateTime(new Date());
        spu.setCreateTime(null);   //不能更新的内容,设置为null
        spu.setValid(null);
        spu.setSaleable(null);
        this.spuMapper.updateByPrimaryKeySelective(spu);

        // 更新spu详情
        this.spuDetailMapper.updateByPrimaryKeySelective(spu.getSpuDetail());

        //发送rabbitmq消息
        sendMessage(spu.getId(),"insert");

    }

(3)搜索服务接收消息

搜索服务接收到消息后要做的事情:

  • 增:添加新的数据到索引库
  • 删:删除索引库数据
  • 改:修改索引库数据

因为索引库的新增和修改方法是合二为一的,因此我们可以将这两类消息一同处理,删除另外处理。

<1>引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<2>添加配置

spring:
  rabbitmq:
      host: 127.0.0.1
      username: guest
      password: guest
      virtual-host: /

这里只是接收消息而不发送,所以不用配置template相关内容。

<3>编写监听器

package lucky.leyou.listener;

import lucky.leyou.service.SearchService;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class GoodsListener {

    @Autowired
    private SearchService searchService;

    /**
     * 处理insert和update的消息
     *
     * @param id
     * @throws Exception
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "leyou.create.index.queue", durable = "true"),
            exchange = @Exchange(
                    value = "leyou.item.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC),
            key = {"item.insert", "item.update"}))
    public void listenCreate(Long id) throws Exception {
        if (id == null) {
            return;
        }
        // 创建或更新索引
        this.searchService.createIndex(id);
    }

    /**
     * 处理delete的消息
     *
     * @param id
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "leyou.delete.index.queue", durable = "true"),
            exchange = @Exchange(
                    value = "leyou.item.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC),
            key = "item.delete"))
    public void listenDelete(Long id) {
        if (id == null) {
            return;
        }
        // 删除索引
        this.searchService.deleteIndex(id);
    }
}

<4>编写创建和删除索引方法

这里因为要创建和删除索引,我们需要在SearchService中拓展两个方法,创建和删除索引:

public void createIndex(Long id) throws IOException {

    Spu spu = this.goodsClient.querySpuById(id);
    // 构建商品
    Goods goods = this.buildGoods(spu);

    // 保存数据到索引库
    this.goodsRepository.save(goods);
}

public void deleteIndex(Long id) {
    this.goodsRepository.deleteById(id);
}

创建索引的方法可以从之前导入数据的测试类中拷贝和改造。

原文地址:https://www.cnblogs.com/luckyplj/p/11624751.html

时间: 2024-08-03 00:23:43

040 RabbitMq及数据同步02的相关文章

039 RabbitMq及数据同步01

1.RabbitMq (1)问题引出 目前我们已经完成了商品详情和搜索系统的开发.我们思考一下,是否存在问题? 商品的原始数据保存在数据库中,增删改查都在数据库中完成. 搜索服务数据来源是索引库,如果数据库商品发生变化,索引库数据不能及时更新. 商品详情做了页面静态化,静态页面数据也不会随着数据库商品发生变化. 如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对.该如何解决? 这里有两种解决方案: 方案1:每当后台对商品做增删改操作,同时要修改索引库数据及静态

服务 - Sersync数据同步详细教程

Sersync数据同步 一.rsync介绍 rsync是类unix系统下的数据镜像备份工具--remote sync.一款快速增量备份工具 Remote Sync,远程同步支持本地复制,或者与其他SSH.rsync主机同步. 它的特性如下: v 可以镜像保存整个目录树和文件系统. v 可以很容易做到保持原来文件的权限.时间.软硬链接等等. v 无须特殊权限即可安装. v 快速:第一次同步时 rsync 会复制全部内容,但在下一次只传输修改过的文件.rsync 在传输数据的过程中可以实行压缩及解压

搭建中小规模集群之rsync数据同步备份

NFS重要问题 1.有关NFS客户端普通用户写NFS的问题. 1)为什么要普通用户写NFS. 2)exports加all_squash. Rsync介绍 什么是Rsync? Rsync是一款开源的.快速的.多功能的.可实现全量即增量的本地或远程数据同步备份的优秀工具.Rsync软件适用于unix.linux.windows等多种操作系统平台. Rsync简介 Rsync英文全称Remote synchronization.从软件的名称就可以看出来,Rsync具有可使本地和远程两台主机之间的数据快

solr 简单搭建 数据库数据同步(待续)

原来在别的公司负责过文档检索模块的维护(意思就是不是俺开发的啦).所以就稍微接触和研究了下文档检索. 文档检索其实是全文检索,是通过一种技术把N多文档进行一定规律的切割归类,然后创建易于搜索的索引式文件,然后搜索具有某些规律的文档时,能够通过快速定位索引,然后根据索引提供的信息精确定位到文档从而实现迅速找到文档.这个文档一般成为条目. 上家公司的时候使用的是Lucene加上Zoie实现的.lucene是apache下的开源项目,不过并不是全文检索的实现,而是一个全文检索的引擎,是一个架构,是其他

etcd安装部署及数据同步MySQL

一.etcd说明及原理 二.etcd安装部署说明 三.etcd操作说明 四.python安装etcd 五.python-etcd使用说明 六.通过脚本获取本地的信息上传到etcd 七.通过脚本将etc的数据同步到mysql 一.etcd 简介 etcd是用于共享配置和服务发现的分布式,一致的键值存储,重点是: 简单:定义明确,面向用户的API(gRPC) 安全:使用可选的客户端证书认证的自动TLS 快速:基准测试10,000写/秒 可靠:使用Raft协议来进行合理的分布式 etcd是在Go中编写

Rsync数据同步工具应用指南

1.Rsync数据同步工具应用指南 简介Rsync的特性:Rsync的工作方式:Rsync命令同步选项参数:本地主机模式示例远程RPC模式示例 简介     Rsync是一款开源的.快速的.多功能的.可实现全量及增量的本地或远程数据同步备份的优秀工具.可使本地和远程两台或多台主机之间的数据快速复制同步镜像.远程备份的功能.这个功能类似ssh自带的scp命令,但又优于scp命令的功能,scp每次都是全量拷贝,而rsync可以增量拷贝.当然,Rsync还可以在本地主机的不同分区或目录之间全量及增量的

使用Goldengate 实现Oracle for Oracle 单向数据同步

实验环境 数据源端:  host1 ip 192.168.199.163 数据目标端: host2 ip 192.168.199.104 两台机器都安装 http://lqding.blog.51cto.com/9123978/1694971 文中描述安装配置好了Goldengate . 要实现数据的同步,Oracle源端必须满足如下设置 Oracle需要运行在归档模式下 SQL> startup mount  ORACLE instance started. Total System Glob

Sql触发器调用外部程序实现数据同步

首先创建两个数据库:SyncA是数据源,SyncB是对SyncA进行同步的数据库. 在SyncA和SyncB中分别创建Source表和Target表,实际业务中,两张表的结构大多不相同.     然后创建一个类库的项目:MySync(注意项目的版本,Sql08不支持的.net 4.0及更高版本) 下面是同步程序代码: using System; using System.Data; using System.Data.Sql; using Microsoft.SqlServer.Server;

rsync+inotify-tools实时数据同步配置实战

实验环境的准备: 源服务器:10.0.0.130 目标服务器:10.0.0.139 一.在源服务器安装Rsync服务端 1.关闭SELINUX vi /etc/selinux/config #编辑防火墙配置文件 #SELINUX=enforcing #注释掉 #SELINUXTYPE=targeted #注释掉 SELINUX=disabled #增加 :wq! #保存,退出 setenforce 0  #立即生效 2.开启防火墙tcp 873端口(Rsync默认端口) vi /etc/sysc