一、Stream简介
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQ、Kafka。
二、Stream学习
2.1消息中间件
使用kafka消息中间件来学习:https://www.cnblogs.com/yangk1996/p/10841588.html
2.2、工程改造
改造之前的工程:https://www.cnblogs.com/yangk1996/p/11069610.html
- springcloud-api
@Data public class User implements Serializable { /** * ID */ private Long id; /** * 用户名称 */ private String name; }
- spring-cloud-api-client
- maven
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
- 增加消息输出
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * @Description: 用户信息输出 * @date: 2019/6/23 13:05 */ public interface UserMessage { @Output("user-message") MessageChannel output(); }
- Controller
@Autowired private UserMessage userMessage; @PostMapping("/user/save/message/stream") public boolean saveUserByRabbitMessage(@RequestBody User user){ MessageChannel messageChannel = userMessage.output(); return messageChannel.send(MessageBuilder.withPayload(user).build()); }
- application.properties
## Kafka 生产者配置 spring.kafka.BOOTSTRAP-SERVERS=192.168.100.129:9092 spring.kafka.consumer.group-id= yangk spring.kafka.consumer.clientId=spring-cloud-api-client ## Spring Cloud Stream Binding 配置 ### user-message 为输出管道名称 destination 指定 Topic spring.cloud.stream.bindings.user-message.destination = springCloud
- 启动入口增加EnableBinding
@EnableBinding(UserMessage.class)
- spring-cloud-api-provider
- Maven
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
- 消息输入
public interface UserMessage { @Input("user-message") //管道名称 SubscribableChannel input(); }
- UserMessageServiceImpl(三种消息监听实现)
@Autowired private UserMessage userMessage; @Autowired private UserService userService; @Autowired private ObjectMapper objectMapper; @ServiceActivator(inputChannel = "user-message") public void listen(String data) throws IOException { System.out.println("ServiceActivator实现"+data); saveUser(data); } @StreamListener("user-message") public void onMessage(String data) throws IOException { System.out.println(" @StreamListeners实现"+data); saveUser(data); } private void saveUser(String data) throws IOException { User user = objectMapper.readValue(data, User.class); userService.saveUser(user); } @PostConstruct public void init() { SubscribableChannel subscribableChannel = userMessage.input(); subscribableChannel.subscribe(message -> { System.out.println("SubscribableChannel实现"+message); }); }
- application.properties
#Kafka配置 spring.kafka.BOOTSTRAP-SERVERS=192.168.100.129:9092 spring.kafka.consumer.group-id=yangk spring.kafka.consumer.clientId=spring-cloud-api-client ## Spring Cloud Stream Binding 配置 ### userMessage 为输入管道名称 destination 指定 Topic spring.cloud.stream.bindings.user-message.destination = springCloud
- 启动入口激活Stream Binding
//激活 Stream Binding到UserMessage @EnableBinding(UserMessage.class)
三、验证
项目启动顺序:spring-cloud-eureka-server -> spring-cloud-api-client -> spring-cloud-api-provider
这里乱码的应该是kafka的其他属性没有转换过来,这里我也没有处理这些。
原文地址:https://www.cnblogs.com/yangk1996/p/11072712.html
时间: 2024-11-06 03:32:29