spring boot 集成kafka (多线程,消费者使用kafka的原生api实现,因为@KakfkaListener修改groupId无效)

application-test.properties

 1 #kafka
 2 kafka.consumer.zookeeper.connect=*:2181
 3 kafka.consumer.servers=*:9092
 4 kafka.consumer.enable.auto.commit=true
 5 kafka.consumer.session.timeout=6000
 6 kafka.consumer.auto.commit.interval=1000
 7 #保证每个组一个消费者消费同一条消息,若设置为earliest,那么会从头开始读partition(none)
 8 kafka.consumer.auto.offset.reset=latest
 9 kafka.consumer.concurrency=10
10
11 kafka.producer.servers=*:9092
12 kafka.producer.retries=0
13 kafka.producer.batch.size=4096
14 #//往kafka服务器提交消息间隔时间,0则立即提交不等待
15 kafka.producer.linger=1
16 kafka.producer.buffer.memory=40960

启动类

@SpringBootApplication
@EnableScheduling
public class Application {

    @Autowired
    private KafkaSender kafkaSender;

    public static void main(String[] args) {
        SpringApplication.run(Application .class, args);
    }

    //然后每隔1分钟执行一次
    @Scheduled(fixedRate = 1000 * 60)
    public void testKafka() throws Exception {
        kafkaSender.sendTest();
    }
}

生产者:

 1 @Component
 2 public class KafkaSender {
 3
 4     @Resource
 5     KafkaConsumerPool consumerPool;
 6
 7     /**
 8      *  这里需要放到程序启动完成之后执行 TODO
 9      */
10     @PostConstruct
11     void d(){
12
13         ConsumerGroup consumerThread = new ConsumerGroup("gropu-1","access_data",consumerConfig);
14         ConsumerGroup consumerThread2 = new ConsumerGroup("gropu-2","access_data", consumerConfig);
15
16         /**
17          * 各起两个消费者 ,Kafka consumer是非线程安全的 Consumer 需要一个new 的
18          */
19         consumerPool.SubmitConsumerPool(new Consumer(consumerThread));
20         consumerPool.SubmitConsumerPool(new Consumer(consumerThread));
21
22         consumerPool.SubmitConsumerPool(new Consumer(consumerThread2));
23         consumerPool.SubmitConsumerPool(new Consumer(consumerThread2));
24     }
25
26
27     @Resource
28     KafkaConsumerConfig consumerConfig;
29
30     @Autowired
31     private KafkaTemplate kafkaTemplate;
32
33     @Autowired
34     private KafkaTopics kafkaTopics;
35
36     /**
37      * 发送消息到kafka
38      *
39      */
40     public void sendTest() throws InterruptedException, IOException, KeeperException {
41
42         /**
43          * topic=‘access_data‘
44          */
45         kafkaTemplate.send("access_data",""+ System.currentTimeMillis());
46         kafkaTemplate.send("access_data",""+System.currentTimeMillis());
47         kafkaTemplate.send("access_data",""+System.currentTimeMillis());
48         kafkaTemplate.send("access_data",""+System.currentTimeMillis());
49         kafkaTemplate.send("access_data",""+System.currentTimeMillis());
50         kafkaTemplate.send("access_data",""+System.currentTimeMillis());
51     }
52
53
54 }
KafkaProducerConfig
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}
KafkaConsumerConfig
 1 @Configuration
 2 @EnableKafka
 3 public class KafkaConsumerConfig {
 4
 5     @Value("${kafka.consumer.zookeeper.connect}")
 6     public String zookeeperConnect;
 7     @Value("${kafka.consumer.servers}")
 8     public  String servers;
 9     @Value("${kafka.consumer.enable.auto.commit}")
10     public  boolean enableAutoCommit;
11     @Value("${kafka.consumer.session.timeout}")
12     public  String sessionTimeout;
13     @Value("${kafka.consumer.auto.commit.interval}")
14     public  String autoCommitInterval;
15     @Value("${kafka.consumer.auto.offset.reset}")
16     public  String autoOffsetReset;
17     @Value("${kafka.consumer.concurrency}")
18     public  int concurrency;
19
20
21     public String getZookeeperConnect() {
22         return zookeeperConnect;
23     }
24
25     public void setZookeeperConnect(String zookeeperConnect) {
26         this.zookeeperConnect = zookeeperConnect;
27     }
28
29     public String getServers() {
30         return servers;
31     }
32
33     public void setServers(String servers) {
34         this.servers = servers;
35     }
36
37     public boolean isEnableAutoCommit() {
38         return enableAutoCommit;
39     }
40
41     public void setEnableAutoCommit(boolean enableAutoCommit) {
42         this.enableAutoCommit = enableAutoCommit;
43     }
44
45     public String getSessionTimeout() {
46         return sessionTimeout;
47     }
48
49     public void setSessionTimeout(String sessionTimeout) {
50         this.sessionTimeout = sessionTimeout;
51     }
52
53     public String getAutoCommitInterval() {
54         return autoCommitInterval;
55     }
56
57     public void setAutoCommitInterval(String autoCommitInterval) {
58         this.autoCommitInterval = autoCommitInterval;
59     }
60
61     public String getAutoOffsetReset() {
62         return autoOffsetReset;
63     }
64
65     public void setAutoOffsetReset(String autoOffsetReset) {
66         this.autoOffsetReset = autoOffsetReset;
67     }
68
69     public int getConcurrency() {
70         return concurrency;
71     }
72
73     public void setConcurrency(int concurrency) {
74         this.concurrency = concurrency;
75     }
76 }
Consumer
/**
 * 实际消费者,继承了ShutdownableThread ,要多加几个消费者直接继承实现即可
 *
 * @create 2017-11-06 12:42
 * @update 2017-11-06 12:42
 **/
public class Consumer extends ShutdownableThread {

    /**
     * kafka 消费者
     */
    private  KafkaConsumer consumer;

    /**
     *  topic
     */
    private  String topic;

    /**
     *  组id
     */
    private  String groupId;

    public Consumer(ConsumerGroup consumerGroup) {
        super("",false);
        this.consumer = consumerGroup.getConsumer();
        this.topic = consumerGroup.getTopic();
        this.groupId = consumerGroup.getA_groupId();
    }

    /**
     *  * 监听主题,有消息就读取
     * 从kafka里面得到数据后,具体怎么去处理. 如果需要开启kafka处理消息的广播模式,多个监听要监听不同的group,
     * 即方法上的注解@KafkaListener里的group一定要不一样.如果多个监听里的group写的一样,就会造成只有一个监听能处理其中的消息,
     * 另外监听就不能处理消息了.也即是kafka的分布式消息处理方式.
     * 在同一个group里的监听,共同处理接收到的消息,会根据一定的算法来处理.如果不在一个组,但是监听的是同一个topic的话,就会形成广播模式
     */
    @Override
    public void doWork() {
        consumer.subscribe(Collections.singletonList(this.topic));
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("Thread: "+Thread.currentThread().getName()
                    +"Received message: (" + this.groupId + ", " + record.value() + ") at offset "
                    + record.offset()+" partition : "+records.partitions());
        }
    }
}
ConsumerGroup 设置消费组
 1 public class ConsumerGroup  {
 2
 3     /**
 4      *  日志处理
 5      */
 6     private static final Log log = LogFactory.getLog(ConsumerGroup.class);
 7
 8     /**
 9      *  topic
10      */
11     private final String topic;
12
13     /**
14      *  公共连接属性
15      */
16     private  Properties props ;
17
18     /**
19      * 消费者组
20      */
21     private final String groupId;
22
23
24     public ConsumerGroup(String groupId, String topic, KafkaConsumerConfig consumerConfig) {
25         createConsumerConfig(groupId,consumerConfig);
26         this.topic = topic;
27         this.groupId = groupId;
28     }
29
30
31     private Properties createConsumerConfig(String groupId, KafkaConsumerConfig consumerConfig) {
32         props = new Properties();
33         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,consumerConfig.servers);
34         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
35         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerConfig.enableAutoCommit);
36         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerConfig.autoCommitInterval);
37         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerConfig.sessionTimeout);
38         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
39         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
40         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerConfig.autoOffsetReset);
41         // 其他配置再配置
42         return props;
43     }
44
45     public KafkaConsumer getConsumer() {
46         return new KafkaConsumer(props);
47     }
48
49     /**
50      *  其他类获取topic
51      * @return
52      */
53     public String getTopic() {
54         return topic;
55     }
56
57     public String getA_groupId() {
58         return groupId;
59     }
60 }
 1 @Component
 2 public class KafkaConsumerPool {
 3
 4     /**
 5      * 日志处理
 6      */
 7     private static final Log log = LogFactory.getLog(KafkaConsumerPool.class);
 8
 9     /**
10      *  线程池
11      */
12     private ExecutorService executor;
13
14     /**
15      * 初始化10个线程
16      */
17     @PostConstruct
18     void init(){
19         executor = Executors.newFixedThreadPool(10);
20     }
21
22     /**
23      * 提交新的消费者
24      *
25      * @param shutdownableThread
26      */
27     public void SubmitConsumerPool(ShutdownableThread shutdownableThread) {
28         executor.execute(shutdownableThread);
29     }
30
31     /**
32      * 程序关闭,关闭线程池
33      *
34      */
35     @PreDestroy
36     void fin(){
37         shutdown();
38     }
39
40     public void shutdown() {
41         if (executor != null) executor.shutdown();
42         try {
43             if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
44                 log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
45             }
46         } catch (InterruptedException e) {
47             log.info("Interrupted during shutdown, exiting uncleanly");
48         }
49     }
50 }
时间: 2024-11-08 22:19:25

spring boot 集成kafka (多线程,消费者使用kafka的原生api实现,因为@KakfkaListener修改groupId无效)的相关文章

Kafka 入门和 Spring Boot 集成

Kafka 入门和 Spring Boot 集成 标签:博客 [TOC] 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流指的是数据流).由java 和 Scala 语言编写,最早由 LinkedIn 开发,并 2011年开源,现在由 Apache 开发维护. 应用场景 下面列举了一些kafka常见的应用场景. 消息队列 : Kafka 可以作为消息队列使用,可用于系统内异步解耦,流量削峰等场景. 应用监控:利用 Kafka 采集应用程序和服务器健康相关的指标,如应用

170711、spring boot 集成shiro

这篇文章我们来学习如何使用Spring Boot集成Apache Shiro.安全应该是互联网公司的一道生命线,几乎任何的公司都会涉及到这方面的需求.在Java领域一般有Spring Security.Apache Shiro等安全框架,但是由于Spring Security过于庞大和复杂,大多数公司会选择Apache Shiro来使用,这篇文章会先介绍一下Apache Shiro,在结合Spring Boot给出使用案例. Apache Shiro What is Apache Shiro?

Spring Boot 2.X(六):Spring Boot 集成 Redis

Redis 简介 什么是 Redis Redis 是目前使用的非常广泛的免费开源内存数据库,是一个高性能的 key-value 数据库. Redis 与其他 key-value 缓存(如 Memcached )相比有以下三个特点: 1.Redis 支持数据的持久化,它可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用. 2.Redis 不仅仅支持简单的 key-value 类型的数据,同时还提供 list,set,zset,hash 等数据结构的存储. 3.Redis 支持数据的备份

spring boot 集成 dubbo 企业完整版

一.什么是Spring Boot ? 现阶段的 Spring Boot 可谓是太火了,为什么呢?因为使用方便.配置简洁.上手快速,那么它是什么?从官网上我们可以看到,它是 Spring 开源组织下的一个子项目,主要简化了 Spring 繁重的配置,而且 Spring Boot 内嵌了各种 Servlet 容器,如:Tomcat.Jetty 等 官方网站:http://projects.spring.io/spring-boot/ GitHub源码:https://github.com/sprin

Spring Boot集成Jasypt安全框架

Jasypt安全框架提供了Spring的集成,主要是实现 PlaceholderConfigurerSupport类或者其子类. 在Sring 3.1之后,则推荐使用PropertySourcesPlaceholderConfigurer类作为属性替换配置类,这里Spring集成Jasypt则使用Jasypt对属性替换配置类的实现.EncryptablePropertySourcesPlaceholderConfigurer. 在Spring中集成比较容易,而且Jasypt官方也给出了配置Bea

spring boot 集成 hbase

spring boot 集成 hbase 会启动报错 主要因为Spring Boot内嵌了Web容器,方便对应用进行微服务化开发和部署.所以打算将HBase的业务应用作为一个单服务进行开发和发布,其他相关的子系统通过RESTful API来访问. 搭建项目环境时,需要注意的事项: 由于Spring Boot内嵌了Web容器,所以框架默认导入了依赖:tomcat-embed-core-8.5.5.jar.tomcat-embed-el-8.5.5.jar等包.而HBase的jar中包含了:serv

spring boot集成swagger2

做java Web的后端开发已经两年多了,一般都是开发完了接口,都把接口更新到wiki文档上,然后通知前端去文档上去查阅接口的详细描述, 当时经常接口会有变动,加参数或返回值夹字段,所以维护语线上一致的文档是一件非常麻烦的事情,前一段时间同事聊天说他们公司用的swagger2,这个不需要写文档,它是自动生成文档,只要会使用它提供的几个的注解就行,于是上网找了下资料,发现它于spring boot集成也非常方便.不废话直接看了代码. 首先,在maven项目的pom.xml加上他需要的依赖. <de

Spring Boot 集成MyBatis

Spring Boot 集成MyBatis 在集成MyBatis前,我们先配置一个druid数据源. Spring Boot 系列 Spring Boot 入门 Spring Boot 属性配置和使用 Spring Boot 集成MyBatis Spring Boot 静态资源处理 Spring Boot - 配置排序依赖技巧 Spring Boot - DevTools 介绍 Spring Boot 集成druid druid有非常多个配置选项,使用Spring Boot 的配置文件能够方便的

Quartz与Spring Boot集成使用

上次自己搭建Quartz已经是几年前的事了,这次项目中需要定时任务,需要支持集群部署,想到比较轻量级的定时任务框架就是Quartz,于是来一波. 版本说明 通过搜索引擎很容易找到其官网,来到Document的页面,当前版本是2.2.x. 简单的搭建操作 通过Maven引入所需的包: <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId>