开源分布式实时计算引擎 Iveely Computing 之 WordCount 详解(3)

      WordCount是很多分布式计算中,最常用的例子,例如Hadoop、Storm,Iveely Computing也不例外。明白了WordCount在Iveely Computing上的运行原理,就很容易写出新的分布式程序。上一篇中已经知道了如何部署Iveely Computing以及提交任务,现在我们将深入WordCount的代码。

       一、代码结构

           

                             图3-1

      从图3-1中,可以看出,类WordCount中,有两个子类WordInput、WordOutput,以及一个主方法,WordCount.java即是一个Topology,里面至少包涵一个Input和Output(缺一不可,否则没有意义),以及main函数,main函数依然是Topology的入口函数。

      现在问题来了,Input和Output到底是什么关系?还有Topology?

     

      每一个Topology就是一个完整的任务链,可以包含多个Input,多个Output,Input的数据只能传递给一个或多个Output,Output只能将数据传递给一个或多个Output,从而形成一个完整的拓扑结构。

       二、Input 深入

       Input是数据的产生源,通过类WordInput看下是如何产生数据,并传递给Output的。

public static class WordInput extends IInput {

        /**
         * Output data to collector.
         */
        private StreamChannel _channel;

        /**
         * All sample words.
         */
        private final String[] _words = new String[] { "welcome", "iveely", "computing", "0.9.0", "build", "by",
                "liufanping", "thanks", "github.com" };

        private int _index;

        @Override
        public void start(HashMap<String, Object> conf, StreamChannel channel) {
            // Here,must be initialize channel.
            _channel = channel;
            _index = _words.length - 1;
        }

        @Override
        public void declareOutputFields(FieldsDeclarer declarer) {
            declarer.declare(new String[] { "word" }, new Integer[] { 0 });
        }

        @Override
        public void nextTuple() {
            if (_index < 0) {
                _channel.emitEnd();
            } else {
                for (int i = 0; i < 100; i++) {
                    _channel.emit(_words[_index]);
                }
                _index--;
            }
        }

        @Override
        public void end() {
            System.out.println(getName() + " finished.");
        }

        @Override
        public void toOutput() {
            _channel.addOutputTo(new WordOutput());
        }
    }

      函数讲解:

start函数 在执行此Input之前提前调用的函数,用户初始化等相关工作,类似于构造函数,对有数据输出的时候,一定要初始化channel。
declareOutputFields函数 用于声明输出的数据信息。
nextTuple函数 此函数将会被频繁调用,用于输出数据,利用channel.emit提交数据到output。
end函数 是在Input执行完毕之后,会执行的代码,类似于析构函数。
toOutput函数 是指定Input的数据输出到的Output。

      上面代码中,必须注意的几个问题:

      2.1  WordInput必须继承IInput。

      2.2  Input中,必须在start中初始化channel,因为input一定会产生数据。

      2.3  Input中,toOutput函数中,必须指定数据流向。

      三、Output深入

      Output是数据的处理单元,也可以是新数据的产生单元。

public static class WordOutput extends IOutput {

        private TreeMap<String, Integer> _map;

        @Override
        public void start(HashMap<String, Object> conf, StreamChannel channel) {
            _map = new TreeMap<>();
        }

        @Override
        public void declareOutputFields(FieldsDeclarer declarer) {
            declarer.declare(new String[] { "word", "totalCount" }, null);
        }

        @Override
        public void execute(Tuple tuple) {
            String word = tuple.get(0).toString();
            if (_map.containsKey(word)) {
                int currentCount = _map.get(word);
                _map.put(word, currentCount + 1);
            } else {
                _map.put(word, 1);
            }
        }

        @Override
        public void end() {
            // Output map to database or print.
            Iterator<String> it = _map.keySet().iterator();
            while (it.hasNext()) {
                String key = it.next();
                int value = _map.get(key);
                System.out.println(getName() + ":" + key + "," + value);
            }
        }

        @Override
        public void toOutput() {

        }
    }

      与Input相比,output中没有nextTuple函数,而是取而代之的execute函数。nextTuple是产生数据,execute是处理数据。如果execute处理完毕之后的数据也需要提交到新的output中去,则需要在execute中利用channel.emit方法提交数据,此刻toOutput中也需要指定数据流向。

      此处也需要注意几个问题:

      3.1 如果output需要继续传递数据,则需要在start中初始化channel。

      3.2 如果当前output接受的数据源来自不同的input,且数据格式不统一,则需要自行判断数据格式,例如传递数组中,第一个用int标识是什么样的数据格式。

       四、main函数

      main函数,依然是Topology的执行入口,不同的是,它有两种执行方式,一个是本地模式,一个是远程执行模式。本地模式是用于调试用。

public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder(true, WordCount.class.getName(), "WordCount");
        builder.setInput(new WordInput(), 1);
        builder.setOutput(new WordOutput(), 4);
        builder.setSlave(2);
        TopologySubmitter.submit(builder, args);
    }

       main函数中,主要做的工作。

       4.1 新建TopologyBuilder对象,并在构造函数的第一个参数指定当前是本地模式(true)还是远程模式(false),第二个参数,指定执行的类名,第三个参数,当前Topology的名称。

       4.2 设定input和output。并指定运行的数量比(线程)。

       4.3 指定在多少个节点上运行(进程)。

       4.4 利用TopologySubmitter提交任务即可。

       4.5 注意:在生成jar提交到服务器上运行时,一定要将TopologyBuilder的第一个参数改为远程模式(false)。

时间: 2024-10-04 13:41:52

开源分布式实时计算引擎 Iveely Computing 之 WordCount 详解(3)的相关文章

JStorm 是一个分布式实时计算引擎

alibaba/jstorm JStorm 是一个分布式实时计算引擎. JStorm 是一个类似Hadoop MapReduce的系统, 用户按照指定的接口实现一个任务,然后将这个任务递交给JStorm系统,Jstorm将这个任务跑起来,并且按7 * 24小时运行起来,一旦中间一个worker 发生意外故障, 调度器立即分配一个新的worker替换这个失效的worker. 因此,从应用的角度,JStorm 应用是一种遵守某种编程规范的分布式应用.从系统角度, JStorm一套类似MapReduc

权威详解 | 阿里新一代实时计算引擎 Blink,每秒支持数十亿次计算

王峰,淘宝花名"莫问",2006年毕业后即加入阿里巴巴集团,长期从事搜索和大数据基础技术研发工作,目前在计算平台事业部,负责实时计算北京研发团队. 在阿里巴巴的11年工作期间,持续专注大数据计算与存储技术领域,基于Hadoop开源生态打造的数据基础设施一直服务于搜索.推荐等阿里核心电商业务场景,最近一年带领团队对Apache Flink进行了大量架构改进.功能完善和性能提升,打造出了阿里新一代实时计算引擎: Blink.目前数千台规模的Blink生产集群已经开始在线支持搜索.推荐.广告

(第8篇)实时可靠的开源分布式实时计算系统——Storm

摘要: 在Hadoop生态圈中,针对大数据进行批量计算时,通常需要一个或者多个MapReduce作业来完成,但这种批量计算方式是满足不了对实时性要求高的场景.那Storm是怎么做到的呢? 博主福利 给大家赠送一套hadoop视频课程 授课老师是百度 hadoop 核心架构师 内容包括hadoop入门.hadoop生态架构以及大型hadoop商业实战案例. 讲的很细致, MapReduce 就讲了 15 个小时. 学完后可以胜任 hadoop 的开发工作,很多人学的这个课程找到的工作. (包括指导

51信用卡金融风控场景下实时计算引擎的设计与实践

https://mp.weixin.qq.com/s/Rx43XfhgdwerQWLn1eI3Ww 51信用卡金融风控场景下实时计算引擎的设计与实践 原创: 周来 51NB技术 5月7日 原文地址:https://www.cnblogs.com/yuanjiangw/p/10961583.html

实现高性能纠删码引擎 | 纠删码技术详解(下)

作者介绍: 徐祥曦,七牛云工程师,独立开发了多套高性能纠删码/再生码编码引擎.柳青,华中科技大学博士,研究方向为基于纠删码的分布式存储系统. 前言: 在上篇<如何选择纠删码编码引擎>中,我们简单了解了 Reed-Solomon Codes(RS 码)的编/解码过程,以及编码引擎的评判标准.但并没有就具体实现进行展开,本篇作为<纠删码技术详解>的下篇,我们将主要探讨工程实现的问题. 这里先简单提炼一下实现高性能纠删码引擎的要点:首先,根据编码理论将矩阵以及有限域的运算工程化,接下来主

MySQL - 引擎、Explain、权限详解

一.引擎 简介 Innodb引擎 Innodb引擎提供了对数据库ACID事务的支持,并且实现了SQL标准的四种隔离级别.该引擎还提供了行级锁和外键约束,它的设计目标是处理大容量数据库系统,它本身其实就是基于MySQL后台的完整数据库系统,MySQL运行时Innodb会在内存中建立缓冲池,用于缓冲数据和索引.但是该引擎不支持FULLTEXT类型的索引,而且它没有保存表的行数,当SELECT COUNT(*) FROM TABLE时需要扫描全表.当需要使用数据库事务时,该引擎当然是首选.由于锁的粒度

Storm 大数据 视频教程 安装 Spark Kafka Hadoop 分布式实时计算

视频资料都逐个核对,清晰高质量,而且包含各种文档.软件安装包和源码!永久免费更新! 技术团队永久免费解答各种技术问题:Hadoop.Redis.Memcached.MongoDB.Spark.Storm.云计算.R语言.机器学习.Nginx.Linux.MySQL.Java EE..NET.PHP,节省您的时间! 获取视频资料和技术支持地址 ------------------------------------------------------------------------------

如何选择纠删码编码引擎 | 纠删码技术详解(上)

作者介绍: 徐祥曦,七牛云工程师,独立开发了多套高性能纠删码/再生码编码引擎.柳青,华中科技大学博士,研究方向为基于纠删码的分布式存储系统. 前言:随着数据的存储呈现出集中化(以分布式存储系统为基础的云存储系统)和移动化(互联网移动终端)的趋势,数据可靠性愈发引起大家的重视.集群所承载的数据量大大上升,但存储介质本身的可靠性进步却很小,这要求我们必须以更加经济有效的方式来保障数据安全. 副本与纠删码都是通过增加冗余数据的方式来保证数据在发生部分丢失时,原始数据不发生丢失.但相较于副本,纠删码能以

分布式事务(两阶段提交)模型详解

详见:http://blog.yemou.net/article/query/info/tytfjhfascvhzxcyt369 这一几天一直在回顾事务相关的知识,也准备把以前了解皮毛的知识进行一些深入总结,虽然这一些知识并没有用到,但是了解其实现原理还是很有必要的,因为知道了原理,你也能把它实现出来. 在上一节事务的编程模型里面,主要说明了三种编程模型,一般情况下,我们都接触的是单一资源的事务,也就是单独对一个数据库进行操作.如果需要跨多个资源保证事务一致性 举个例子:在ATM机取钱的时候,需