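I. Create the Spring Boot project
The creation steps are not covered here. After creation, the project structure looks like this:
Add the following dependencies to the POM file: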
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.zhbf</groupId>
    <artifactId>springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Kafka dependency -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- Gson dependency -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
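Run SpringbootApplication.java. If the screen below appears, the project was created successfully:
II. Create a Kafka producer class and call it from a controller
Kafka producer class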
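import com.google.gson.GsonBuilder;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Kafka message producer
 */
@Log
@Component
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name, read from the kafka.topic.user property
    @Value("${kafka.topic.user}")
    private String topicUser;

    /**
     * Sends a user message
     *
     * @param user user information
     */
    public void sendUserMessage(User user) {
        // Serialize the user to pretty-printed JSON with a fixed date format
        GsonBuilder builder = new GsonBuilder();
        builder.setPrettyPrinting();
        builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
        String message = builder.create().toJson(user);
        kafkaTemplate.send(topicUser, message);
        log.info("\nProduced message to Kafka\n" + message);
    }
}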
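application.yml configuration file
Start ZooKeeper and the Kafka broker, then start a consumer to listen for messages
For how to start them, see the previous article.
Add a controller, which serves as the entry point for producing Kafka messages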
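import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller
 * PS: @RestController combines the @Controller and @ResponseBody annotations
 */
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    private User user;

    @Autowired
    private KafkaProducer kafkaProducer;

    @RequestMapping("/createMsg")
    public void createMsg() {
        kafkaProducer.sendUserMessage(user);
    }
}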
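Both KafkaProducer and KafkaController rely on a User class that is not listed in this article. The following is only a minimal sketch of what such a class might look like: the field names (name, age, createTime) are assumptions chosen for illustration, and in the Spring project the class would also have to be registered as a bean (for example with @Component) for the @Autowired injection to work.

```java
import java.util.Date;

/**
 * Hypothetical User class; the fields are assumptions, not taken from the article.
 * Plain getters and setters are written out so the sketch compiles without Lombok.
 */
class User {
    private String name;      // assumed field
    private int age;          // assumed field
    private Date createTime;  // assumed field; a Date shows why the producer sets a date format

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public static void main(String[] args) {
        User user = new User();
        user.setName("demo");
        user.setAge(18);
        user.setCreateTime(new Date());
        System.out.println(user.getName() + ", " + user.getAge());
    }
}
```

With a Date field like this, the "yyyy-MM-dd HH:mm:ss" pattern configured in the producer would control how createTime appears in the JSON message.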
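Start SpringbootApplication and request /kafka/createMsg from a browser to produce a message
Both the application console and the consumer window print the message produced to Kafka.
III. Create a Kafka consumer class and call it from a controller
Kafka consumer class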
import lombok.extern.java.Log;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

@Log
@Component
public class KafkaConsumerDemo {

    public void consume() {
        Properties props = new Properties();

        // Required settings
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "group-user");

        // Optional settings
        props.put("enable.auto.commit", "true");
        // Commit offsets automatically, once every second
        props.put("auto.commit.interval.ms", "1000");
        // Trailing space removed from the original value "earliest "
        props.put("auto.offset.reset", "earliest");
        props.put("client.id", "zy_client_id");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic-user topic
        consumer.subscribe(Collections.singletonList("topic-user"));

        // Note: this loop never exits, so consume() blocks the calling thread
        while (true) {
            // Poll the broker for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.printf("Consumed message: topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            });
        }
    }
}
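The consumer settings are plain string key/value pairs, so they can be assembled and checked without a running broker. The sketch below collects the same keys in one helper and trims the values that come from outside, which guards against stray whitespace such as a trailing space after earliest. ConsumerProps and build are hypothetical names introduced here for illustration; only java.util.Properties is used.

```java
import java.util.Properties;

/** Hypothetical helper (not from the article) that collects the consumer settings. */
final class ConsumerProps {

    private ConsumerProps() {
    }

    static Properties build(String bootstrapServers, String groupId, String offsetReset) {
        Properties props = new Properties();
        // Required settings, same keys as in KafkaConsumerDemo
        props.put("bootstrap.servers", bootstrapServers.trim());
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", groupId.trim());
        // Optional settings; trim() removes accidental leading or trailing whitespace
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", offsetReset.trim());
        return props;
    }

    public static void main(String[] args) {
        // The trailing space in the last argument is deliberate, to show the trimming
        Properties props = build("127.0.0.1:9092", "group-user", "earliest ");
        System.out.println("[" + props.getProperty("auto.offset.reset") + "]");
    }
}
```

The resulting Properties object could then be passed to the KafkaConsumer constructor in place of the inline props.put calls.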
Restart SpringbootApplication and request the controller from a browser to consume messages
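Because consume() polls in an endless loop, calling it directly from a controller method would keep the HTTP request thread busy and the browser would never get a response. One possible way around this, sketched below with only the JDK, is to run the loop on a dedicated worker thread and stop it with a flag. BackgroundLoopRunner, start and stop are hypothetical names, and the pollOnce callback is a stand-in for a single consumer.poll(...) call; this is not the article's own approach.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: runs a repeating poll step on its own thread. */
final class BackgroundLoopRunner {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean running = new AtomicBoolean(false);

    /** Starts the loop; returns false if it is already running. */
    boolean start(Runnable pollOnce) {
        if (!running.compareAndSet(false, true)) {
            return false;
        }
        executor.submit(() -> {
            // Repeat the poll step until stop() clears the flag
            while (running.get()) {
                pollOnce.run();
            }
        });
        return true;
    }

    /** Clears the flag and waits for the worker to finish; a stopped runner is not reusable. */
    boolean stop(long timeoutMillis) {
        running.set(false);
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger polls = new AtomicInteger();
        BackgroundLoopRunner runner = new BackgroundLoopRunner();
        // Stand-in for consumer.poll(...): just count the iterations
        runner.start(polls::incrementAndGet);
        boolean stopped = runner.stop(1000);
        System.out.println("stopped=" + stopped + ", polls=" + polls.get());
    }
}
```

In a controller, the handler would call start(...) and return immediately, while the polling continues in the background until stop(...) is called, for instance at application shutdown.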
Original article: https://www.cnblogs.com/riches/p/11720068.html
Time: 2024-10-08 00:33:22