Storm Trident示例function, filter, projection

以下代码演示function, filter, projection的使用,可结合注释

省略部分代码,省略部分可参考:https://blog.csdn.net/nickta/article/details/79666918

FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
                new Values("nickt1", 4),
                new Values("nickt2", 7),
                new Values("nickt3", 8),
                new Values("nickt4", 9),
                new Values("nickt5", 7),
                new Values("nickt6", 11),
                new Values("nickt7", 5)
                );
        spout.setCycle(false);
        TridentTopology topology = new TridentTopology();
        topology.newStream("spout1", spout)
                .shuffle()
                .each(new Fields("user"),new BaseFilter() {
                    @Override
                    public boolean isKeep(TridentTuple tuple) {
                        if(tuple.getString(0).equals("nickt2")) {
                            return false;
                        }
                        return true;
                    }
                })//过滤点user为nickt2的tuple
                .each(new Fields("user", "score"),new Debug("filter print:"))
                .each(new Fields("score"), new BaseFunction() {

                    @Override
                    public void execute(TridentTuple tuple, TridentCollector collector) {
                        collector.emit(new Values(tuple.getIntegerByField("score") + 100));
                    }
                }, new Fields("sum"))//把score加上100后,生成新的sum字段,并追加到原字段后面,此步操作后就包括了user/score/sum三个字段
                .each(new Fields("user", "score", "sum"),new Debug("function print:"))
                .project(new Fields("user"))
                .each(new Fields("user"),new Debug("project print:"));//project投射之后,只有user字段了
                   

输出:

<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(filter print:): [nickt1, 4]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(function print:): [nickt1, 4, 104]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(project print:): [nickt1]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(filter print:): [nickt3, 8]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(function print:): [nickt3, 8, 108]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(project print:): [nickt3]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(filter print:): [nickt4, 9]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(function print:): [nickt4, 9, 109]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(project print:): [nickt4]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(filter print:): [nickt5, 7]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(function print:): [nickt5, 7, 107]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(project print:): [nickt5]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(filter print:): [nickt6, 11]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(function print:): [nickt6, 11, 111]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(project print:): [nickt6]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(filter print:): [nickt7, 5]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(function print:): [nickt7, 5, 105]
<Sat Mar 24 13:41:42 CST 2018[partition0-Thread-68-b-0-executor[33 33]]> DEBUG(project print:): [nickt7]

原文地址:https://www.cnblogs.com/nickt/p/8638663.html

时间: 2024-12-13 21:59:52

Storm Trident示例function, filter, projection的相关文章

storm trident 示例

Storm Trident的核心数据模型是一批一批被处理的“流”,“流”在集群的分区在集群的节点上,对“流”的操作也是并行的在每个分区上进行. Trident有五种对“流”的操作: 1.      不需要网络传输的本地批次运算 2.      需要网络传输的“重分布”操作,不改变数据的内容 3.      聚合操作,网络传输是该操作的一部分 4.      “流”分组(grouby)操作 5.      合并和关联操作 批次本地操作: 批次本地操作不需要网络传输,本格分区(partion)的运算

Storm Trident示例partitionBy

如下代码使用partitionBy做repartition, partitionBy即根据相应字段的值按一定算法,把tuple分配到目标partition当中(Target Partition = hash(fields) % (number of target partition)), 相同值会被分配到同一个partition当中,由于不同值有可能出现相同的hash, 根据上面的算法,不同的值,也可能分配到同一个partition中. 省略部分代码,省略部分可参考:https://blog.c

Storm Trident示例Aggregator

Aggregator首先在输入流上运行全局重新分区操作(global)将同一批次的所有分区合并到一个分区中,然后在每个批次上运行的聚合功能,针对Batch操作.与ReduceAggregator很相似. 省略部分代码,省略部分可参考:https://blog.csdn.net/nickta/article/details/79666918 static class State { int count = 0; } FixedBatchSpout spout = new FixedBatchSpo

storm trident function函数

package cn.crxy.trident; import java.util.List; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trid

Storm专题二:Storm Trident API 使用详解

一.概述 Storm Trident中的核心数据模型就是"Stream",也就是说,Storm Trident处理的是Stream,但是实际上Stream是被成批处理的,Stream被切分成一个个的Batch分布到集群中,所有应用在Stream上的函数最终会应用到每个节点的Batch中,实现并行计算,具体如下图所示: 在Trident中有五种操作类型: Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输 Repartitioning:数据流重定向,单纯

storm trident merger

import java.util.List; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.Stream; import storm.

Storm Trident 详细介绍

一.概要 1.1 Storm(简介) Storm是一个实时的可靠地分布式流计算框架. 具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息(可以是logs,clicks,sensor data):通过Storm对消息进行计算聚合等预处理:把处理结果持久化到NoSQL数据库或者HDFS做进一步深入分析. 1.2 Trident(简介) Trident是对Storm的更高一层的抽象,除了提供一套简单易用的流数据处理API之外,它以batch(一组tuples)

Storm Trident API 实践

一.概要 1.1 Storm(简介) Storm是一个实时的可靠地分布式流计算框架. 具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息(可以是logs,clicks,sensor data).通过Storm对消息进行计算聚合等预处理.把处理结果持久化到NoSQL数据库或者HDFS做进一步深入分析. 1.2 Trident(简介) Trident是对Storm的更高一层的抽象,除了提供一套简单易用的流数据处理API之外,它以batch(一组tuples)

[翻译][Trident] Storm Trident 详细介绍

1.Trident对storm提供了什么能力?2.Trident在如何最大程度的保证执行topogloy性能方面是非常智能的?3.storm如何保证每个消息都被处理一次? Trident是在storm基础上,一个以realtime 计算为目标的高度抽象. 它在提供处理大吞吐量数据能力的同时,也提供了低延时分布式查询和有状态流式处理的能力. 如果你对Pig和Cascading这种高级批量处理工具很了解的话,那么应该毕竟容易理解Trident,因为他们之间很多的概念和思想都是类似的.Tident提供