Storm集成Siddhi

《Siddhi初探》中我们介绍了Siddhi的基本使用方法,并表示我们将把Siddhi集成到Storm中作为流任务处理引擎。本文将用《Storm初探》中的例子讲解如何集成Siddhi。

《Storm初探》中的例子把名字字符串进行分割与输出,我们将增加一个SIddhiBolt进行名字过滤,过滤规则是筛选出小于50岁的人的名字。

对于输出:刘备 49 关羽 50 张飞 51,曹操 49 郭嘉 50 荀彧 51。我们将过滤出刘备,曹操两个名字。代码如下:

package com.coshaho.learn.storm;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

/**
 *
 * NamesFilterSiddhiBolt.java Create on 2017年6月26日 下午11:08:45
 *
 * 类功能说明:   根据年龄过滤名称
 *
 * Copyright: Copyright(c) 2013
 * Company: COSHAHO
 * @Version 1.0
 * @Author coshaho
 */
public class NamesFilterSiddhiBolt implements IRichBolt
{
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    private InputHandler inputHandler;

    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector)
    {
        this.collector = collector;
        init();
    }

    private void init()
    {
        SiddhiManager siddhiManager = new SiddhiManager();

        String siddhiApp = "" +
                "define stream namesStream (name string, age int, streamid String); " +
                "" +
                "@info(name = ‘namefilter‘) " +
                "from namesStream[age < 50] " +
                "select name,streamid,age " +
                "insert into outputStream ;";

        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

        siddhiAppRuntime.addCallback("namefilter", new QueryCallback()
        {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents)
            {
                for(Event event : inEvents)
                {
                    String name = event.getData(0) + "";
                    String streamId = event.getData(1) + "";
                    String age = event.getData(2) + "";
                    List<Object> splitList = new ArrayList<Object>();
                    splitList.add(name);
                    System.out.println(name + " 年龄为 " + age);
                    collector.emit(streamId, splitList);
                }
            }
        });

        inputHandler = siddhiAppRuntime.getInputHandler("namesStream");
        siddhiAppRuntime.start();
    }

    public void execute(Tuple input)
    {
        String name = input.getString(0);
        int age = input.getInteger(1);
        String inputStream = input.getSourceStreamId();
        try
        {
            inputHandler.send(new Object[]{name, age, inputStream});
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

        collector.ack(input);
    }

    public void cleanup()
    {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("name"));
    }

    public Map<String, Object> getComponentConfiguration()
    {
        return null;
    }
}

需要简单的修改一下名称切割Bolt,增加age字段输出

    public void execute(Tuple input)
    {
        // 打印线程号用于追踪Storm的分配策略
        Thread current = Thread.currentThread();
        String names = input.getString(0);
        System.out.println("准备拆分" + names + "。当前线程号是" + current.getId() + "。");
        List<Tuple> inputList = new ArrayList<Tuple>();
        inputList.add(input);
        String[] nameArray = names.split(" ");
        int age = 49;
        for(String name : nameArray)
        {
            List<Object> splitList = new ArrayList<Object>();
            splitList.add(name);
            splitList.add(age);
            collector.emit(inputList, splitList);
            age++;
        }
        collector.ack(input);
    }

Topo发布时增加Siddhi过滤节点

    public static void main(String[] args) throws InterruptedException
    {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("names-reader", new NamesReaderSpout());
        // 启动两个名字分割Task,名字列表随机分配给一个Task
        builder.setBolt("names-spliter", new NamesSpliterBolt(), 2)
            .shuffleGrouping("names-reader");
        builder.setBolt("names-filter", new NamesFilterSiddhiBolt(), 1)
            .shuffleGrouping("names-spliter");
        // 启动两个Hello World Task,相同名字发送到同一个Task
        builder.setBolt("hello-world", new HelloWorldBolt(), 2)
            .fieldsGrouping("names-filter", new Fields("name"));

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("storm-test", conf, builder.createTopology());
    }

输出如下

时间: 2024-11-05 14:53:25

Storm集成Siddhi的相关文章

Storm集成Kafka应用的开发

我们知道storm的作用主要是进行流式计算,对于源源不断的均匀数据流流入处理是非常有效的,而现实生活中大部分场景并不是均匀的数据流,而是时而多时而少的数据流入,这种情况下显然用批量处理是不合适的,如果使用storm做实时计算的话可能因为数据拥堵而导致服务器挂掉,应对这种情况,使用kafka作为消息队列是非常合适的选择,kafka可以将不均匀的数据转换成均匀的消息流,从而和storm比较完善的结合,这样才可以实现稳定的流式计算,那么我们接下来开发一个简单的案例来实现storm和kafka的结合 s

storm集成kafka的应用,从kafka读取,写入kafka

storm集成kafka的应用,从kafka读取,写入kafka by 小闪电 0前言 storm的主要作用是进行流式的实时计算,对于一直产生的数据流处理是非常迅速的,然而大部分数据并不是均匀的数据流,而是时而多时而少.对于这种情况下进行批处理是不合适的,因此引入了kafka作为消息队列,与storm完美配合,这样可以实现稳定的流式计算.下面是一个简单的示例实现从kafka读取数据,并写入到kafka,以此来掌握storm与kafka之间的交互. 1程序框图 实质上就是storm的kafkasp

storm集成kafka

kafkautil: import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Value; public class KafkaUtil { @Value("#{sys['connect']}") private static

Storm 系列(七)—— Storm 集成 Redis 详解

一.简介 Storm-Redis 提供了 Storm 与 Redis 的集成支持,你只需要引入对应的依赖即可使用: <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-redis</artifactId> <version>${storm.version}</version> <type>jar</type> <

5、Storm集成Kafka

1.pom文件依赖 <!--storm相关jar --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <!--排除相关依赖 --> <exclusions> <exclusion>

Storm集群上的开发 ,任务计算输出到mysql数据库,集成jdbc(十)

storm集成jdbc,把计算结果保存到mysql中. 首先在mysql中建表 ,表的字段与输出的tuple的schema一致: create table result( word varchar(20), total int ); 编写一个连接提供器,用于获取mysql数据库连接: 需要引入jar :/usr/local/apps/apache-storm-1.0.3/external/storm-jdbc 的 storm-jdbc-1.0.3.jar package mystorm.word

storm

大数据技术交流群 :494721467 安装storm ------------------- 1.准备 jdk + zookeeper 2.storm安装 下载apache-storm-1.0.1.tar.gz + apache-storm-1.0.1.-src-tar.gz 3.tar 4.移动文件 $>mv /soft/.. 5.配置环境变量 [/etc/environment] path STORM_HOME= 6.配置storm [conf/yaml] storm.zookeeper.

storm:最火的流式处理框架

本文出处:www.cnblogs.com/langtianya/p/5199529.html 伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样.更加便捷,同时对于信息的时效性要求也越来越高.举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来.点击.购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了.再举一个推荐的例子,如果用户昨天在淘宝上买了一双袜子,今天想买一副泳镜去游泳,但是

Storm是什么

Why use Storm? Apache Storm是一个免费的开源的分布式实时计算系统.Storm使得可靠的实时处理无边界的数据量变得很容易,就如同Hadoop做批处理那样.Storm很简单,可以用任意的编程语言. Storm有许多使用案例:实时分析.在线机器学习.持续的计算.分布式RPC.ETL等等.Storm很快速:每个节点每秒钟可以处理一百万个元组.它是可伸缩的.容错的,保证你的数据将会被处理,并且很容易操作. Storm集成了队列和数据库技术.一个Storm拓扑结构以任意复杂的方式消