spring+springmvc+kafka分布式消息中间件集成方案

Honghu的消息服务平台已经抛弃了之前的ActiveMQ,改用高吞吐量比较大的Kafka分布式消息中间件方案:

kafka消息平台使用spring+kafka的集成方案,详情如下:

1. 使用最高版本2.1.0.RELEASE集成jar包:spring-integration-kafka

2. Zookeeper、Kafka分布式集群使用init.properties配置化方案。

Java代码

kafka.servers=127.0.0.1:9092

kafka.topic=xxxooo

[java]view plaincopyprint?

kafka.servers=127.0.0.1:9092

kafka.topic=xxxooo

kafka.servers=127.0.0.1:9092

kafka.topic=xxxooo

3. 使用消息生产者spring-context-producer配置化方案。

Java代码

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context

http://www.springframework.org/schema/context/spring-context.xsd">

value="org.apache.kafka.common.serialization.IntegerSerializer"/>

value="org.apache.kafka.common.serialization.StringSerializer"/>

class="org.springframework.kafka.core.DefaultKafkaProducerFactory">

[java]view plaincopyprint?

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context

http://www.springframework.org/schema/context/spring-context.xsd">

value="org.apache.kafka.common.serialization.IntegerSerializer"/>

value="org.apache.kafka.common.serialization.StringSerializer"/>

class="org.springframework.kafka.core.DefaultKafkaProducerFactory">

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context

http://www.springframework.org/schema/context/spring-context.xsd">

value="org.apache.kafka.common.serialization.IntegerSerializer" />

value="org.apache.kafka.common.serialization.StringSerializer" />

class="org.springframework.kafka.core.DefaultKafkaProducerFactory">

4. 使用消息消费者spring-context-producer配置化方案。

Java代码

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context

http://www.springframework.org/schema/context/spring-context.xsd">

value="org.apache.kafka.common.serialization.IntegerDeserializer"/>

value="org.apache.kafka.common.serialization.StringDeserializer"/>

class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">

class="org.springframework.kafka.listener.config.ContainerProperties">

class="org.springframework.kafka.listener.KafkaMessageListenerContainer"

init-method="doStart">

[java]view plaincopyprint?

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context

http://www.springframework.org/schema/context/spring-context.xsd">

value="org.apache.kafka.common.serialization.IntegerDeserializer"/>

value="org.apache.kafka.common.serialization.StringDeserializer"/>

class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">

class="org.springframework.kafka.listener.config.ContainerProperties">

class="org.springframework.kafka.listener.KafkaMessageListenerContainer"

init-method="doStart">

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context

http://www.springframework.org/schema/context/spring-context.xsd">

value="org.apache.kafka.common.serialization.IntegerDeserializer" />

value="org.apache.kafka.common.serialization.StringDeserializer" />

class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">

class="org.springframework.kafka.listener.config.ContainerProperties">

class="org.springframework.kafka.listener.KafkaMessageListenerContainer"

init-method="doStart">

5. 使用注解方式注入消息类型

@Autowired

private KafkaTemplate kafkaTemplate;

6. 重写MessageListener 的getMessage方法获取消息(业务实现)

7. RestFul服务方式测试消息服务

Java代码

@CrossOrigin(origins ="*", maxAge =3600, methods = { RequestMethod.GET, RequestMethod.POST, RequestMethod.DELETE,

RequestMethod.PUT })

@RestController

@RequestMapping(value ="/rest/kafka")

publicclassKafKaProducer {

@RequestMapping(value ="/send", method = RequestMethod.GET)

publicJSONObject save() {

System.out.println("+++++++++++++++++++++++++++++++");

kafkaTemplate.sendDefault("HongHu KAFKA分布式消息服务测试");

returnnull;

}

@Autowired

privateKafkaTemplate kafkaTemplate;

}

[java]view plaincopyprint?

@CrossOrigin(origins ="*", maxAge =3600, methods = { RequestMethod.GET, RequestMethod.POST, RequestMethod.DELETE,

RequestMethod.PUT })

@RestController

@RequestMapping(value ="/rest/kafka")

publicclassKafKaProducer {

@RequestMapping(value ="/send", method = RequestMethod.GET)

publicJSONObject save() {

System.out.println("+++++++++++++++++++++++++++++++");

kafkaTemplate.sendDefault("HongHu KAFKA分布式消息服务测试");

returnnull;

}

@Autowired

privateKafkaTemplate kafkaTemplate;

}

@CrossOrigin(origins = "*", maxAge = 3600, methods = { RequestMethod.GET, RequestMethod.POST, RequestMethod.DELETE,

RequestMethod.PUT })

@RestController

@RequestMapping(value = "/rest/kafka")

public class KafKaProducer {

@RequestMapping(value = "/send", method = RequestMethod.GET)

public JSONObject save() {

System.out.println("+++++++++++++++++++++++++++++++");

kafkaTemplate.sendDefault("HongHu KAFKA分布式消息服务测试");

return null;

}

@Autowired

private KafkaTemplate kafkaTemplate;

}

Java代码

@RestController

publicclassKafKaConsumerimplementsMessageListener {

@Autowired

privateLogService logService;

publicvoidonMessage(ConsumerRecord records) {

System.out.println("===================="+ records);

Object o = records.value();

Log log =newLog();

log.setIsNewRecord(true);

log.setId(IdGen.uuid());

log.setTitle(String.valueOf(o));

logService.save(log);

}

}

[java]view plaincopyprint?

@RestController

publicclassKafKaConsumerimplementsMessageListener {

@Autowired

privateLogService logService;

publicvoidonMessage(ConsumerRecord records) {

System.out.println("===================="+ records);

Object o = records.value();

Log log =newLog();

log.setIsNewRecord(true);

log.setId(IdGen.uuid());

log.setTitle(String.valueOf(o));

logService.save(log);

}

}

@RestController

public class KafKaConsumer implements MessageListener {

@Autowired

private LogService logService;

public void onMessage(ConsumerRecord records) {

System.out.println("====================" + records);

Object o = records.value();

Log log = new Log();

log.setIsNewRecord(true);

log.setId(IdGen.uuid());

log.setTitle(String.valueOf(o));

logService.save(log);

}

}

愿意了解框架技术或者源码的朋友直接求求交流分享技术:3133806896

分布式的一些解决方案,有愿意了解的朋友可以找我们团队探讨

更多详细源码参考来源

时间: 2024-10-13 19:47:54

spring+springmvc+kafka分布式消息中间件集成方案的相关文章

Spring+SpringMvc+Mybatis框架集成搭建教程

一.背景 最近有很多同学由于没有过SSM(Spring+SpringMvc+Mybatis , 以下简称SSM)框架的搭建的经历,所以在自己搭建SSM框架集成的时候,出现了这样或者那样的问题,很是苦恼,网络上又没有很详细的讲解以及搭建的教程.闲来无事,我就利用空闲时间来写这样一个教程和搭建步骤,来帮助那些有问题的小伙伴,让你从此SSM搭建不再有问题. 二.教程目录 1.Spring+SpringMvc+Mybatis框架集成搭建教程一(项目创建) 2.Spring+SpringMvc+Mybat

JEESZ分布式框架--单点登录集成方案

  JEESZ分布式框架单点登录集成方案第一节:单点登录简介 第一步:了解单点登录SSO主要特点是: SSO应用之间使用Web协议(如HTTPS) ,并且只有一个登录入口.SSO的体系中有下面三种角色:1) User(多个)2) Web应用(多个)3) SSO认证中心(一个) SSO实现包含以下三个原则:1) 所有的登录都在 SSO 认证中心进行.  2) SSO认证中心通过一些方法来告诉Web应用当前访问用户究竟是不是通过认证的用户.  3) SSO认证中心和所有的 Web 应用建立一种信任关

Spring+SpringMvc+Mybatis框架集成搭建教程一(背景介绍及项目创建)

一.背景 最近有很多同学由于没有过SSM(Spring+SpringMvc+Mybatis , 以下简称SSM)框架的搭建的经历,所以在自己搭建SSM框架集成的时候,出现了这样或者那样的问题,很是苦恼,网络上又没有很详细的讲解以及搭建的教程.闲来无事,我就利用空闲时间来写这样一个教程和搭建步骤,来帮助那些有问题的小伙伴,让你从此SSM搭建不再有问题. 二.搭建步骤 1.框架搭建环境 Spring 4.2.6.RELEASE SpringMvc 4.2.6.RELEASE Mybatis 3.2.

spring+springMVC集成(annotation方式)

spring+springMVC集成(annotation方式) SpringMVC+Spring4.0+Hibernate 简单的整合 MyBatis3整合Spring3.SpringMVC3

springMVC系列之(三) spring+springMVC集成(annotation方式)

个人认为使用框架并不是很难,关键要理解其思想,这对于我们提高编程水平很有帮助.不过,如果用都不会,谈思想就变成纸上谈兵了!!!先技术,再思想.实践出真知. 1.基本概念 1.1.Spring Spring是一个开源框架,Spring是于2003 年兴起的一个轻量级的Java 开发框架,由Rod Johnson 在其著作Expert One-On-One J2EE Development and Design中阐述的部分理念和原型衍生而来.它是为了解决企业应用开发的复杂性而创建的.Spring使用

Spring+Struts集成(方案一)

SSH框架是现在非常流行的框架之一,本文接下来主要来对Spring和Struts的集成进行展示. 集成原理:在Action中取得BeanFactory,通过BeanFactory取得业务逻辑对象. 集成框架图如下: 1 spring 和struts依赖包配置. *struts --拷贝struts相关java包和jstl. --在web.xml中配置ActionServlet. --提供struts-config.xml核心配置文件. --提供struts国际化资源文件,最好提供默认国际化文件.

spring springMVC mybatis 集成

最近闲来无事,整理了一下spring springMVC mybatis 集成,关于这个话题在园子里已经有很多人写过了,我主要是想提供一个完整的demo,涵盖crud,事物控制等. 整个demo分三个层次: 一.简单模式:整个框架的参数传递不使用实体对象,统一用Map来存储变量,对mybatis部分不使用mapper接口,使用SqlSessionDaoSupport 提供的SqlSession 来操作mapper XML文件中的命令.这种方式的好处是框架层次结构很简单,适合快速开发,缺点是没有实

Spring Boot微服务如何集成fescar解决分布式事务?

什么是fescar? 关于fescar的详细介绍,请参阅fescar wiki. 传统的2PC提交协议,会持有一个全局性的锁,所有局部事务预提交成功后一起提交,或有一个局部事务预提交失败后一起回滚,最后释放全局锁.锁持有的时间较长,会对并发造成较大的影响,死锁的风险也较高. fescar的创新之处在于,每个局部事务执行完立即提交,释放本地锁:它会去解析你代码中的sql,从数据库中获得事务提交前的事务资源即数据,存放到undo_log中,全局事务协调器在回滚的时候直接使用undo_log中的数据覆

Spring多数据源分布式事务管理/springmvc+spring+atomikos[jta]+druid+mybatis

项目进行读写分离及分库分表,在一个业务中,在一个事务中处理时候将切换多个数据源,需要保证同一事务多个数据源数据的一致性.此处使用atomikos来实现:最后附源码: 1:spring3.0之后不再支持jtom[jta]了,第三方开源软件atomikos(http://www.atomikos.com/)来实现. 2:org.springframework.transaction.jta.JotmFactoryBean类,spring-tx-2.5.6.jar中有此类,spring-tx-3.0.