Kafka Spring Example

After sending with a timer, the results are as follows.


```shell
# Install Kafka
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
tar xf kafka_2.10-0.10.0.0.tgz
ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
# ownership was first given to hdfs:hadoop, then changed to root:root
chown -R hdfs:hadoop /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
chown -R root:root /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka

# Create the /kafka chroot in ZooKeeper (run the create inside the zkCli shell)
/usr/local/zookeeper/bin/zkCli.sh
create /kafka ""

# Configure the broker (each broker needs a unique broker.id)
vim /usr/local/kafka/config/server.properties
#   broker.id=0
#   zookeeper.connect=dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka

# Copy the distribution and config to the other nodes
scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev05.aoiplus.openpf:/usr/local/
scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev06.aoiplus.openpf:/usr/local/
scp -r /usr/local/kafka/config/server.properties root@dev05.aoiplus.openpf:/usr/local/kafka/config/server.properties
scp -r /usr/local/kafka/config/server.properties root@dev06.aoiplus.openpf:/usr/local/kafka/config/server.properties

# Start the broker on master and slaves
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

# Create and inspect the topic
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --replication-factor 3 --partitions 5 --topic baoy-topic
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --topic baoy-topic

# Console producer and consumer for a smoke test
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092 --topic baoy-topic
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper dev10.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181/kafka --from-beginning --topic baoy-topic
```
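Once the console producer and consumer work, the same cluster can also be reached from Java with the 0.10 client. The sketch below only builds the new-producer configuration (so it runs without a broker); note that the 0.10 keys (`bootstrap.servers`, `acks`, `retries`) are my mapping of the older `request.required.acks`-style keys used in the Spring XML further down, and the host names are just the cluster nodes from the steps above.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Build the settings equivalent to what the console producer used;
    // host names are the cluster nodes from the installation steps.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092");
        props.put("acks", "1");    // maps to request.required.acks=1 in the XML
        props.put("retries", "5"); // maps to message.send.max.retries=5
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties p = producerProps();
        System.out.println(p.getProperty("acks")); // prints "1"
    }
}
```

With the `kafka-clients` 0.10 jar on the classpath, these properties can be passed straight to `new KafkaProducer<String, String>(props)`.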

After installation, test with the console producer and consumer, then verify that Spring receives the messages (screenshots of the producer, consumer, and Spring output omitted).

The code:

applicationContext-kafka-productor.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
        http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task
        http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- common config -->
    <bean id="stringSerializer" class="org.apache.kafka.common.serialization.StringSerializer" />
    <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
        <constructor-arg value="java.lang.String" />
    </bean>
    <bean id="producerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
                <prop key="message.send.max.retries">5</prop>
                <prop key="serializer.class">kafka.serializer.StringEncoder</prop>
                <prop key="request.required.acks">1</prop>
            </props>
        </property>
    </bean>

    <!-- topic test config -->
    <int:channel id="pChannel">
        <int:queue />
    </int:channel>

    <int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapterProductor"
        kafka-producer-context-ref="producerContext"
        auto-startup="true"
        channel="pChannel"
        order="3">
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="1" task-executor="taskProductorExecutor" />
    </int-kafka:outbound-channel-adapter>

    <task:executor id="taskProductorExecutor" pool-size="5" keep-alive="120" queue-capacity="500" />

    <int-kafka:producer-context id="producerContext" producer-properties="producerProperties">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration
                broker-list="172.23.27.120:9092,172.23.27.115:9092,172.23.27.116:9092"
                key-serializer="stringSerializer"
                value-class-type="java.lang.String"
                value-serializer="stringSerializer"
                topic="baoy-topic" />
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
</beans>
```
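Note that `pChannel` is a queue channel: `send()` only enqueues, and the `<int:poller fixed-delay="1000">` drains the queue once a second on `taskProductorExecutor` and hands the batch to the outbound adapter. That hand-off can be sketched in plain Java (a simplified model of the pattern, not Spring Integration itself):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueChannelSketch {
    // Stand-in for <int:channel id="pChannel"><int:queue/></int:channel>:
    // producers enqueue, the poller thread drains on a fixed delay.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> delivered = new ArrayList<>();

    void send(String payload) {   // what messageChannel.send(...) amounts to
        queue.offer(payload);
    }

    void pollOnce() {             // what one 1000 ms poller tick does
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        delivered.addAll(batch);  // the real adapter would publish to Kafka here
    }

    List<String> delivered() { return delivered; }

    public static void main(String[] args) {
        QueueChannelSketch ch = new QueueChannelSketch();
        ch.send("msg-1");
        ch.send("msg-2");
        ch.pollOnce();
        System.out.println(ch.delivered()); // [msg-1, msg-2]
    }
}
```

This is why a caller of `sendInfo` returns immediately even though the message reaches Kafka up to a second later.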

applicationContext-kafka-consumer.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
        http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task
        http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- topic test config -->
    <int:channel id="cChannel">
        <int:dispatcher task-executor="kafkaMessageExecutor" />
    </int:channel>

    <!-- ZooKeeper configuration; multiple servers can be listed -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka"
        zk-connection-timeout="6000"
        zk-session-timeout="6000" zk-sync-time="2000" />

    <!-- channel adapter; auto-startup="true" is required, otherwise no data is received -->
    <int-kafka:inbound-channel-adapter
        id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
        auto-startup="true" channel="cChannel">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>

    <task:executor id="kafkaMessageExecutor" pool-size="8" keep-alive="120" queue-capacity="500" />

    <bean id="kafkaDecoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder" />

    <bean id="consumerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10 MB -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
            </props>
        </property>
    </bean>

    <!-- bean that receives the messages -->
    <bean id="kafkaConsumerService" class="com.curiousby.baoy.cn.kafka.KafkaConsumerService" />
    <!-- specifies the receiving method -->
    <int:outbound-channel-adapter channel="cChannel" ref="kafkaConsumerService" method="process" />

    <int-kafka:consumer-context id="consumerContext"
        consumer-timeout="1000" zookeeper-connect="zookeeperConnect"
        consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                group-id="default" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder"
                max-messages="5000">
                <int-kafka:topic id="baoy-topic" streams="5" />
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
</beans>
```
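Unlike the producer's queue channel, `cChannel` uses `<int:dispatcher>` backed by `kafkaMessageExecutor` (pool-size 8): each inbound message is handed straight to the consumer service on a pool thread rather than queued for a poller. A rough plain-Java model of that hand-off (names are illustrative, not Spring Integration API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DispatcherChannelSketch {
    // Stand-in for <int:dispatcher task-executor="kafkaMessageExecutor">:
    // every message is submitted to a pool thread instead of being queued.
    private final ExecutorService pool = Executors.newFixedThreadPool(8); // pool-size="8"
    private final AtomicInteger handled = new AtomicInteger();

    void dispatch(String msg) {
        // the real dispatcher would invoke kafkaConsumerService.process(msg) here
        pool.submit(handled::incrementAndGet);
    }

    int awaitHandled() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        DispatcherChannelSketch ch = new DispatcherChannelSketch();
        ch.dispatch("m1");
        ch.dispatch("m2");
        System.out.println(ch.awaitHandled()); // 2
    }
}
```

The executor decouples the fast inbound poller (`fixed-delay="1"`) from potentially slow message processing.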

KafkaConsumerService

```java
import java.util.Map;

import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    public void process(Map<String, Map<Integer, String>> msgs) {
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            System.out.println("======================================Consumer Message received: ");
            System.out.println("=====================================Suchit Topic:" + entry.getKey());
            for (String msg : entry.getValue().values()) {
                System.out.println("================================Suchit Consumed Message: " + msg);
            }
        }
    }
}
```
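The adapter hands `process` one poll's worth of data as a nested map keyed by topic, then by partition/stream. A small self-contained example of iterating that shape, mirroring the loop above (the map contents here are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPayloadSketch {
    // Same traversal as KafkaConsumerService.process: topic -> (partition -> message).
    static int countMessages(Map<String, Map<Integer, String>> msgs) {
        int n = 0;
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            for (String msg : entry.getValue().values()) {
                n++; // process() prints entry.getKey() (topic) and msg here
            }
        }
        return n;
    }

    public static void main(String[] args) {
        Map<Integer, String> partitions = new HashMap<>();
        partitions.put(0, "hello");   // partition 0
        partitions.put(1, "world");   // partition 1
        Map<String, Map<Integer, String>> msgs = new HashMap<>();
        msgs.put("baoy-topic", partitions); // same topic as created above
        System.out.println(countMessages(msgs)); // 2
    }
}
```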

KafkaProductorService

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

@Service
public class KafkaProductorService {

    @Autowired
    @Qualifier("pChannel")
    private MessageChannel messageChannel;

    public void sendInfo(String topic, Object obj) {
        System.out.println("---Service:KafkaService------sendInfo------");
        messageChannel.send(MessageBuilder.withPayload(obj).setHeader(KafkaHeaders.TOPIC, topic).build());
    }
}
```
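The "send with a timer" mentioned at the top would typically be a Spring `@Scheduled` method calling `sendInfo`. Since that needs a running context, here is a plain-Java model of the timer loop instead; the class and method names are illustrative, and only the scheduling mechanics are shown:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimedSenderSketch {
    final List<String> sent = new CopyOnWriteArrayList<>();

    // Stand-in for kafkaProductorService.sendInfo("baoy-topic", obj)
    void sendInfo(String topic, Object obj) {
        sent.add(topic + ":" + obj);
    }

    // Fire sendInfo at a fixed rate n times, like a @Scheduled method would.
    void sendNTimes(int n, long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(n);
        timer.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {
                sendInfo("baoy-topic", "tick-" + (n - done.getCount()));
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        timer.shutdown();
    }

    public static void main(String[] args) {
        TimedSenderSketch s = new TimedSenderSketch();
        s.sendNTimes(3, 10);
        System.out.println(s.sent.size()); // 3
    }
}
```

In the real application, each tick's `sendInfo` enqueues onto `pChannel` and the outbound adapter's poller forwards the batch to Kafka.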

pom.xml

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.curiousby.baoyou.cn</groupId>
    <artifactId>SpringKafkaDEMO</artifactId>
    <packaging>war</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringKafkaDEMO Maven Webapp</name>
    <url>http://maven.apache.org</url>

    <!-- property constants -->
    <properties>
        <spring.version>4.2.5.RELEASE</spring.version>
    </properties>

    <dependencies>
        <!-- junit4 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.7</version>
            <type>jar</type>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.dbunit</groupId>
            <artifactId>dbunit</artifactId>
            <version>2.4.9</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.github.springtestdbunit</groupId>
            <artifactId>spring-test-dbunit</artifactId>
            <version>1.1.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.version}</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>1.3.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.4</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.4</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.7</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>SpringKafkaDEMO</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <dependencies>
                    <dependency>
                        <groupId>org.codehaus.plexus</groupId>
                        <artifactId>plexus-compiler-javac</artifactId>
                        <version>2.5</version>
                    </dependency>
                </dependencies>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>UTF-8</encoding>
                    <compilerArguments>
                        <verbose />
                        <bootclasspath>${java.home}/lib/rt.jar:${java.home}/lib/jce.jar</bootclasspath>
                    </compilerArguments>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```

Problems encountered:

1. The slf4j/logging versions used with Spring must be kept consistent; here I use org.slf4j 1.6.4:

```xml
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.6.4</version>
    <type>jar</type>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.6.4</version>
    <type>jar</type>
</dependency>
```
Date: 2024-12-05 14:35:08
