Storm专题二:Storm Trident API 使用详解

一、概述

Storm Trident中的核心数据模型就是“Stream”,也就是说,Storm Trident处理的是Stream,但是实际上Stream是被成批处理的,Stream被切分成一个个的Batch分布到集群中,所有应用在Stream上的函数最终会应用到每个节点的Batch中,实现并行计算,具体如下图所示:

在Trident中有五种操作类型:

  1. Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输
  2. Repartitioning:数据流重定向,单纯的改变数据流向,不会改变数据内容,这部分会有网络传输
  3. Aggragation:聚合操作,会有网络传输
  4. Grouped streams上的操作
  5. Merge和Join

小结:上面提到了Trident实际上是通过把函数应用到每个节点的Batch上的数据以实现并行,而应用的这些函数就是TridentAPI,下面我们就具体介绍一下TridentAPI的各种操作。

二、Trident五种操作详解

2.1 Apply Locally本地操作:操作都应用在本地节点的Batch上,不会产生网络传输

2.1.1 Functions:函数操作

函数的作用是接收一个tuple(需指定接收tuple的哪个字段),输出0个或多个tuples。输出的新字段值会被追加到原始输入tuple的后面,如果一个function不输出tuple,那就意味这这个tuple被过滤掉了,下面举例说明:

  • 定义一个Function:
    public class MyFunction extends BaseFunction {
      @Override
      public void execute(TridentTuple tuple, TridentCollector collector) {
           for ( int i = 0; i < tuple.getInteger(0); i++) {
              collector.emit( new Values(i));
          }
     }
   }

小结:Function实际上就是对经过Function函的tuple做一些操作以改变其内容。

  • 比如我们处理一个“mystream”的数据流,它有三个字段分别是[“a”, “b”, “c”] ,数据流中tuple的内容是:

[1,
2, 3] [4, 1, 6] [3, 0, 8]

  • 我们运行我们的Function:
 java mystream.each(new Fields("b"), new MyFunction(), new Fields("d")));

它意思是接收输入的每个tuple “b”字段得值,把函数结算结果做为新字段“d”追加到每个tuple后面,然后发射出去。

  • 最终运行结果会是每个tuple有四个字段[“a”, “b”, “c”, “d”],每个tuple的内容变成了:

 [1,
2, 3, 0] [1, 2, 3, 1] [4, 1, 6, 0]

2.1.2 Filters:过滤操作

  • Filters很简单,接收一个tuple并决定是否保留这个tuple。举个例子,定义一个Filter:
 public class MyFilter extends BaseFilter {
     public boolean isKeep(TridentTuple tuple) {
           return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
     }
   }
  • 假设我们的tuples有这个几个字段 [“a”,
    “b”, “c”]:

[1, 2, 3] [2, 1, 1] [2, 3, 4]

  • 然后运行我们的Filter:
 java mystream.each(new Fields("b", "a"), new MyFilter());
  • 则最终得到的tuple是 :

[2,
1, 1]

说明第一个和第三个不满足条件,都被过滤掉了。

小结:Filter就是一个过滤器,它决定是否需要保留当前tuple。

2.1.3 PartitionAggregate

PartitionAggregate的作用对每个Partition中的tuple进行聚合,与前面的函数在原tuple后面追加数据不同,PartitionAggregate的输出会直接替换掉输入的tuple,仅数据PartitionAggregate中发射的tuple。下面举例说明:

  • 定义一个累加的PartitionAggregate:
java mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"));
  • 假设我们的Stream包含两个字段 [“a”,
    “b”],各个Partition的tuple内容是:

```
Partition 0: [“a”, 1] [“b”, 2]

Partition 1: [“a”, 3] [“c”, 8]

Partition 2: [“e”, 1] [“d”, 9] [“d”, 10] ```

  • 输出的内容只有一个字段“sum”,值是:

```
Partition 0: [3]

Partition 1: [11]

Partition 2: [20] ```

TridentAPI提供了三个聚合器的接口:CombinerAggregator, ReducerAggregator,
and Aggregator.

我们先看一下CombinerAggregator接口:

 public interface CombinerAggregator <T> extends Serializable {
         T init(TridentTuple tuple);
         T combine(T val1, T val2);
         T zero();
    }

CombinerAggregator接口只返回一个tuple,并且这个tuple也只包含一个field。init方法会先执行,它负责预处理每一个接收到的tuple,然后再执行combine函数来计算收到的tuples直到最后一个tuple到达,当所有tuple处理完时,CombinerAggregator会发射zero函数的输出,举个例子:

  • 定义一个CombinerAggregator实现来计数:
 public class CombinerCount implements CombinerAggregator<Integer>{
     @Override
     public Integer init(TridentTuple tuple) {
           return 1;
     }
     @Override
     public Integer combine(Integer val1, Integer val2) {

           return val1 + val2;
     }
     @Override
     public Integer zero() {
           return 0;
     }
   }

小结:当你使用aggregate 方法代替PartitionAggregate时,CombinerAggregator的好处就体现出来了,因为Trident会自动优化计算,在网络传输tuples之前做局部聚合。

我们再看一下ReducerAggregator:

 public interface ReducerAggregator <T> extends Serializable {
         T init();
         T reduce(T curr, TridentTuple tuple);
     }

ReducerAggregator通过init方法提供一个初始值,然后为每个输入的tuple迭代这个值,最后生产处一个唯一的tuple输出,下面举例说明:

  • 定义一个ReducerAggregator接口实现技术器的例子:
 public class ReducerCount implements ReducerAggregator<Long>{
     @Override
     public Long init() {
           return 0L;
     }
     @Override
     public Long reduce(Long curr, TridentTuple tuple) {
           return curr + 1;
     }
 }
最后一个是Aggregator接口,它是最通用的聚合器,它的形式如下:
  public interface Aggregator<T> extends Operation {
        T init(Object batchId, TridentCollector collector);
        void aggregate(T val, TridentTuple tuple, TridentCollector collector);
        void complete(T val, TridentCollector collector);
   }

Aggregator接口可以发射含任意数量属性的任意数据量的tuples,并且可以在执行过程中的任何时候发射:

  1. init:在处理数据之前被调用,它的返回值会作为一个状态值传递给aggregate和complete方法
  2. aggregate:用来处理每一个输入的tuple,它可以更新状态值也可以发射tuple
  3. complete:当所有tuple都被处理完成后被调用
    下面举例说明:
  • 定义一个实现来完成一个计数器:
   public class CountAgg extends BaseAggregator<CountState>{
     static class CountState { long count = 0; }
     @Override
     public CountState init(Object batchId, TridentCollector collector) {
           return new CountState();
     }
     @Override
     public void aggregate(CountState val, TridentTuple tuple, TridentCollector collector) {
          val. count+=1;
     }
     @Override
     public void complete(CountState val, TridentCollector collector) {
          collector.emit( new Values(val. count));
     }
  }
    有时候我们需要同时执行多个聚合器,这在Trident中被称作chaining,使用方法如下:
java mystream.chainedAgg() .partitionAggregate(new Count(), new Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) .chainEnd();
    这点代码会在每个Partition上运行count和sum函数,最终输出一个tuple:[“count”, “sum”]
projection:投影操作
     投影操作作用是仅保留Stream指定字段的数据,比如有一个Stream包含如下字段: [“a”, “b”, “c”, “d”]
     运行如下代码:   
java mystream.project(new Fields("b", "d"))
    则输出的流仅包含 [“b”, “d”]字段。
2.2 Repartitioning重定向操作
     重定向操作是如何在各个任务间对tuples进行分区。分区的数量也有可能改变重定向的结果。重定向需要网络传输,下面介绍下重定向函数:
  1. shuffle:通过随机分配算法来均衡tuple到各个分区
  2. broadcast:每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做stateQuery
  3. partitionBy:根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确保相同字段列表的数据被划分到同一个分区
  4. global:所有的tuple都被发送到一个分区,这个分区用来处理整个Stream
  5. batchGlobal:一个Batch中的所有tuple都被发送到同一个分区,不同的Batch会去往不同的分区
  6. Partition:通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping

2.3 Aggragation聚合操作

     Trident有aggregate和 persistentAggregate方法来做聚合操作。aggregate是独立的运行在Stream的每个Batch上的,而persistentAggregate则是运行在Stream的所有Batch上并把运算结果存储在state source中。
     运行aggregate方法做全局聚合。当你用到  ReducerAggregator或Aggregator时,Stream首先被重定向到一个分区中,然后其中的聚合函数便在这个分区上运行。当你用到CombinerAggregator时,Trident会首先在每个分区上做局部聚合,然后把局部聚合后的结果重定向到一个分区,因此使用CombinerAggregator会更高效,可能的话我们需要优先考虑使用它。
     下面举个例子来说明如何用aggregate进行全局计数:
java mystream.aggregate(new Count(), new Fields("count"));

和paritionAggregate一样,aggregators的聚合也可以串联起来,但是如果你把一个 CombinerAggregator和一个非CombinerAggregator串联在一起,Trident是无法完成局部聚合优化的。

2.4 grouped streams
      GroupBy操作是根据特定的字段对流进行重定向的,还有,在一个分区内部,每个相同字段的tuple也会被Group到一起,下面这幅图描述了这个场景:

     如果你在grouped Stream上面运行aggregators,聚合操作会运行在每个Group中而不是整个Batch。persistentAggregate也能运行在GroupedSteam上,不过结果会被保存在MapState中,其中的key便是分组的字段。
     当然,aggregators在GroupedStreams上也可以串联。
2.5 Merge和Joins:
api的最后一部分便是如何把各种流汇聚到一起。最简单的方式就是把这些流汇聚成一个流。我们可以这么做:
java topology.merge(stream1, stream2, stream3);

另一种合并流的方式就是join。一个标准的join就像是一个sql,必须有标准的输入,因此,join只针对符合条件的Stream。join应用在来自Spout的每一个小Batch中。join时候的tuple会包含:  

 1. join的字段,如Stream1中的key和Stream2中的x    
 2. 所有非join的字段,根据传入join方法的顺序,a和b分别代表steam1的val1和val2,c代表Stream2的val1          
     当join的是来源于不同Spout的stream时,这些Spout在发射数据时需要同步,一个Batch所包含的tuple会来自各个Spout。     
				
时间: 2024-10-01 02:17:12

Storm专题二:Storm Trident API 使用详解的相关文章

专题二、ArrayList序列化技术细节详解

一.绪论 所谓的JAVA序列化与反序列化,序列化就是将JAVA 对象以一种的形式保持,比如存放到硬盘,或是用于传输.反序列化是序列化的一个逆过程. JAVA规定被序列化的对象必须实现java.io.Serializable这个接口,而我们分析的目标ArrayList同样实现了该接口. 通过对ArrayList源码的分析,可以知道ArrayList的数据存储都是依赖于elementData数组,它的声明为: transient Object[] elementData; 注意transient修饰

Android 开源框架Universal-Image-Loader完全解析(二)--- 图片缓存策略详解

本篇文章继续为大家介绍Universal-Image-Loader这个开源的图片加载框架,介绍的是图片缓存策略方面的,如果大家对这个开源框架的使用还不了解,大家可以看看我之前写的一篇文章Android 开源框架Universal-Image-Loader完全解析(一)--- 基本介绍及使用,我们一般去加载大量的图片的时候,都会做缓存策略,缓存又分为内存缓存和硬盘缓存,我之前也写了几篇异步加载大量图片的文章,使用的内存缓存是LruCache这个类,LRU是Least Recently Used 近

Android API Levels 详解

Android API Levels 当你开发你的Android应用程序时,了解该平台API变更管理的基本方法和概念是很有帮助的.同样的,知道API级别标识以及该标识如何保障你的应用与实际硬件设备相兼容对于开发及后续的发布.维护都是有益的. 本节内容告诉你API级别的知识,以及它如何影响你开发和使用的应用. 关于如何使用“以API级别进行过滤”来使用API参考手册,从本文末尾的文档过滤(Filtering the documentation)中可以得到更多信息. API级别是什么?(What i

spark读写压缩文件API使用详解

最近研究了下Spark如何读写压缩格式的文件,主要有如下三种方式,这里以lzo方式压缩为例     /*******************old hadoop api*************************/     val confHadoop = new JobConf     confHadoop.set("mapred.output.compress", "true")     confHadoop.set("mapred.output

FastDFS安装、配置、部署(二)-Tracker配置详解

1.基本配置 # is this config file disabled # false for enabled # true for disabled disabled=false # bind an address of this host # empty for bind all addresses of this host bind_addr=10.16.123.132 # the tracker server pfort port=22122 # connect timeout in

storm集群部署和配置过程详解

---恢复内容开始--- 先整体介绍一下搭建storm集群的步骤: 设置zookeeper集群 安装依赖到所有nimbus和worker节点 下载并解压storm发布版本到所有nimbus和worker节点 配置storm.yaml 启动相关后台进程 1 配置zookeeper集群 我们知道storm通过zookeeper来协调整个集群.zookeeper不是用来做消息传递,因此storm不会给zookeeper带来很大的压力.单节点的zookeeper在大多情形下是可以胜任的,但是如果你想得到

Hibernate3 Api,配置文件详解

1 api详解[多练] 1.1 体系结构 PO:persistent object ,用于与数据库交互数据.--dao层  (JavaBean + hbm ) BO:Business object 业务数据对象.--service层 VO:Value Object 值对象.--web层 开发中:直接使用JavaBean 描述三个对象. 1.2 Configuration 配置对象 l hibernate 核心配置文件种类 hibernate.cfg.xml 通常使用xml配置文件,可以配置内容更

160329(二)、web.xml配置详解

1.启动一个WEB项目的时候,WEB容器会去读取它的配置文件web.xml,读取<listener>和<context-param>两个结点. 2.紧急着,容创建一个ServletContext(servlet上下文),这个web项目的所有部分都将共享这个上下文. 3.容器将<context-param>转换为键值对,并交给servletContext. 4.容器创建<listener>中的类实例,创建监听器. 二  Load-on-startup Load

Netty 中文教程 (二) Hello World !详解

1.HelloServer 详解 HelloServer首先定义了一个静态终态的变量---服务端绑定端口7878.至于为什么是这个7878端口,纯粹是笔者个人喜好.大家可以按照自己的习惯选择端口.当然了.常用的几个端口(例如:80,8080,843(Flash及Silverlight策略文件请求端口等等),3306(Mysql数据库占用端口))最好就不要占用了,避免一些奇怪的问题. HelloServer类里面的代码并不多.只有一个main函数,加上内部短短的几行代码. Main函数开始的位置定