Getting Started with Storm (14): Trident API Overview

The core data model in Trident is the "Stream", processed as a series of batches. A stream is partitioned among the nodes in the cluster, and operations applied to a stream are applied in parallel across each partition.

There are five kinds of operations in Trident:

  1. Operations that apply locally to each partition and cause no network transfer
  2. Repartitioning operations that repartition a stream but otherwise don't change the contents (involves network transfer)
  3. Aggregation operations that do network transfer as part of the operation
  4. Operations on grouped streams
  5. Merges and joins

Partition-local operations

Partition-local operations involve no network transfer and are applied to each batch partition independently.

Functions

A function takes in a set of input fields and emits zero or more tuples as output. The fields of the output tuple are appended to the original input tuple in the stream, so the input fields are preserved in the output stream. If a function emits no tuples, the original input tuple is filtered out. Otherwise, the input tuple is duplicated for each output tuple. Suppose you have this function:

public class MyFunction extends BaseFunction {
    public void execute(TridentTuple tuple, TridentCollector collector) {
        for(int i=0; i < tuple.getInteger(0); i++) {
            collector.emit(new Values(i));
        }
    }
}

Now suppose you have a stream in the variable "mystream" with the fields ["a", "b", "c"] with the following tuples:

[1, 2, 3]
[4, 1, 6]
[3, 0, 8]

If you run this code:

mystream.each(new Fields("b"), new MyFunction(), new Fields("d"))

The resulting tuples would have fields ["a", "b", "c", "d"] and look like this:

[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]
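
The append, duplicate and filter-out behavior described above can be checked without a cluster. The following is a minimal plain-Java sketch, not Storm code: EachSimulation and its methods are hypothetical names, tuples are modeled as lists of integers, and the field "b" is assumed to sit at index 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of each(); none of these names are Storm classes.
class EachSimulation {

    // Mirrors MyFunction: emits 0, 1, ..., n-1 for an input value n.
    static List<Integer> myFunction(int n) {
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            emitted.add(i);
        }
        return emitted;
    }

    // Applies the function to the field at inputIndex and appends each emitted
    // value to a copy of the input tuple. A tuple with no emissions disappears.
    static List<List<Integer>> each(List<List<Integer>> tuples, int inputIndex) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> tuple : tuples) {
            for (Integer value : myFunction(tuple.get(inputIndex))) {
                List<Integer> extended = new ArrayList<>(tuple);
                extended.add(value);
                out.add(extended);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> input = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 1, 6),
                Arrays.asList(3, 0, 8));
        // Field "b" is at index 1.
        System.out.println(each(input, 1));
        // prints [[1, 2, 3, 0], [1, 2, 3, 1], [4, 1, 6, 0]]
    }
}
```

The tuple [3, 0, 8] produces no output because the function emits nothing when "b" is 0.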

Filters

Filters take a tuple as input and decide whether or not to keep it. Suppose you had this filter:

public class MyFilter extends BaseFilter {
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
    }
}

Now suppose you had these tuples with fields ["a", "b", "c"]:

[1, 2, 3]
[2, 1, 1]
[2, 3, 4]

If you ran this code:

mystream.filter(new MyFilter())

The resulting tuples would be:

[1, 2, 3]

map and flatMap

map returns a stream consisting of the result of applying the given mapping function to the tuples of the stream. This can be used to apply a one-to-one transformation to the tuples.

For example, if there is a stream of words and you wanted to convert it to a stream of uppercase words, you could define a mapping function as follows:

public class UpperCase implements MapFunction {
    @Override
    public Values execute(TridentTuple input) {
        return new Values(input.getString(0).toUpperCase());
    }
}

The mapping function can then be applied on the stream to produce a stream of uppercase words.

mystream.map(new UpperCase())

flatMap is similar to map but has the effect of applying a one-to-many transformation to the values of the stream, and then flattening the resulting elements into a new stream.

For example, if there is a stream of sentences and you wanted to convert it to a stream of words, you could define a flatMap function as follows:

public class Split implements FlatMapFunction {
  @Override
  public Iterable<Values> execute(TridentTuple input) {
    List<Values> valuesList = new ArrayList<>();
    for (String word : input.getString(0).split(" ")) {
      valuesList.add(new Values(word));
    }
    return valuesList;
  }
}

The flatMap function can then be applied on the stream of sentences to produce a stream of words,

mystream.flatMap(new Split())

Of course these operations can be chained, so a stream of uppercase words can be obtained from a stream of sentences as follows,

mystream.flatMap(new Split()).map(new UpperCase())
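
If you know java.util.stream, the same one-to-many then one-to-one shape may look familiar. The sketch below is only an analogy in plain Java, not Trident code: MapFlatMapAnalogy is a hypothetical name, and a JDK stream runs in a single JVM rather than across partitions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Analogy only: java.util.stream runs in one JVM, whereas Trident applies
// map and flatMap to each partition of a distributed stream.
class MapFlatMapAnalogy {

    static List<String> splitThenUpperCase(List<String> sentences) {
        return sentences.stream()
                .flatMap(sentence -> Arrays.stream(sentence.split(" "))) // plays the role of Split
                .map(String::toUpperCase)                                 // plays the role of UpperCase
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(splitThenUpperCase(Arrays.asList("the cow", "jumped over")));
        // prints [THE, COW, JUMPED, OVER]
    }
}
```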

If you don't pass output fields as a parameter, map and flatMap preserve the input fields as the output fields.

If you want a MapFunction or FlatMapFunction to replace the old fields with new output fields, call map / flatMap with an additional Fields parameter as follows:

mystream.map(new UpperCase(), new Fields("uppercased"))

The output stream will have only one output field, "uppercased", regardless of what output fields the previous stream had. The same applies to flatMap, so the following is valid as well:

mystream.flatMap(new Split(), new Fields("word"))

peek

peek can be used to perform an additional action on each Trident tuple as it flows through the stream. This can be useful for debugging, to see the tuples as they flow past a certain point in a pipeline.

For example, the code below prints the result of converting the words to uppercase before they are passed to groupBy, which makes peek handy for testing:

mystream.flatMap(new Split()).map(new UpperCase())
    .peek(new Consumer() {
        @Override
        public void accept(TridentTuple input) {
            System.out.println(input.getString(0));
        }
    })
    .groupBy(new Fields("word"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))

min and minBy

The min and minBy operations return the minimum value on each partition of a batch of tuples in a Trident stream.

Suppose a Trident stream contains fields ["device-id", "count"] and the following partitions of tuples:

Partition 0:
[123, 2]
[113, 54]
[23,  28]
[237, 37]
[12,  23]
[62,  17]
[98,  42]

Partition 1:
[64,  18]
[72,  54]
[2,   28]
[742, 71]
[98,  45]
[62,  12]
[19,  174]

Partition 2:
[27,  94]
[82,  23]
[9,   86]
[53,  71]
[74,  37]
[51,  49]
[37,  98]

The minBy operation can be applied to the above stream as shown below, which emits the tuple with the minimum value of the count field in each partition.

mystream.minBy(new Fields("count"))

The result of the above code on those partitions is:

Partition 0:
[123, 2]

Partition 1:
[62,  12]

Partition 2:
[82,  23]
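
To double-check the per-partition result above, here is a small plain-Java sketch. It is a simulation rather than Trident code: MinByPartition and the helper t are hypothetical names, each tuple is modeled as an int array of {device-id, count}, and every partition is assumed to be non-empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Plain-Java simulation of minBy on the "count" field; not Storm code.
class MinByPartition {

    // Builds a {device-id, count} tuple.
    static int[] t(int deviceId, int count) {
        return new int[] {deviceId, count};
    }

    // Picks, independently for each partition, the tuple with the smallest count (index 1).
    // Collections.min throws NoSuchElementException on an empty partition.
    static List<int[]> minByCount(List<List<int[]>> partitions) {
        List<int[]> minima = new ArrayList<>();
        for (List<int[]> partition : partitions) {
            minima.add(Collections.min(partition, Comparator.comparingInt((int[] tuple) -> tuple[1])));
        }
        return minima;
    }

    public static void main(String[] args) {
        List<List<int[]>> partitions = Arrays.asList(
                Arrays.asList(t(123, 2), t(113, 54), t(23, 28), t(237, 37), t(12, 23), t(62, 17), t(98, 42)),
                Arrays.asList(t(64, 18), t(72, 54), t(2, 28), t(742, 71), t(98, 45), t(62, 12), t(19, 174)),
                Arrays.asList(t(27, 94), t(82, 23), t(9, 86), t(53, 71), t(74, 37), t(51, 49), t(37, 98)));
        for (int[] min : minByCount(partitions)) {
            System.out.println(Arrays.toString(min));
        }
        // prints [123, 2], then [62, 12], then [82, 23]
    }
}
```

No partition ever sees another partition's tuples, which is why the operation needs no network transfer.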

Stream also provides other min and minBy operations:

public <T> Stream minBy(String inputFieldName, Comparator<T> comparator)
public Stream min(Comparator<TridentTuple> comparator)

The example below shows how these APIs can be used to find the minimum using the respective Comparators on a tuple.

FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));

TridentTopology topology = new TridentTopology();
Stream vehiclesStream = topology.newStream("spout1", spout)
        .each(allFields, new Debug("##### vehicles"));

Stream slowVehiclesStream = vehiclesStream
        .min(new SpeedComparator()) // Comparator w.r.t speed on received tuple.
        .each(vehicleField, new Debug("#### slowest vehicle"));

vehiclesStream
        .minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple.
        .each(vehicleField, new Debug("#### least efficient vehicle"));

Example applications of these APIs can be found in TridentMinMaxOfDevicesTopology and TridentMinMaxOfVehiclesTopology.

max and maxBy

The max and maxBy operations return the maximum value on each partition of a batch of tuples in a Trident stream.

Suppose a Trident stream contains fields ["device-id", "count"] and the same partitions of tuples as in the min and minBy section above.

The maxBy operation can be applied to that stream as shown below, which emits the tuple with the maximum value of the count field in each partition.

mystream.maxBy(new Fields("count"))

The result of the above code on those partitions is:

Partition 0:
[113, 54]

Partition 1:
[19,  174]

Partition 2:
[37,  98]

Stream also provides other max and maxBy operations:

public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator)
public Stream max(Comparator<TridentTuple> comparator)

The example below shows how these APIs can be used to find the maximum using the respective Comparators on a tuple.

FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));

TridentTopology topology = new TridentTopology();
Stream vehiclesStream = topology.newStream("spout1", spout)
        .each(allFields, new Debug("##### vehicles"));

vehiclesStream
        .max(new SpeedComparator()) // Comparator w.r.t speed on received tuple.
        .each(vehicleField, new Debug("#### fastest vehicle"))
        .project(driverField)
        .each(driverField, new Debug("##### fastest driver"));

vehiclesStream
        .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()) // Comparator w.r.t efficiency on received tuple.
        .each(vehicleField, new Debug("#### most efficient vehicle"));

Example applications of these APIs can be found in TridentMinMaxOfDevicesTopology and TridentMinMaxOfVehiclesTopology.

Windowing

Trident streams can process tuples in batches that belong to the same window and emit the aggregated result to the next operation. Two kinds of windowing are supported, each based on either processing time or tuple count:

  1. Tumbling window
  2. Sliding window

Tumbling window

Tuples are grouped in a single window based on processing time or count. Any tuple belongs to only one of the windows.

/**
 * Returns a stream of tuples which are aggregated results of a tumbling window with every {@code windowCount} of tuples.
 */
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory,
                             Fields inputFields, Aggregator aggregator, Fields functionFields);

/**
 * Returns a stream of tuples which are aggregated results of a window that tumbles at duration of {@code windowDuration}
 */
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory,
                             Fields inputFields, Aggregator aggregator, Fields functionFields);

Sliding window

Tuples are grouped in windows, and the window slides at every sliding interval. A tuple can belong to more than one window.

/**
 * Returns a stream of tuples which are aggregated results of a sliding window with every {@code windowCount} of tuples
 * and slides the window after {@code slideCount}.
 */
public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory,
                            Fields inputFields, Aggregator aggregator, Fields functionFields);

/**
 * Returns a stream of tuples which are aggregated results of a window which slides at duration of {@code slidingInterval}
 * and completes a window at {@code windowDuration}
 */
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval,
                            WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);

Examples of tumbling and sliding windows can be found here
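
The difference between the two window kinds is easiest to see on a tiny count-based example. The sketch below is plain Java, not the Trident windowing implementation: CountWindows is a hypothetical name, tuples are plain integers, and only completely filled windows are produced (how Trident treats a partially filled window is not modeled here).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of count-based windows; not Storm code.
class CountWindows {

    // Tumbling: consecutive, non-overlapping windows, i.e. a slide equal to the window size.
    static List<List<Integer>> tumbling(List<Integer> tuples, int windowCount) {
        return sliding(tuples, windowCount, windowCount);
    }

    // Sliding: a window of windowCount tuples that advances by slideCount tuples.
    // Only full windows are returned.
    static List<List<Integer>> sliding(List<Integer> tuples, int windowCount, int slideCount) {
        if (windowCount <= 0 || slideCount <= 0) {
            throw new IllegalArgumentException("windowCount and slideCount must be positive");
        }
        List<List<Integer>> windows = new ArrayList<>();
        for (int start = 0; start + windowCount <= tuples.size(); start += slideCount) {
            windows.add(new ArrayList<>(tuples.subList(start, start + windowCount)));
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Integer> tuples = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(tumbling(tuples, 3));   // prints [[1, 2, 3], [4, 5, 6]]
        System.out.println(sliding(tuples, 4, 2)); // prints [[1, 2, 3, 4], [3, 4, 5, 6]]
    }
}
```

In the tumbling case every tuple lands in exactly one window, while in the sliding case tuples 3 and 4 appear in both windows.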

Common windowing API

Below is the common windowing API which takes WindowConfig for any supported windowing configurations.

public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,
                         Aggregator aggregator, Fields functionFields)

windowConfig can be any of the following:

  - SlidingCountWindow.of(int windowCount, int slidingCount)
  - SlidingDurationWindow.of(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingDuration)
  - TumblingCountWindow.of(int windowLength)
  - TumblingDurationWindow.of(BaseWindowedBolt.Duration windowLength)

Trident windowing APIs need a WindowsStoreFactory to store received tuples and aggregated values. Currently, a basic implementation for HBase is provided as HBaseWindowsStoreFactory. It can be extended to address other use cases. An example of using HBaseWindowsStoreFactory for windowing is shown below.

// window-state table should already be created with cf:tuples column
HBaseWindowsStoreFactory windowStoreFactory = new HBaseWindowsStoreFactory(new HashMap<String, Object>(),
        "window-state", "cf".getBytes("UTF-8"), "tuples".getBytes("UTF-8"));
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
        new Values("the cow jumped over the moon"),
        new Values("the man went to the store and bought some candy"),
        new Values("four score and seven years ago"),
        new Values("how many apples can you eat"),
        new Values("to be or not to be the person"));
spout.setCycle(true);

TridentTopology topology = new TridentTopology();

Stream stream = topology.newStream("spout1", spout).parallelismHint(16)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .window(TumblingCountWindow.of(1000), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
        .peek(new Consumer() {
            @Override
            public void accept(TridentTuple input) {
                LOG.info("Received tuple: [{}]", input);
            }
        });

StormTopology stormTopology = topology.build();

Detailed description of all the above APIs in this section can be found here

Example applications

Example applications of these APIs are located at TridentHBaseWindowingStoreTopology and TridentWindowingInmemoryStoreTopology

partitionAggregate

partitionAggregate runs a function on each partition of a batch of tuples. Unlike functions, the tuples emitted by partitionAggregate replace the input tuples given to it. Consider this example:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

Suppose the input stream contained fields ["a", "b"] and the following partitions of tuples:

Partition 0:
["a", 1]
["b", 2]

Partition 1:
["a", 3]
["c", 8]

Partition 2:
["e", 1]
["d", 9]
["d", 10]

Then the output stream of that code would contain these tuples with one field called "sum":

Partition 0:
[3]

Partition 1:
[11]

Partition 2:
[20]
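
The sums above can be reproduced with a few lines of plain Java. This is a simulation under simplifying assumptions, not Trident code: PartitionSum is a hypothetical name, and each partition is reduced to the list of its "b" values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of partitionAggregate with a sum; not Storm code.
class PartitionSum {

    // Each inner list holds the "b" values of one partition. The result has one
    // value per partition, replacing the input tuples rather than extending them.
    static List<Integer> sumPerPartition(List<List<Integer>> partitions) {
        List<Integer> sums = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            int sum = 0;
            for (int b : partition) {
                sum += b;
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2),
                Arrays.asList(3, 8),
                Arrays.asList(1, 9, 10));
        System.out.println(sumPerPartition(partitions)); // prints [3, 11, 20]
    }
}
```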

There are three different interfaces for defining aggregators: CombinerAggregator, ReducerAggregator, and Aggregator.

Here's the interface for CombinerAggregator:

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}

A CombinerAggregator returns a single tuple with a single field as output. CombinerAggregators run the init function on each input tuple and use the combine function to combine values until there's only one value left. If there are no tuples in the partition, the CombinerAggregator emits the output of the zero function. For example, here's the implementation of Count:

public class Count implements CombinerAggregator<Long> {
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    public Long zero() {
        return 0L;
    }
}

The benefits of CombinerAggregators are seen when you use them with the aggregate method instead of partitionAggregate. In that case, Trident automatically optimizes the computation by doing partial aggregations before transferring tuples over the network.
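
To make the partial-aggregation idea concrete, here is a minimal plain-Java sketch. It is not how Trident is implemented: Combiner is a hypothetical stand-in for CombinerAggregator that takes a String instead of a TridentTuple, and the fold starts from zero(), which assumes zero() is the identity for combine (true for a count).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java sketch of init/combine/zero and two-phase aggregation; not Storm code.
class CombinerSketch {

    // Hypothetical stand-in for CombinerAggregator, with String tuples.
    interface Combiner<T> {
        T init(String tuple);
        T combine(T a, T b);
        T zero();
    }

    static final Combiner<Long> COUNT = new Combiner<Long>() {
        public Long init(String tuple) { return 1L; }
        public Long combine(Long a, Long b) { return a + b; }
        public Long zero() { return 0L; }
    };

    // Phase 1: fold a single partition locally. An empty partition yields zero().
    static <T> T aggregatePartition(Combiner<T> agg, List<String> partition) {
        T result = agg.zero();
        for (String tuple : partition) {
            result = agg.combine(result, agg.init(tuple));
        }
        return result;
    }

    // Phase 2: combine the per-partition partials. Only one partial value per
    // partition would have to cross the network, not every tuple.
    static <T> T aggregateGlobally(Combiner<T> agg, List<List<String>> partitions) {
        T total = agg.zero();
        for (List<String> partition : partitions) {
            total = agg.combine(total, aggregatePartition(agg, partition));
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"),
                Collections.<String>emptyList());
        System.out.println(aggregateGlobally(COUNT, partitions)); // prints 3
    }
}
```

Because combine is applied to already-reduced values, the result is the same whether tuples are counted in one place or partition by partition.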

A ReducerAggregator has the following interface:

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}

A ReducerAggregator produces an initial value with init, and then it iterates on that value for each input tuple to produce a single tuple with a single value as output. For example, here's how you would define Count as a ReducerAggregator, where init initializes the count to 0 and reduce adds 1 for each tuple:

public class Count implements ReducerAggregator<Long> {
    public Long init() {
        return 0L;
    }

    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + 1;
    }
}

ReducerAggregator can also be used with persistentAggregate, as you'll see later.

The most general interface for performing aggregations is Aggregator, which looks like this:

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);
}

Aggregators can emit any number of tuples with any number of fields. They can emit tuples at any point during execution. Aggregators execute in the following way:

  1. The init method is called before processing the batch. The return value of init is an Object that will represent the state of the aggregation and will be passed into the aggregate and complete methods.
  2. The aggregate method is called for each input tuple in the batch partition. This method can update the state and optionally emit tuples.
  3. The complete method is called when all tuples for the batch partition have been processed by aggregate.

Here's how you would implement Count as an Aggregator:

public class CountAgg extends BaseAggregator<CountState> {
    static class CountState {
        long count = 0;
    }

    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count+=1;
    }

    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}
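
The init, aggregate, complete sequence can be traced with a small driver. The sketch below is plain Java and only imitates the call order described above: Agg is a hypothetical stand-in for Aggregator, tuples are Strings, the collector is a plain list, and the batchId argument is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the Aggregator lifecycle; not Storm code.
class AggregatorLifecycle {

    // Hypothetical stand-in for Aggregator; "out" plays the role of the collector.
    interface Agg<S> {
        S init(List<Object> out);
        void aggregate(S state, String tuple, List<Object> out);
        void complete(S state, List<Object> out);
    }

    // Counts tuples like CountAgg; the state is a one-element long array.
    static final Agg<long[]> COUNT = new Agg<long[]>() {
        public long[] init(List<Object> out) { return new long[] {0L}; }
        public void aggregate(long[] state, String tuple, List<Object> out) { state[0] += 1; }
        public void complete(long[] state, List<Object> out) { out.add(state[0]); }
    };

    // Drives one batch partition: init once, aggregate per tuple, complete once.
    static <S> List<Object> runPartition(Agg<S> agg, List<String> partition) {
        List<Object> out = new ArrayList<>();
        S state = agg.init(out);
        for (String tuple : partition) {
            agg.aggregate(state, tuple, out);
        }
        agg.complete(state, out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runPartition(COUNT, Arrays.asList("x", "y", "z"))); // prints [3]
    }
}
```

Since every method receives the collector, an aggregator written this way could also emit from init or aggregate, which is what makes this interface more general than the other two.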

Sometimes you want to execute multiple aggregators at the same time. This is called chaining and can be accomplished like this:

mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()

This code will run the Count and Sum aggregators on each partition. The output will contain a single tuple with the fields ["count", "sum"].

stateQuery and partitionPersist

stateQuery and partitionPersist query and update sources of state, respectively. You can read about how to use them in the Trident state doc.

projection

The project method on Stream keeps only the fields specified in the operation. If you had a Stream with fields ["a", "b", "c", "d"] and you ran this code:

mystream.project(new Fields("b", "d"))

The output stream would contain only the fields ["b", "d"].

Repartitioning operations

Repartitioning operations run a function to change how the tuples are partitioned across tasks. The number of partitions can also change as a result of repartitioning (for example, if the parallelism hint is greater after repartitioning). Repartitioning requires network transfer. Here are the repartitioning functions:

  1. shuffle: Uses a random round-robin algorithm to evenly redistribute tuples across all target partitions.
  2. broadcast: Every tuple is replicated to all target partitions. This can be useful during DRPC, for example if you need to do a stateQuery on every partition of data.
  3. partitionBy: partitionBy takes in a set of fields and does semantic partitioning based on that set of fields. The fields are hashed and modded by the number of target partitions to select the target partition. partitionBy guarantees that the same set of fields always goes to the same target partition.
  4. global: All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
  5. batchGlobal: All tuples in the batch are sent to the same partition. Different batches in the stream may go to different partitions.
  6. partition: This method takes in a custom partitioning function that implements org.apache.storm.grouping.CustomStreamGrouping

Aggregation operations

Trident has aggregate and persistentAggregate methods for doing aggregations on Streams. aggregate is run on each batch of the stream in isolation, while persistentAggregate will aggregate all tuples across all batches in the stream and store the result in a source of state.

Running aggregate on a Stream does a global aggregation. When you use a ReducerAggregator or an Aggregator, the stream is first repartitioned into a single partition, and then the aggregation function is run on that partition. When you use a CombinerAggregator, on the other hand, Trident first computes partial aggregations of each partition, then repartitions to a single partition, and then finishes the aggregation after the network transfer. CombinerAggregators are far more efficient and should be used when possible.

Here's an example of using aggregate to get a global count for a batch:

mystream.aggregate(new Count(), new Fields("count"))

Like partitionAggregate, aggregators for aggregate can be chained. However, if you chain a CombinerAggregator with a non-CombinerAggregator, Trident is unable to do the partial aggregation optimization.

You can read more about how to use persistentAggregate in the Trident state doc.

Operations on grouped streams

The groupBy operation repartitions the stream by doing a partitionBy on the specified fields, and then within each partition groups together the tuples whose group fields are equal.

[Figure: illustration of a groupBy operation]

If you run aggregators on a grouped stream, the aggregation will be run within each group instead of against the whole batch. persistentAggregate can also be run on a GroupedStream, in which case the results will be stored in a MapState with the key being the grouping fields. You can read more about persistentAggregate in the Trident state doc.

Like regular streams, aggregators on grouped streams can be chained.
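
Running a count inside each group, rather than over the whole batch, can be pictured with the following plain-Java sketch. It is a simulation and not Trident code: GroupedCount is a hypothetical name, each tuple is reduced to its grouping key, and a TreeMap is used only so that the printed order is deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of a count on a grouped stream; not Storm code.
class GroupedCount {

    // One batch of tuples, each represented by its grouping key.
    // Returns one count per distinct key instead of a single count for the batch.
    static Map<String, Long> countPerGroup(List<String> batch) {
        Map<String, Long> counts = new TreeMap<>();
        for (String key : batch) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countPerGroup(Arrays.asList("a", "b", "a", "c", "a")));
        // prints {a=3, b=1, c=1}
    }
}
```

The map keyed by group loosely mirrors how persistentAggregate on a GroupedStream stores its results in a MapState keyed by the grouping fields.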

Merges and joins

The last part of the API is combining different streams together. The simplest way to combine streams is to merge them into one stream. You can do that with the TridentTopology#merge method, like so:

topology.merge(stream1, stream2, stream3);

Trident will name the output fields of the new, merged stream as the output fields of the first stream.

Another way to combine streams is with a join. A standard join, like the kind from SQL, requires finite input, so it doesn't make sense with infinite streams. Joins in Trident only apply within each small batch that comes off of the spout.

Here's an example join between a stream containing fields ["key", "val1", "val2"] and another stream containing ["x", "val1"]:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

This joins stream1 and stream2 together using "key" and "x" as the join fields for each respective stream. Then, Trident requires that all the output fields of the new stream be named, since the input streams could have overlapping field names. The tuples emitted from the join will contain:

  1. First, the list of join fields. In this case, "key" corresponds to "key" from stream1 and "x" from stream2.
  2. Next, a list of all non-join fields from all streams, in order of how the streams were passed to the join method. In this case, "a" and "b" correspond to "val1" and "val2" from stream1, and "c" corresponds to "val1" from stream2.
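
The field ordering of the joined tuples can be illustrated on a single batch. The sketch below is plain Java rather than Trident code: BatchJoin is a hypothetical name, tuples are lists of objects with the join field at index 0, and an inner join is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of a per-batch join; not Storm code.
class BatchJoin {

    // batch1 tuples look like [key, val1, val2]; batch2 tuples look like [x, val1].
    // Output tuples are laid out as [key, a, b, c]. An inner join is assumed.
    static List<List<Object>> join(List<List<Object>> batch1, List<List<Object>> batch2) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> left : batch1) {
            for (List<Object> right : batch2) {
                if (left.get(0).equals(right.get(0))) {
                    List<Object> joined = new ArrayList<>();
                    joined.add(left.get(0));                       // join field, named "key"
                    joined.addAll(left.subList(1, left.size()));   // val1, val2 of stream1 become "a", "b"
                    joined.addAll(right.subList(1, right.size())); // val1 of stream2 becomes "c"
                    out.add(joined);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> stream1Batch = Arrays.asList(
                Arrays.<Object>asList("k1", 1, 2),
                Arrays.<Object>asList("k2", 3, 4));
        List<List<Object>> stream2Batch = Arrays.asList(
                Arrays.<Object>asList("k1", 9),
                Arrays.<Object>asList("k3", 7));
        System.out.println(join(stream1Batch, stream2Batch)); // prints [[k1, 1, 2, 9]]
    }
}
```

Only "k1" appears in both batches, so it is the only key in the output, and tuples from other batches never take part.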

When a join happens between streams originating from different spouts, those spouts will be synchronized with how they emit batches. That is, a batch of processing will include tuples from each spout.

You might be wondering how to do something like a "windowed join", where tuples from one side of the join are joined against the last hour of tuples from the other side of the join.

To do this, you would make use of partitionPersist and stateQuery. The last hour of tuples from one side of the join would be stored and rotated in a source of state, keyed by the join field. Then the stateQuery would do lookups by the join field to perform the "join".
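
The store-and-lookup idea can be sketched without any Trident state classes. Everything below is hypothetical plain Java: LastHourState is an in-memory stand-in for a real source of state, store plays the role of partitionPersist, query plays the role of stateQuery, and timestamps are passed in explicitly as milliseconds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of a "last hour" state keyed by join field; not Storm code.
class LastHourState {

    static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    private static final class Stored {
        final long timestampMs;
        final String value;

        Stored(long timestampMs, String value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    private final Map<String, List<Stored>> byKey = new HashMap<>();

    // Plays the role of partitionPersist: remember a tuple under its join key.
    void store(String key, String value, long nowMs) {
        byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(new Stored(nowMs, value));
    }

    // Plays the role of stateQuery: values stored under the key within the last hour.
    List<String> query(String key, long nowMs) {
        List<String> matches = new ArrayList<>();
        List<Stored> stored = byKey.get(key);
        if (stored == null) {
            return matches;
        }
        // Rotate out entries that have fallen outside the one-hour window.
        stored.removeIf(s -> nowMs - s.timestampMs > ONE_HOUR_MS);
        for (Stored s : stored) {
            matches.add(s.value);
        }
        return matches;
    }

    public static void main(String[] args) {
        LastHourState state = new LastHourState();
        state.store("k", "old", 0L);
        state.store("k", "new", 3_000_000L);
        System.out.println(state.query("k", 3_700_000L)); // prints [new]
    }
}
```

A production version would need a durable, partitioned state and a rotation policy that does not depend on queries arriving, neither of which is modeled here.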

Time: 2024-11-04 14:40:03
