Storm入门--Storm编程

以电信通话记录为例

移动呼叫及其持续时间将作为对Apache Storm的输入,Storm将处理和分组在相同呼叫者和接收者之间的呼叫及其呼叫总数。

编程思想:

在storm中,把对数据的处理过程抽象成一个topology,这个topology包含的组件主要是spout、bolt,以及以tuple形式在组件之间传输的数据流。这个数据流在topology流一遍,就是对数据的一次处理。

1、创建Spout类

这一部分,是创建数据流的源头。

创建一个类,实现IRichSpout接口,实现相应方法。其中几个方法的含义:

  • open -为Spout提供执行环境。执行器将运行此方法来初始化喷头。一般写一些第一次运行时要处理的逻辑
  • nextTuple -通过收集器发出生成的数据。核心,用于生成数据流
  • close -当spout将要关闭时调用此方法。
  • declareOutputFields -声明元组的输出模式。即,声明了从此spout出去的流都的数据格式
  • ack -确认处理了特定元组。
  • fail -指定不处理和不重新处理特定元组。
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - 为此spout提供storm配置。
  • context - 提供有关拓扑中的spout位置,其任务ID,输入和输出信息的完整信息。
  • collector - 使我们能够发出将由bolts处理的元组。
nextTuple()

nextTuple()从与ack()和fail()方法相同的循环中定期调用。它必须释放线程的控制,当没有工作要做,以便其他方法有机会被调用。因此,nextTuple的第一行检查处理是否已完成。如果是这样,它应该休眠至少一毫秒,以减少处理器在返回之前的负载。

declareOutputFields(OutputFieldsDeclarer declarer)

declarer -它用于声明输出流id,输出字段等,此方法用于指定元组的输出模式。

ack(Object msgId)

该方法确认已经处理了特定元组。

fail(Object o)

此方法通知特定元组尚未完全处理。 Storm将重新处理特定的元组

package com.jing.calllogdemo;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

/*
spout类,负责产生数据流
 */
public class CallLogSpout implements IRichSpout {
    //spout 输出收集器
    private SpoutOutputCollector collector;
    //是否完成
    private boolean completed = false;
    //上下文对象
    private TopologyContext context;
    //随机发生器
    private Random randomGenerator = new Random();
    //索引
    private Integer idx = 0;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        //第一次运行要做的事
        this.context = topologyContext;
        this.collector = spoutOutputCollector;

    }

    @Override
    public void close() {

    }

    @Override
    public void activate() {

    }

    @Override
    public void deactivate() {

    }

    @Override
    public void nextTuple() {
        //产生第一条数据,

        if (this.idx <= 1000){
            List<String> mobileNumbers = new ArrayList<String>();
            mobileNumbers.add("1234123401");
            mobileNumbers.add("1234123402");
            mobileNumbers.add("1234123403");
            mobileNumbers.add("1234123404");

            Integer localIdx = 0;
            while (localIdx++ < 100 && this.idx++ <1000){
                //取出主叫
                String caller = mobileNumbers.get(randomGenerator.nextInt(4));
                //取出被叫
                String callee = mobileNumbers.get(randomGenerator.nextInt(4));
                while (caller == callee){
                    //重新取出被叫
                    callee = mobileNumbers.get(randomGenerator.nextInt(4));
                }
                //模拟通话时长
                Integer duration = randomGenerator.nextInt(60);
                //输出元祖
                this.collector.emit(new Values(caller,callee,duration));
            }
        }

    }

    @Override
    public void ack(Object o) {

    }

    @Override
    public void fail(Object o) {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //声明输出字段,定义元组的结构,定义输出字段名称
        outputFieldsDeclarer.declare(new Fields("from", "to", "duration"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

CallLogSpout

2、创建Bolt类

这一部分是完成对数据流的处理,Bolt把元组作为输入,对元组进行处理后,产生新的元组。

创建一个类,实现IRichBolt接口,实现相应方法。

  • prepare -为bolt提供要执行的环境。执行器将运行此方法来初始化spout。
  • execute -处理单个元组的输入
  • cleanup -当spout要关闭时调用。
  • declareOutputFields -声明元组的输出模式。
prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf -为此bolt提供Storm配置。
  • context -提供有关拓扑中的bolt位置,其任务ID,输入和输出信息等的完整信息。
  • collector -使我们能够发出处理的元组。
execute(Tuple tuple)

这是bolt的核心方法,这里的元组是要处理的输入元组。execute方法一次处理单个元组。元组数据可以通过Tuple类的getValue方法访问。不必立即处理输入元组。多元组可以被处理和输出为单个输出元组。处理的元组可以通过使用OutputCollector类发出。

cleanup()
declareOutputFields(OutputFieldsDeclarer declarer)

这个方法用于指定元组的输出模式,参数declarer用于声明输出流id,输出字段等。

这里有两个bolt

呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”

package com.jing.calllogdemo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;
/*
创建calllog日志的bolt
 */
public class CallLogCreatorBolt implements IRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        //处理新的同话记录
        String from = tuple.getString(0);
        String to = tuple.getString(1);
        Integer duration = tuple.getInteger(2);
        //产生新的tuple
        String fromTO = from + "-" + to;
        collector.emit(new Values(fromTO, duration));

    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //设置输出字段的名称
        outputFieldsDeclarer.declare(new Fields("call", "duration"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

CallLogCreatorBolt

呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”。

package com.jing.calllogdemo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;
/*
通话记录计数器bolt
 */
public class CallLogCounterBolt implements IRichBolt {
    Map<String, Integer> counterMap;
    private OutputCollector collector;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.counterMap = new HashMap<String, Integer>();
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String call = tuple.getString(0);
        Integer duration = tuple.getInteger(1);
        if(!counterMap.containsKey(call)){
            counterMap.put(call, 1);
        }else {
            Integer c = counterMap.get(call) + duration;
            counterMap.put(call, c);
        }
        collector.ack(tuple);

    }

    @Override
    public void cleanup() {
        for(Map.Entry<String, Integer> entry : counterMap.entrySet()){
            System.out.println(entry.getKey() + " : " + entry.getValue());
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("call"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

CallLogCounterBolt

3、创建执行入口类,构建Topology

Storm拓扑基本上是一个Thrift结构。 TopologyBuilder类提供了简单而容易的方法来创建复杂的拓扑。TopologyBuilder类具有设置spout(setSpout)和设置bolt(setBolt)的方法。最后,TopologyBuilder有createTopology来创建拓扑。使用以下代码片段创建拓扑 -

package com.jing.calllogdemo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class App {
    public static void main(String[] args) throws InterruptedException, InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();

        //设置spout
        builder.setSpout("spout", new CallLogSpout());
        //设置creator-bolt
        builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
        //设置countor-bolt
        builder.setBolt("counter-bolt", new CallLogCounterBolt()).
                fieldsGrouping("creator-bolt", new Fields("call"));

        Config config = new Config();
        config.setDebug(true);

        /*本地模式
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();

         */

        StormSubmitter.submitTopology("myTop", config, builder.createTopology());

    }
}

App

为了开发目的,我们可以使用“LocalCluster”对象创建本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交拓扑。 “submitTopology”的参数之一是“Config”类的实例。“Config”类用于在提交拓扑之前设置配置选项。此配置选项将在运行时与集群配置合并,并使用prepare方法发送到所有任务(spout和bolt)。一旦拓扑提交到集群,我们将等待10秒钟,集群计算提交的拓扑,然后使用“LocalCluster”的“shutdown”方法关闭集群。完整的程序代码如下 -

参考:

作者:raincoffee
链接:https://www.jianshu.com/p/7af9693d9ffc
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

生产环境的集群上运行topology

1)修改提交方式,在代码中

2)导出jar包 mvn

3)在linux上运行topologys

&>storm jar XXX.jar  full.class.name

原文地址:https://www.cnblogs.com/Jing-Wang/p/11028749.html

时间: 2024-11-06 07:41:43

Storm入门--Storm编程的相关文章

_00019 Storm的体系结构介绍以及Storm入门案例(官网上的简单Java案例)

博文作者:妳那伊抹微笑 博客地址:http://blog.csdn.net/u012185296 个性签名:世界上最遥远的距离不是天涯,也不是海角,而是我站在妳的面前,妳却感觉不到我的存在 技术方向:Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... 云计算技术 转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作! qq交流群:214293307  (期待与你一起学习,共同进步) # Storm

Storm入门(四)WordCount示例

Storm API文档网址如下: http://storm.apache.org/releases/current/javadocs/index.html 一.关联代码 使用maven,代码如下. pom.xml  和Storm入门(三)HelloWorld示例相同 RandomSentenceSpout.java /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor lice

Storm入门学习随记

推荐慕课网视频:http://www.imooc.com/video/10055 ====Storm的起源. Storm是开源的.分布式.流式计算系统 什么是分布式呢?就是将一个任务拆解给多个计算机去执行,让许多机器共通完成同一个任务, 把这个多机的细节给屏蔽,对外提供同一个接口.同一个服务,这样的系统就是分布式系统. 在多年以前并没有非常范用的分布式系统,即使存在,也都是限定在指定的领域, 当然,也有人尝试从中提取出共通的部分,发明一个通用的分布式系统,但是都没有很好的结果. 后来,Googl

storm入门教程 第一章 前言

转自:http://blog.linezing.com/?p=1847 storm:http://www.cnblogs.com/panfeng412/tag/Storm/ http://blog.linezing.com/?cat=92 1.1   实时流计算 互联网从诞生的第一时间起,对世界的最大的改变就是让信息能够实时交互,从而大大加速了各个环节的效率.正因为大家对信息实时响应.实时交互的需求,软件行业除了个人操作系统之外,数据库(更精确的说是关系型数据库)应该是软件行业发展最快.收益最为

storm入门教程 第一章 前言[转]

1.1   实时流计算 互联网从诞生的第一时间起,对世界的最大的改变就是让信息能够实时交互,从而大大加速了各个环节的效率.正因为大家对信息实时响应.实时交互的需求,软件行业除了个人操作系统之外,数据库(更精确的说是关系型数据库)应该是软件行业发展最快.收益最为丰厚的产品了.记得十年前,很多银行别说实时转账,连实时查询都做不到,但是数据库和高速网络改变了这个情况. 随着互联网的更进一步发展,从Portal信息浏览型到Search信息搜索型到SNS关系交互传递型,以及电子商务.互联网旅游生活产品等将

Storm入门(Storm程序)

Storm简介 Storm是一个分布式实时流式框架,大多应用于以下场景:实时分析.在线机器学习.流式计算.分布式RPC ETL(BL分析)等等.同类型的框架有hadoop和spark.hadoop侧重于海量数据的离线计算,spark则更擅长实时迭代计算.要注意的是,storm并不直接处理数据,而是把我们的业务程序(逻辑)放在很多服务器上并发运行,待处理消息被分散到很多服务器上并发处理,以此扩展程序的负载能力. Direction 简单来说的话,Storm框架包含两个部分.一个是Storm程序,一

storm入门

最近学习了storm的一些基础知识,感觉storm是一个非常强大的实时流处理系统.对其进行简要介绍如下: STORM 1.什么是storm Storm是一个开源的,分布式的,可靠的,实时数据流处理系统.类比Hadoop对数据进行批处理,storm对数据进行实时处理. 2.storm的应用场景 Storm的处理速度快吞吐量大,根据Storm官方网站的资料介绍,Storm的一个节点(Intel [email protected]的CPU,24 GB的内存)在1秒钟能够处理100万个100字节的消息.

Storm 入门的Demo教程

Storm介绍 Storm是Twitter开源的分布式实时大数据处理框架,最早开源于github,从0.9.1版本之后,归于Apache社区,被业界称为实时版Hadoop.随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,比如网站统计.推荐系统.预警系统.金融系统(高频交易.股票)等等,大数据实时处理解决方案(流计算)的应用日趋广泛,目前已是分布式技术领域最新爆发点,而Storm更是流计算技术中的佼佼者和主流. Storm的核心组件 Nimbus:即Storm的Master,负

Storm入门,看这篇就够了

部分一:Srorm 简介 1.1 Storm是实时的数据流,Hadoop是批量离线数据 起源背景 Twitter 开源的一个类似于Hadoop的实时数据处理框架 Storm是由Nathan Marz 在BackType公司[做社交数据分析,数据量大]工作中实现的,这家公司后来被Twitter收购. Ps:·Hadoop分布式存储和分布式计算两个难题全部解决了.但是缺点就是不能实时处理数据,Storm的作者就像写一个这样实时数据处理场景的框架出来 1.2 Storm应用场景[实时处理数据] 推荐系