1.Spring AMQP
(1)简介
Spring有很多不同的项目,其中就有对AMQP的支持:
Spring AMQP的页面:http://spring.io/projects/spring-amqp
注意:Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。
(2)依赖和配置
添加AMQP的启动器:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在application.yml
中添加RabbitMQ地址:
spring: rabbitmq: host: 127.0.0.1 username: guest password: guest virtual-host: /
(3)监听者
在SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,只要通过简单的注解,就可以成为一个消费者。
@Component public class Listener { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "spring.test.queue", durable = "true"), exchange = @Exchange( value = "spring.test.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC ), key = {"#.#"})) public void listen(String msg){ System.out.println("接收到消息:" + msg); } }
@Componet
:类上的注解,注册到Spring容器@RabbitListener
:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:bindings
:指定绑定关系,可以有多个。值是@QueueBinding
的数组。@QueueBinding
包含下面属性:value
:这个消费者关联的队列。值是@Queue
,代表一个队列exchange
:队列所绑定的交换机,值是@Exchange
类型key
:队列和交换机绑定的RoutingKey
类似listen这样的方法在一个类中可以写多个,就代表多个消费者。
(4)AmqpTemplate
Spring最擅长的事情就是封装,把他人的框架进行封装和整合。
Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:
红框圈起来的是比较常用的3个方法,分别是:
- 指定交换机、RoutingKey和消息体
- 指定消息
- 指定RoutingKey和消息,会向默认的交换机发送消息
(5)测试代码
@RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class MqDemo { @Autowired private AmqpTemplate amqpTemplate; @Test public void testSend() throws InterruptedException { String msg = "hello, Spring boot amqp"; this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg); // 等待10秒后再结束 Thread.sleep(10000); } }
运行后查看日志:
2.搜索服务、商品静态页的数据同步
(1)思路分析
<1>发送方:商品微服务
- 什么时候发?
当商品服务对商品进行写操作:增、删、改的时候,需要发送一条消息,通知其它服务。
- 发送什么内容?
对商品的增删改时其它服务可能需要新的商品数据,但是如果消息内容中包含全部商品信息,数据量太大,而且并不是每个服务都需要全部的信息。因此我们只发送商品id,其它服务可以根据id查询自己需要的信息。
<2>接收方:搜索微服务、静态页微服务
接收消息后如何处理?
- 搜索微服务:
- 增/改:添加新的数据到索引库
- 删:删除索引库数据
- 静态页微服务:
- 增/改:创建新的静态页
- 删:删除原来的静态页
(2)商品服务发送消息
我们先在商品微服务leyou-item-service
中实现发送消息
<1>引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
<2>配置文件
我们在application.yml中添加一些有关RabbitMQ的配置:
spring: rabbitmq: host: 127.0.0.1 username: guest password: guest virtual-host: / template: exchange: leyou.item.exchange publisher-confirms: true
- template:有关
AmqpTemplate
的配置- exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
- publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
<3>改造GoodsService
在GoodsService中封装一个发送消息到mq的方法:(需要注入AmqpTemplate模板)
@Autowiredprivate AmqpTemplate amqpTemplate;
/** * 利用rabbitmq发送消息 * @param id * @param type */ private void sendMessage(Long id, String type){ // 发送消息 try { this.amqpTemplate.convertAndSend("item." + type, id); } catch (Exception e) { //logger.error("{}商品消息发送异常,商品id:{}", type, id, e); } }
这里没有指定交换机,因此默认发送到了配置中的:leyou.item.exchange
注意:这里要把所有异常都try起来,不能让消息的发送影响到正常的业务逻辑
然后在新增的时候调用:
/** * 新增商品 * @param spuBo */ @Override @Transactional //添加事务 public void saveGoods(SpuBo spuBo) { // 01 新增spu // 设置默认字段 spuBo.setId(null); spuBo.setSaleable(true); //设置是否可售 spuBo.setValid(true); spuBo.setCreateTime(new Date()); //设置创建时间 spuBo.setLastUpdateTime(spuBo.getCreateTime()); //设置更新时间 this.spuMapper.insertSelective(spuBo); // 02 新增spuDetail SpuDetail spuDetail = spuBo.getSpuDetail(); spuDetail.setSpuId(spuBo.getId()); this.spuDetailMapper.insertSelective(spuDetail); saveSkuAndStock(spuBo); //发送rabbitmq消息 sendMessage(spuBo.getId(),"insert"); }
修改的时候调用:
/** * 更新商品 * @param spu */ @Override @Transactional public void updateGoods(SpuBo spu) { // 查询以前sku Sku sku=new Sku(); sku.setSpuId(spu.getId()); List<Sku> skus = this.skuMapper.select(sku); // 如果以前存在,则删除 if(!CollectionUtils.isEmpty(skus)) { List<Long> ids = skus.stream().map(s -> s.getId()).collect(Collectors.toList()); // 删除以前库存 Example example = new Example(Stock.class); example.createCriteria().andIn("skuId", ids); this.stockMapper.deleteByExample(example); // 删除以前的sku Sku record = new Sku(); record.setSpuId(spu.getId()); this.skuMapper.delete(record); } // 新增sku和库存 saveSkuAndStock(spu); // 更新spu spu.setLastUpdateTime(new Date()); spu.setCreateTime(null); //不能更新的内容,设置为null spu.setValid(null); spu.setSaleable(null); this.spuMapper.updateByPrimaryKeySelective(spu); // 更新spu详情 this.spuDetailMapper.updateByPrimaryKeySelective(spu.getSpuDetail()); //发送rabbitmq消息 sendMessage(spu.getId(),"insert"); }
(3)搜索服务接收消息
搜索服务接收到消息后要做的事情:
- 增:添加新的数据到索引库
- 删:删除索引库数据
- 改:修改索引库数据
因为索引库的新增和修改方法是合二为一的,因此我们可以将这两类消息一同处理,删除另外处理。
<1>引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
<2>添加配置
spring: rabbitmq: host: 127.0.0.1 username: guest password: guest virtual-host: /
这里只是接收消息而不发送,所以不用配置template相关内容。
<3>编写监听器
package lucky.leyou.listener; import lucky.leyou.service.SearchService; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class GoodsListener { @Autowired private SearchService searchService; /** * 处理insert和update的消息 * * @param id * @throws Exception */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "leyou.create.index.queue", durable = "true"), exchange = @Exchange( value = "leyou.item.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC), key = {"item.insert", "item.update"})) public void listenCreate(Long id) throws Exception { if (id == null) { return; } // 创建或更新索引 this.searchService.createIndex(id); } /** * 处理delete的消息 * * @param id */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "leyou.delete.index.queue", durable = "true"), exchange = @Exchange( value = "leyou.item.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC), key = "item.delete")) public void listenDelete(Long id) { if (id == null) { return; } // 删除索引 this.searchService.deleteIndex(id); } }
<4>编写创建和删除索引方法
这里因为要创建和删除索引,我们需要在SearchService中拓展两个方法,创建和删除索引:
public void createIndex(Long id) throws IOException { Spu spu = this.goodsClient.querySpuById(id); // 构建商品 Goods goods = this.buildGoods(spu); // 保存数据到索引库 this.goodsRepository.save(goods); } public void deleteIndex(Long id) { this.goodsRepository.deleteById(id); }
创建索引的方法可以从之前导入数据的测试类中拷贝和改造。
原文地址:https://www.cnblogs.com/luckyplj/p/11624751.html