Kafka Consumer 启动测试类

https://github.com/MarcoGhise/SpringKafka.git

 1 package it.demo.kafka.springkafka.listener;
 2
 3 import org.springframework.beans.BeansException;
 4 import org.springframework.context.ApplicationContext;
 5 import org.springframework.context.ApplicationContextAware;
 6 import org.springframework.integration.endpoint.EventDrivenConsumer;
 7 import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
 8 import org.springframework.integration.kafka.support.ConsumerConfiguration;
 9 import org.springframework.integration.kafka.support.KafkaConsumerContext;
10
11 import com.yammer.metrics.Metrics;
12
13 public class KafkaConsumerStarter implements ApplicationContextAware
14 {
15     private ApplicationContext appContext;
16
17     private SourcePollingChannelAdapter kafkaInboundChannelAdapter;
18
19     private KafkaConsumerContext kafkaConsumerContext;
20
21     public void initIt() throws Exception
22     {
23         kafkaInboundChannelAdapter = appContext.getBean("kafka-inbound-channel-adapter", SourcePollingChannelAdapter.class);
24         kafkaInboundChannelAdapter.start();
25
26         kafkaConsumerContext = appContext.getBean("consumerContext", KafkaConsumerContext.class);
27     }
28
29     public void cleanUp() throws Exception
30     {
31         if (kafkaInboundChannelAdapter != null)
32         {
33             kafkaInboundChannelAdapter.stop();
34         }
35
36         Thread.sleep(1000);
37
38         Metrics.defaultRegistry().shutdown();
39     }
40
41     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
42     {
43         this.appContext = applicationContext;
44     }
45
46 }
时间: 2024-08-06 07:04:43

Kafka Consumer 启动测试类的相关文章

【IntellJ IDEA】idea启动测试类报错Error running 'Test1.test': Command line is too long. Shorten command line for Test1.test or also for JUnit default configuration.

idea启动测试类报错 Error running 'Test1.test': Command line is too long. Shorten command line for Test1.test or also for JUnit default configuration. 注意: 最简单的方法,就是你重新创建一个新的测试类,在里面重新写一遍测试方法,代码都可以粘贴过去. 解决方法: 1.打开本项目的.idea文件夹,找到文件夹中的workspace.xml文件 2.搜索 Proper

【原创】kafka consumer源代码分析

顾名思义,就是kafka的consumer api包. 一.ConsumerConfig.scala Kafka consumer的配置类,除了一些默认值常量及验证参数的方法之外,就是consumer的配置参数了,比如group.id, consumer.id等,详细列表见官网. 二.ConsumerIterator.scala KafkaStream的迭代器类,当stream底层的阻塞队列为空时该iterator会置于阻塞状态.这个迭代器还提供了一个shutdownCommand对象可作为一个

【分页查询测试】编写测试类

test下的包路径与main下的包路径保持一致. 测试程序使用@SpringBootTest和@RunWith(SpringRunner.class)注解, 启动测试类会从main下找springBoot启 动类,加载spring容器. @SpringBootTest是一个用于测试的注解 @RunWith(SpringRunner.class) @RunWith就是一个运行器 @RunWith(SpringJUnit4ClassRunner.class),让测试运行于Spring测试环境 原文地

Kafka设计解析(四)- Kafka Consumer设计解析

本文转发自Jason’s Blog,原文链接 http://www.jasongj.com/2015/08/09/KafkaColumn4 摘要 本文主要介绍了Kafka High Level Consumer,Consumer Group,Consumer Rebalance,Low Level Consumer实现的语义,以及适用场景.以及未来版本中对High Level Consumer的重新设计–使用Consumer Coordinator解决Split Brain和Herd等问题. H

[Big Data - Kafka] Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费(广播).因此,Kafka High Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义. Consumer Group High Level Consumer将从某个Partition读取的最后一条消息的offset存

Kafka Consumer应用与高级应用

Kafka Consumer应用与高级应用 PS:本博客仅作学习.总结.交流使用,参考以下博客&资料 1.http://kafka.apache.org/intro.html 2.https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example 3.http://www.cnblogs.com/luotianshuai/p/5206662.html 4.http://www.cnblogs.com/fxj

转:Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个 Consumer消费(单播)或被所有Consumer消费(广播).因此,Kafka High Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义. Consumer Group High Level Consumer将从某个Partition读取的最后一条消息的offset

Kafka consumer在项目中的多线程处理方式

对于KafkaConsumer而言,它不像KafkaProducer,不是线程安全的,状态是在consumer中维护的,所以实现时要注意多线程的使用,一般有2种使用方法: 1:每个Consumer有自己的线程,consumer去拉取数据,并对数据处理,这种方式比较简单,易于实现,容易顺序处理消息 2:消费者处理者方式,创建一个线程池,在consumer拉取数据后,由线程池来中的线程来处理数据,把拉取数据与处理数据解耦,但数据处理有可能破坏partition的消息顺序 从Kafka 文档中我们也可

Injection of autowired dependencies failed; autowire 自动注入失败,测试类已初始化过了Spring容器。

1 严重: StandardWrapper.Throwable 2 org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'searchController': Injection of autowired dependencies failed; nested exception is org.springframework.beans.factory.BeanCreatio