Storm框架:Storm整合springboot

我们知道Storm本身是一个独立运行的分布式流式数据处理框架,Springboot也是一个独立运行的web框架。那么如何在Strom框架中集成Springboot使得我们能够在Storm开发中运用Spring的Ioc容器及其他如Spring Jpa等功能呢?我们先来了解以下概念:
Storm主要的三个Component:Topology、Spout、Bolt。Topology作为主进程控制着spout、bolt线程的运行,他们相当于独立运行的容器分布于storm集群中的各个机器节点。
SpringApplication:是配置Spring应用上下文的起点。通过调用SpringApplication.run()方法它将创建ApplicationContext实例,这是我们能够使用Ioc容器的主要BeanFactory。之后Spring将会加载所有单例模式的beans,并启动后台运行的CommandLineRunner beans等。
ApplicationContextAware:这是我们能够在普通Java类中调用Spring容器里的beans的关键接口。

实现原理

Storm框架中的每个Spout和Bolt都相当于独立的应用,Strom在启动spout和bolt时提供了一个open方法(spout)和prepare方法(bolt)。我们可以把初始化Spring应用的操作放在这里,这样可以保证每个spout/bolt应用在后续执行过程中都能获取到Spring的ApplicationContext,有了ApplicationContext实例对象,Spring的所有功能就都能用上了。

Spout.open方法实现br/>@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
//启动Springboot应用
SpringStormApplication.run();

this.map = map;
this.topologyContext = topologyContext;
this.spoutOutputCollector = spoutOutputCollector;

}br/>Bolt.prepare方法实现
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
//启动Springboot应用
SpringStormApplication.run();

this.map = map;
this.topologyContext = topologyContext;
this.outputCollector = outputCollector;

}br/>SpringStormApplication启动类
@SpringBootApplication
@ComponentScan(value = "com.xxx.storm")
public class SpringStormApplication {
/**

@SpringBootApplication
@ComponentScan(value = "com.xxx.web")
public class PlatformApplication {
public static void main(String[] args) {
SpringApplication.run(PlatformApplication.class, args);br/>}
}
在spout/bolt中调用了SpringStormApplication.run方法后,我们还需要能够拿到ApplicationContext容器对象,这时候我们还需要实现ApplicationContextAware接口,写个工具类BeanUtils:
@Component
public class BeanUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    if (BeanUtils.applicationContext == null) {
        BeanUtils.applicationContext = applicationContext;
    }
}

public static ApplicationContext getApplicationContext() {
    return applicationContext;
}

public static Object getBean(String name) {
    return getApplicationContext().getBean(name);
}

public static <T> T getBean(Class<T> clazz) {
    return getApplicationContext().getBean(clazz);
}

public static <T> T getBean(String name, Class<T> clazz) {
    return getApplicationContext().getBean(name, clazz);
}

}
通过@Component注解使得Spring在启动时能够扫描到该bean,因为BeanUtils实现了ApplicationContextAware接口,Spring会在启动成功时自动调用BeanUtils.setApplicationContext方法,将ApplicationContext对象保存到工具类的静态变量中,之后我们就可以使用BeanUtils.getBean()去获取Spring容器中的bean了。

写个简单例子

在FilterBolt的execute方法中获取Spring beanbr/>@Override
public void execute(Tuple tuple) {
FilterService filterService = (FilterService) BeanUtils.getBean("filterService");
filterService.deleteAll();
}
定义FilterService类,这时候我们就可以使用Spring的相关注解,自动注入,Spring Jpa等功能了。
br/>@Service("filterService")
public class FilterService {
br/>@Autowired
UserRepository userRepository;

public void deleteAll() {
    userRepository.deleteAll();
}

}
将storm应用作为Springboot工程的一个子模块

工程主目录的pom文件还是springboot相关的依赖,在storm子模块中引入storm依赖,这时候启动Strom的topology应用会有一个日志包依赖冲突。

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See
http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
我们需要在storm子模块的pom文件中重写org.springframework.boot:spring-boot-starter包依赖,将Springboot的相关日志包排除掉,如下:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j2</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic2</artifactId>
</exclusion>
</exclusions>
</dependency>
OK,完美整合!

原文地址:http://blog.51cto.com/14084556/2321808

时间: 2024-11-09 06:24:21

Storm框架:Storm整合springboot的相关文章

流式计算框架-STORM简介

在当前的数据分析领域,对实时数据的计算需求越来越强烈,在此领域,出现了各类计算框架,如:Storm.S4等.目前本土公司对这些流式计算框架的应用也比较广泛,但苦于相关文档英文居多,缺少成系列且与官方相对应的中文手册.本系列试图从官方文档翻译入手,给大家呈现较为完备的中文资料,同时也是对自身知识的总结沉淀. 在这个系列博客中,我们选择了twitter的Storm框架,原因很简单,因为本人长期使用的就是该框架,咱们先从简介开始. Apache Storm是一个免费.开源.分布式的实时计算系统.相对于

Storm框架入门

1 Topology构成 和同样是计算框架的Mapreduce相比,Mapreduce集群上运行的是Job,而Storm集群上运行的是Topology.但是Job在运行结束之后会自行结束,Topology却只能被手动的kill掉,否则会一直运行下去. Storm集群中有两种节点,一种是控制节点(Nimbus节点),另一种是工作节点(Supervisor节点).所有Topology任务的提交必须在Storm客户端节点上进行(需要配置~/.storm/storm.yaml文件),由Nimbus节点分

Storm框架基础(一)

* Storm框架基础(一) Storm简述 如果你了解过SparkStreaming,那么Storm就可以类比着入门,在此我们可以先做一个简单的比较:  在SparkStreaming中: 我们曾尝试过每秒钟的实时数据处理,或者使用Window若干时间范围内的数据统一处理结果.亦或统计所有时间范围内的数据结果. 在Storm中: 我们可以根据进来的每一条数据进行实时处理,也就是说,Storm处理数据的速度,要小于1秒,也就是毫秒级别的. 如果你疑问,1秒处理1次数据,和进来1条数据处理1次有什

storm和kafka整合

storm和kafka整合 依赖 <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> <version>1.2.2</version> <scope>provided</scope> </dependency> <dependency> <

Storm框架最显著的应用

Apache Storm框架支持许多当今最好的工业应用程序.我们将在本章中简要介绍Storm的一些最显著的应用. Klout Klout是一个应用程序,它使用社交媒体分析,根据在线社交影响力通过Klout得分,这是一个介于1和100之间的数值对用户排名.Klout使用Apache Storm的内置Trident抽象来创建流数据的复杂拓扑. 天气频道 天气频道使用Storm拓扑来获取天气数据.它绑定了Twitter,以在Twitter和移动应用程序启用天气知道的广告.OpenSignal是一家专门

SSM框架——详细整合教程(Spring+SpringMVC+MyBatis)

使用SSM(spring.SpringMVC和Mybatis)已经有三个多月了,项目在技术上已经没有什么难点了,基于现有的技术就可以实现想要的功能,当然肯定有很多可以改进的地方.之前没有记录SSM整合的过程,这次刚刚好基于自己的一个小项目重新搭建了一次,而且比项目搭建的要更好一些.以前解决问题的过程和方法并没有及时记录,以后在自己的小项目中遇到我再整理分享一下.这次,先说说三大框架整合过程.个人认为使用框架并不是很难,关键要理解其思想,这对于我们提高编程水平很有帮助.不过,如果用都不会,谈思想就

如何在ASP.NET Core应用中实现与第三方IoC/DI框架的整合?

我们知道整个ASP.NET Core建立在以ServiceCollection/ServiceProvider为核心的DI框架上,它甚至提供了扩展点使我们可以与第三方DI框架进行整合.对此比较了解的读者朋友应该很清楚,针对第三方DI框架的整合可以通过在定义Startup类型的ConfigureServices方法返回一个ServiceProvider来实现.但是真的有这么简单吗? 一.ConfigureServices方法返回的ServiceProvider貌似没有用!? 我们可以通过一个简单的

SSH三大框架注解整合(一)

1.导入jar包,ssh的jar包一共是38个,此时还需要多加一个包,就是struts的注解插件jar. 2.在web.xml文件中配置struts filter 和spring 的listener.代码如下: <!-- spring 监听器 -->  <context-param>   <param-name>contextConfigLocation</param-name>   <param-value>classpath:applicat

ssm 框架的整合(非原创‘借鉴’)

使用SSM(spring.SpringMVC和Mybatis)已经有三个多月了,项目在技术上已经没有什么难点了,基于现有的技术就可以实现想要的功能,当然肯定有很多可以改进的地方.之前没有记录SSM整合的过程,这次刚刚好基于自己的一个小项目重新搭建了一次,而且比项目搭建的要更好一些.以前解决问题的过程和方法并没有及时记录,以后在自己的小项目中遇到我再整理分享一下.这次,先说说三大框架整合过程.个人认为使用框架并不是很难,关键要理解其思想,这对于我们提高编程水平很有帮助.不过,如果用都不会,谈思想就