storm trident function函数

package cn.crxy.trident;

import java.util.List;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.generated.StormTopology;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import storm.trident.TridentTopology;

import storm.trident.operation.BaseFunction;

import storm.trident.operation.TridentCollector;

import storm.trident.testing.FixedBatchSpout;

import storm.trident.tuple.TridentTuple;

public class TridentLocalPologyFunction {

public static class SumBolt extends BaseFunction{

@Override

public void execute(TridentTuple tuple, TridentCollector collector) {

//List<Object> values = tuple.getValues();

//System.err.println("values="+values.toString() );

Integer value0 = tuple.getInteger(0);

// Integer value1 = tuple.getInteger(1);

// Integer value2 = tuple.getInteger(2);

// Integer value3 = tuple.getInteger(3);

//Integer value2 = tuple.getInteger(1);

System.err.println("value0="+value0 );

}

}

public static void main(String[] args) {

//输出为new Fields("sentence")

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values(9999));

spout.setCycle(true);

TridentTopology tridentTopology = new TridentTopology();

tridentTopology.newStream("spout1", spout).each(new Fields("sentence"), new SumBolt(), new Fields(""));

LocalCluster localCluster = new LocalCluster();

localCluster.submitTopology("trident", new Config(), tridentTopology.build());

}

}

时间: 2024-12-09 07:58:34

storm trident function函数的相关文章

Storm Trident 详细介绍

一.概要 1.1 Storm(简介) Storm是一个实时的可靠地分布式流计算框架. 具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息(可以是logs,clicks,sensor data):通过Storm对消息进行计算聚合等预处理:把处理结果持久化到NoSQL数据库或者HDFS做进一步深入分析. 1.2 Trident(简介) Trident是对Storm的更高一层的抽象,除了提供一套简单易用的流数据处理API之外,它以batch(一组tuples)

Storm Trident API 实践

一.概要 1.1 Storm(简介) Storm是一个实时的可靠地分布式流计算框架. 具体就不多说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息(可以是logs,clicks,sensor data).通过Storm对消息进行计算聚合等预处理.把处理结果持久化到NoSQL数据库或者HDFS做进一步深入分析. 1.2 Trident(简介) Trident是对Storm的更高一层的抽象,除了提供一套简单易用的流数据处理API之外,它以batch(一组tuples)

Storm专题二:Storm Trident API 使用详解

一.概述 Storm Trident中的核心数据模型就是"Stream",也就是说,Storm Trident处理的是Stream,但是实际上Stream是被成批处理的,Stream被切分成一个个的Batch分布到集群中,所有应用在Stream上的函数最终会应用到每个节点的Batch中,实现并行计算,具体如下图所示: 在Trident中有五种操作类型: Apply Locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输 Repartitioning:数据流重定向,单纯

storm trident 示例

Storm Trident的核心数据模型是一批一批被处理的“流”,“流”在集群的分区在集群的节点上,对“流”的操作也是并行的在每个分区上进行. Trident有五种对“流”的操作: 1.      不需要网络传输的本地批次运算 2.      需要网络传输的“重分布”操作,不改变数据的内容 3.      聚合操作,网络传输是该操作的一部分 4.      “流”分组(grouby)操作 5.      合并和关联操作 批次本地操作: 批次本地操作不需要网络传输,本格分区(partion)的运算

jquery中的 $(function(){ .. }) 函数

2017-04-29 在讲解jquery中的 $(function(){ .. }) 函数之前,我们先简单了解下匿名函数.匿名函数的形式为:(function(){ ... }),又如 function(arg){ ... };定义了 一个参数为 arg 的匿名函数,然后使用 (function(arg){ ... })(param) 来调用这个函数,其中 param 是传入这个匿名函数的参数. 但需要主要匿名函数与jquery中的 $(function(){ ...}) 函数的区别:$(fun

function 函数的使用

函数的构成 function+函数名(参数1,参数2){ 函数实现: } 函数名不能是数字开头,可以是字母和下划线: 函数的调用: 函数名(): 作用域 定义在函数外面的变量,称之为全局变量,整个文档都可以访问. 定义在函数里面的变量为局部变量,只能在该函数内部访问. var a=10; function aa(){ var a=20; alert(a); } alert(a); aa() 函数是一个数据类型,可以把它赋给变量 var f=function (b){ return (b=b+1)

Template function 函数模板用法

#include<iostream> using namespace std; const double PI = 3.1415926; template <class T> T min(T a[], int n){ int i; T minv = a[0]; for (i = 1; i < n; i++){ if (a[i] < minv) minv = a[i]; } return minv; } template<class T1> double Ci

Lua function 函数

Lua支持面向对象,操作符为冒号‘:’.o:foo(x) <==> o.foo(o, x). Lua程序可以调用C语言或者Lua实现的函数.Lua基础库中的所有函数都是用C实现的.但这些细节对于lua程序员是透明的.调用一个用C实现的函数,和调用一个用Lua实现的函数,二者没有任何区别. 函数的参数跟局部变量一样,用传入的实参来初始化,多余的实参被丢弃,多余的形参初始化为nil. count=0 function incCount(n) n=n or 1 count=count+n end i

storm trident merger

import java.util.List; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.Stream; import storm.