《Siddhi初探》中我们介绍了Siddhi的基本使用方法,并表示我们将把Siddhi集成到Storm中作为流任务处理引擎。本文将用《Storm初探》中的例子讲解如何集成Siddhi。
《Storm初探》中的例子把名字字符串进行分割与输出,我们将增加一个SiddhiBolt进行名字过滤,过滤规则是筛选出小于50岁的人的名字。
对于名称切割Bolt的输出:刘备 49 关羽 50 张飞 51,曹操 49 郭嘉 50 荀彧 51,我们将过滤出刘备、曹操两个名字。代码如下:
package com.coshaho.learn.storm;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

/**
 * NamesFilterSiddhiBolt.java, created on 2017-06-26 23:08:45.
 *
 * <p>Storm bolt that filters names by age. Each incoming tuple (name, age) is
 * forwarded to an embedded Siddhi app; names whose age is below 50 are emitted
 * again as a single-field tuple ("name").
 *
 * Copyright: Copyright(c) 2013
 * Company: COSHAHO
 * @Version 1.0
 * @Author coshaho
 */
public class NamesFilterSiddhiBolt implements IRichBolt
{
    private static final long serialVersionUID = 1L;

    /** Name given to the Siddhi query; the callback is registered under it. */
    private static final String QUERY_NAME = "namefilter";

    /**
     * Siddhi app definition: one input stream and one filter query that keeps
     * events with age &lt; 50. The query name must be quoted with plain ASCII
     * single quotes, otherwise SiddhiQL cannot parse the annotation.
     */
    private static final String SIDDHI_APP = ""
            + "define stream namesStream (name string, age int, streamid String); "
            + ""
            + "@info(name = '" + QUERY_NAME + "') "
            + "from namesStream[age < 50] "
            + "select name,streamid,age "
            + "insert into outputStream ;";

    private OutputCollector collector;

    private InputHandler inputHandler;

    // Kept only so that cleanup() can release them; created in prepare(), never serialized.
    private transient SiddhiManager siddhiManager;

    private transient SiddhiAppRuntime siddhiAppRuntime;

    /**
     * Stores the collector and starts the embedded Siddhi runtime.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit filtered names and to ack/fail input tuples
     */
    @Override
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
    {
        this.collector = collector;
        init();
    }

    /**
     * Builds the Siddhi runtime, registers the callback that re-emits every
     * event passing the filter, and starts the runtime.
     */
    private void init()
    {
        siddhiManager = new SiddhiManager();
        siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(SIDDHI_APP);

        siddhiAppRuntime.addCallback(QUERY_NAME, new QueryCallback()
        {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents)
            {
                if (inEvents == null)
                {
                    // Nothing passed the filter in this batch.
                    return;
                }
                for (Event event : inEvents)
                {
                    // Column order follows the query's select clause: name, streamid, age.
                    String name = String.valueOf(event.getData(0));
                    String streamId = String.valueOf(event.getData(1));
                    String age = String.valueOf(event.getData(2));

                    List<Object> splitList = new ArrayList<Object>();
                    splitList.add(name);
                    System.out.println(name + " 年龄为 " + age);
                    // Emitted on the stream id carried through the Siddhi event,
                    // i.e. the source stream id of the tuple that produced it.
                    collector.emit(streamId, splitList);
                }
            }
        });

        inputHandler = siddhiAppRuntime.getInputHandler("namesStream");
        siddhiAppRuntime.start();
    }

    /**
     * Sends the tuple's name and age, together with its source stream id, into
     * the Siddhi input stream. The tuple is acked once the event has been
     * handed over; if the hand-over is interrupted the tuple is failed.
     *
     * @param input tuple with the name at index 0 and the age at index 1
     */
    @Override
    public void execute(Tuple input)
    {
        String name = input.getString(0);
        int age = input.getInteger(1);
        String inputStream = input.getSourceStreamId();
        try
        {
            inputHandler.send(new Object[]{name, age, inputStream});
        }
        catch (InterruptedException e)
        {
            // Preserve the interrupt for the calling thread and report the
            // tuple as failed instead of acking an event that was never delivered.
            Thread.currentThread().interrupt();
            collector.fail(input);
            return;
        }
        collector.ack(input);
    }

    /**
     * Shuts down the Siddhi runtime and manager created in {@link #init()}.
     */
    @Override
    public void cleanup()
    {
        if (siddhiAppRuntime != null)
        {
            siddhiAppRuntime.shutdown();
            siddhiAppRuntime = null;
        }
        if (siddhiManager != null)
        {
            siddhiManager.shutdown();
            siddhiManager = null;
        }
    }

    /**
     * Declares the single output field, "name".
     *
     * @param declarer declarer supplied by the caller
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("name"));
    }

    /**
     * @return {@code null}; this bolt supplies no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration()
    {
        return null;
    }
}
需要简单地修改一下名称切割Bolt,增加age字段输出:
public void execute(Tuple input) { // 打印线程号用于追踪Storm的分配策略 Thread current = Thread.currentThread(); String names = input.getString(0); System.out.println("准备拆分" + names + "。当前线程号是" + current.getId() + "。"); List<Tuple> inputList = new ArrayList<Tuple>(); inputList.add(input); String[] nameArray = names.split(" "); int age = 49; for(String name : nameArray) { List<Object> splitList = new ArrayList<Object>(); splitList.add(name); splitList.add(age); collector.emit(inputList, splitList); age++; } collector.ack(input); }
Topo发布时增加Siddhi过滤节点
public static void main(String[] args) throws InterruptedException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("names-reader", new NamesReaderSpout()); // 启动两个名字分割Task,名字列表随机分配给一个Task builder.setBolt("names-spliter", new NamesSpliterBolt(), 2) .shuffleGrouping("names-reader"); builder.setBolt("names-filter", new NamesFilterSiddhiBolt(), 1) .shuffleGrouping("names-spliter"); // 启动两个Hello World Task,相同名字发送到同一个Task builder.setBolt("hello-world", new HelloWorldBolt(), 2) .fieldsGrouping("names-filter", new Fields("name")); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("storm-test", conf, builder.createTopology()); }
输出如下
时间: 2024-11-05 14:53:25