Storm集群组件和编程模型



Storm工作原理:

Storm是一个开源的分布式实时计算系统,常被称为流式计算框架。什么是流式计算呢?通俗来讲,流式计算顾名思义:数据流源源不断的来,一边来,一边计算结果,再进入下一个流。

比如一般金融系统一直不断的执行,金融交易、用户全部行为都记录进日志里,日志分析出站点运维、猎户信息。海量数据使得单节点处理只是来。所以就用到分布式计算机型,storm 是当中的典型代表之中的一个,一般应用场景是:中间使用一个消息队列系统如kafka,先将消息缓存起来,storm 中有非常多的节点,分布式并行执行处理程序,进行数据处理。

仅仅要不是人为干预。storm 就一直实时不断地进行数据处理。值得注意的是:并非storm去处理,而是它能够将我们程序的非常多jar包。业务程序,同一时候放到不同的server中并发的执行, 终于得到的结果就是不同系统的海量数据就会分散到不同的server中并发的进行处理,负载能力非常强。 所以真正进行数据处理的是我们写好的数据处理程序,storm的强大作用之中的一个就是它为这些程序提供了执行温床,将应用程序上传到storm 集群中,在多台机器上并发执行,这样就能够扩展程序的负载处理能力,实现流式计算。

Storm 集群组件:

集群角色:

Nimbus:集群主节点。主要负责任务分配、响应client提交topology请求以及任务失败的调度

Supervisor:集群从节点。主要负责启动、停止业务逻辑组件程序进程

主从节点之间通过zookeeper集群进行连接,主从节点之间是fail-fast(java的一种错误机制)、无状态的,主从节点的状态信息均保存到zookeeper中或者本地硬盘里。

这种优点就在于,哪怕是主节点kill掉了,storm会自己主动起一个备份主节点。由于无状态的关系,所以随意一个节点都能够充当Nimbus一角。

这种设计使得storm十分稳定。【译自apache storm官网】

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center">

Storm 编程模型

Topology

业务处理模型

Spout

数据源组件。用于获取数据,可通过文件或者消息队列【kafka、activeMQ】中获取数据

Bolt

逻辑处理组件

简单理解,topology【拓扑结构】就是包括了数据源、逻辑处理组件的一个外在集合框架,使用storm能够定义一个topology里set多少个数据源组件。多少个逻辑处理组件。

以下通过demo来详细解释Storm编程模型的几个主要元组

比如如今须要对一组数据进行处理,将数据中全部的英文转成大写,再加上标识后缀,最后保存到本地文本中。当然这仅仅是一个特别简单的数据处理逻辑。仅用于帮助大家理解Storm编程模型。
那依据Storm的编程模型。实现这个数据处理需求须要建立1个数据源Spout组件。2个业务逻辑组件Bolt,以及一个Topology结构,将这3个组件增加到这个topology结构中。

public class RandomSpout extends BaseRichSpout{
	SpoutOutputCollector collector=null;
	String[] goods={"iphone","xiaomi","meizu","zhongxing","huawei","moto","sumsung","simens"};
	/*
	 * 获取消息并发送给下一个组件的方法。会被storm 不断地调用
	 * 从goods 数组中随机获取一个商品名封装到tuple中去
	 */
	@Override
	public void nextTuple() {
		Random random=new Random();
		String good=goods[random.nextInt(goods.length)];

		//封装到tuple中发送给下一个组件
		collector.emit(new Values(good));
	}

	//进行初始化,仅仅在開始时调用一次
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector=collector;
	}
	/*
	 * 定义tunple的schema
	 *
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("src_word"));
	}
}

数据源Spout组件通过继承Storm基类。重写三个最核心的方法,各自是open、nextTuple、和delcare方法。open是在将运行数据传递之前所运行的方法,用于初始化数据。nextTuple中核心方法就是collector的emit方法,用于将数据传递给下一个元组。delcare用于成名元组传递、接收数据的格式,能够简单的理解为给传递的数据加上一个标识键。

public class UpperBolt extends BaseBasicBolt {

	//每来一个消息元组tuple,都会被运行一次该方法
	@Override
	public void execute(Tuple tuple,BasicOutputCollector collector) {
		//从tuple 中拿到数据--原始商品名
		String src_word=tuple.getString(0);//获取下标第一个消息
		String  upper=src_word.toUpperCase();
		//发送出去
		collector.emit(new Values(upper));
	}
	//给消息申明一个字段名
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declare) {
		declare.declare(new Fields("upper"));
	}
}

这个逻辑处理bolt 用于将spout数据源组件中传递的元组转成大写格式,先获取tuple的数据,然后emit发送给下一个元组。

/*
 * 给商品名称加入后缀。然后写入文件里
 */
public class SuffixBolt extends BaseBasicBolt{
	FileWriter file =null;
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		try {
			 file = new FileWriter("D://eclipse_plugin"+UUID.randomUUID());
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	//每一次执行都去new 一个writer 。应该在调用excute 之前先把writer 初始化好==持续执行
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		//从消息元组中拿到上一个组件发送过来的数据
		String upper=tuple.getString(0);
		String result=upper +"_suffix";
			try {
				file.append(result);
				file.append("/n");
			} catch (IOException e) {

				e.printStackTrace();
			}
	}
	//声明该组件要发送出去的tuple的字段定义
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declare) {
	}
}

bolt和spout一样,继承storm基类之后,也会有prepare方法用于准备数据,初始化一些对象;excute方法则是每每传递过来一个元组。便会触发运行一次。这个bolt的作用在于将上一个元组传递过来的数据加上后缀处理,然后写入本地文件里。

那么。写好了这些基础的数据源和业务逻辑处理元组,怎样组织他们的数据传递关系。这就是Topology类的职责。

/*
 * 描写叙述topology的结构,以及创建topology并提交给集群
 */
public class TopoMain {
	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		TopologyBuilder builder=new TopologyBuilder();

		//设置消息源组件  4表示spout进程个数
		builder.setSpout("randomSpout", new RandomSpout(),4);

		//设置逻辑处理组件
		//shuffleGrouping 指定接收哪个组件传过来的消息
		builder.setBolt("upper", new UpperBolt(),4).shuffleGrouping("randomSpout");
		builder.setBolt("result", new SuffixBolt(),4).shuffleGrouping("upper");

		//创建一个topology
		StormTopology topology=builder.createTopology();

		Config config=new Config();
		config.setNumWorkers(4);//设置进程个数
		config.setDebug(true);//设置调试状态
		config.setNumAckers(0);//消息应答器,事务性不是非常强。可设置为0

//提交topology到storm  定义一个名称。好在集群里去标识;通过配置对象传递參数给集群,集群依据这些參数,任务调度进行调整
		StormSubmitter.submitTopology("demotopo", config, topology);
	}
}

Topology类便将之前编写的1个spout 和2个bolt组装到一个topology中。并通过追加shuffleGrouping方法设置了他们之间的数据传递方向,以及进程个数。

通过这个实例应该对storm的编程模型和编码流程有了简单的认识。

但这仅仅是storm的大山一小角,比如zookeeper对storm集群主从节点的管理、storm与消息中间件的结合处理海量数据。复杂的数据处理流程。这些才是storm真正大展身手的地方。



时间: 2024-11-07 08:19:15

Storm集群组件和编程模型的相关文章

Storm集群安装部署步骤【详细版】

作者: 大圆那些事 | 文章可以转载,请以超链接形式标明文章原始出处和作者信息 网址: http://www.cnblogs.com/panfeng412/archive/2012/11/30/how-to-install-and-deploy-storm-cluster.html 本文以Twitter Storm官方Wiki为基础,详细描述如何快速搭建一个Storm集群,其中,项目实践中遇到的问题及经验总结,在相应章节以“注意事项”的形式给出. 1. Storm集群组件 Storm集群中包含两

Storm集群安装详解

Storm集群安装详解 storm有两种操作模式: 本地模式和远程模式. 本地模式:你可以在你的本地机器上开发测试你的topology, 一切都在你的本地机器上模拟出来; 远端模式:你提交的topology会在一个集群的机器上执行. 本文以Twitter Storm官方Wiki为基础,详细描述如何快速搭建一个Storm集群,其中,项目实践中遇到的问题及经验总结,在相应章节以“注意事项”的形式给出. 1.   Strom集群组件 Storm集群中包含两类节点:主控节点(Master Node)和工

Apache Storm 集群环境搭建

Apache storm 是一个由twitter开源的大数据处理系统,与其他系统不同的是,storm旨在用于分布式实时处理并且与语言无关.笔者所认知的storm使用场景诸 如 实时日志分析.网站用户行为实时分析.实时计算等,目前很多公司也都把storm作为自己的大数据架构的一部分,来实现一些实时业务的处理. 相信大家都和我有一样的认知,那就是现在的技术都是项目驱动模式,没有最好的技术,只有最适合自己项目的技术.下面先跟大家分享一下我对storm的一些简单了解: storm的优点: 1.简单的编程

1.1 Storm集群安装部署步骤

安装storm集群,需要依赖以下组件: Zookeeper Python Zeromq Storm JDK JZMQ 故安装过程根据上面的组件分为以下几步: 安装JDK 安装Zookeeper集群 安装Python及依赖 安装Storm 另外,操作系统环境为:Centos6.4,安装用户为:root. 1. 安装JDK 安装jdk有很多方法,可以参考文博客使用yum安装CDH Hadoop集群中的jdk安装步骤,需要说明的是下面的zookeeper集群安装方法也可以参考此文. 不管你用什么方法,

storm文档(11)----搭建storm集群

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41684717 源地址:http://storm.apache.org/documentation/Setting-up-a-Storm-cluster.html 本文叙述了storm集群搭建和运行步骤.如果你打算在AWS上进行的话,可以使用storm-deploy项目.storm-deploy在EC2上完全自动进行下载.配置.以及storm集群的安装等步骤.它也为你配置了Gan

Storm集群的安装配置

Storm集群的安装分为以下几步: 1.首先保证Zookeeper集群服务的正常运行以及必要组件的正确安装 2.释放压缩包 3.修改storm.yaml添加集群配置信息 4.使用storm脚本启动相应服务并查看服务状态 5.通过web查看storm集群的状态 安装Storm之前首先保证之前安装的Zookeeper服务正常运行,包括配置hosts映射,主机名修改,防火墙都已经设置完好 Storm是由java编写,因此必须依赖JDK运行,系统首先应正确安装JDK 部分需要依赖Python,红帽系列L

Storm 集群安装配置

本文详细介绍了 Storm 集群的安装配置方法.如果需要在 AWS 上安装 Storm,你应该看一下 storm-deploy 项目.storm-deploy 可以自动完成 E2 上 Storm 集群的准备.配置.安装的全部过程,同时还设置好了 Ganglia,方便监控 CPU.磁盘以及网络的使用信息. 如果你在使用 Storm 集群时遇到问题,请先查看“问题与解决”一文中是否已有相应的解决方案.如果检索不到有效的解决方法,请向社区的邮件列表发送关于问题的邮件. 以下是安装 Storm 的步骤:

HeartBeat 集群组件概述

Heartbeat 是一个基于Linux开源的高可用集群系统.主要包括心跳服务和资源接管两个高可用集群组件.心跳监测服务可以通过网络链路和串口进行,而且支持冗余链路, 它们之间相互发送报文来告诉对方自己当前的状态,如果在指定的时间内未收到对方发送的报文,那么就认为对方失效,这时需启动资源接管模块来接管运行在对方主机上的资源或者服务.本文简要描述了heartbeat v2集群架构组件及其相关概念,供大家参考. 一.高可用集群的特点 高可用服务 通常使用集群方式实现,这也是集群的最大作用和体现. 其

Storm集群上的开发 ,任务计算输出到mysql数据库,集成jdbc(十)

storm集成jdbc,把计算结果保存到mysql中. 首先在mysql中建表 ,表的字段与输出的tuple的schema一致: create table result( word varchar(20), total int ); 编写一个连接提供器,用于获取mysql数据库连接: 需要引入jar :/usr/local/apps/apache-storm-1.0.3/external/storm-jdbc 的 storm-jdbc-1.0.3.jar package mystorm.word