如何快速写一个分布式实时应用程序

开源搜索引擎Iveely的0.8.0中,我们有提到Iveely Computing实时计算平台,因为Iveely搜索引擎也是基于这个平台做的开发,因此,我们可以利用这个平台,轻松构建分布式实时应用程序。在开始构建程序之前,请按照这里部署Iveely Computing,确定部署无误之后,我们可以从下面代码开始学习。

不管是hadoop还是storm,都会在入门的时候,有一个WordCount示例,Iveely Computing也不例外。

首先,WordCount代码如下

/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package iveely.computing.example;

import com.iveely.computing.api.FieldsDeclarer;
import com.iveely.computing.api.IInput;
import com.iveely.computing.api.IOutput;
import com.iveely.computing.api.StreamChannel;
import com.iveely.computing.api.TopologyBuilder;
import com.iveely.computing.api.TopologySubmitter;
import com.iveely.computing.api.Tuple;
import java.util.HashMap;
import java.util.Random;
import java.util.TreeMap;

/**
 *
 * @author [email protected]
 */
public class WordCount {

    public static class WordInput extends IInput {

        /**
         * Output data to collector.
         */
        private StreamChannel _channel;

        /**
         * Count of emitted.
         */
        private int emitCount = 0;

        @Override
        public void start(HashMap<String, Object> conf, StreamChannel channel) {
            _channel = channel;
        }

        @Override
        public void declareOutputFields(FieldsDeclarer declarer) {
            declarer.declare(new String[]{"word", "defaultCount"}, new Integer[]{0});
        }

        @Override
        public void nextTuple() {
            final String[] words = new String[]{"iveely", "mike", "jackson", "golda", "bertels", "blue", "china", "pan", "qq", "baidu", "ABC", "desk", "pen", "music", "play", "mouse", "mac", "windows", "microsoft", "c++", "java"};
            final Random rand = new Random();
            final String word = words[rand.nextInt(words.length)];
            final int count = 1;
            System.out.println(getName() + ":" + word);
            _channel.emit(word, count);
        }

        @Override
        public void end(HashMap<String, Object> conf) {

        }

        @Override
        public void toOutput() {
            _channel.addOutputTo(new WordOutputA());
            _channel.addOutputTo(new WordOutputB());
        }
    }

    public static class WordOutputA extends IOutput {

        /**
         * Output data to collector.
         */
        private StreamChannel _channel;

        @Override
        public void start(HashMap<String, Object> conf, StreamChannel channel) {
            _channel = channel;
        }

        @Override
        public void declareOutputFields(FieldsDeclarer declarer) {
            declarer.declare(new String[]{"word", "totalCount"}, null);
        }

        @Override
        public void execute(Tuple tuple) {
            String word = (String) tuple.get(0).toString();
            Integer defaultCount = Integer.parseInt(tuple.get(1).toString());
            _channel.emit(word, defaultCount);
        }

        @Override
        public void end(HashMap<String, Object> conf) {
            // Output map to data base or others.
        }

        @Override
        public void toOutput() {

        }
    }

    public static class WordOutputB extends IOutput {

        private TreeMap<String, Integer> map;

        /**
         * Output data to collector.
         */
        private StreamChannel _channel;

        @Override
        public void start(HashMap<String, Object> conf, StreamChannel channel) {
            map = new TreeMap<>();
            _channel = channel;
        }

        @Override
        public void declareOutputFields(FieldsDeclarer declarer) {
            declarer.declare(new String[]{"word", "totalCount"}, null);
        }

        @Override
        public void execute(Tuple tuple) {
            String word = (String) tuple.get(0).toString();
            System.out.println(this.getName() + ":" + word);
            Integer defaultCount = Integer.parseInt(tuple.get(1).toString());
            if (map.containsKey(word)) {
                int currentCount = map.get(word);
                map.put(word, defaultCount + currentCount);
            } else {
                map.put(word, defaultCount);
            }
        }

        @Override
        public void end(HashMap<String, Object> conf) {
            // Output map to data base or others.
        }

        @Override
        public void toOutput() {

        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder("WordCount");
        builder.setInput(new WordInput(), 1);
        builder.setSlave(2);
        builder.isLocalMode = false;
        builder.setOutput(new WordOutputB(), 2);
        builder.setOutput(new WordOutputA(), 2);
        TopologySubmitter.submit(builder, args);
    }
}

其次,代码解析

代码中,包含了四个部分:

类WordInput:数据输入流,继承IInput,用于数据的产生。

类WordOutputA:数据输出流,继承IOutput , 用于数据的输出。

类WordOutputB:同上。

main函数:执行的入口。

第三,IInput

public void start(HashMap<String, Object> conf, StreamChannel channel)

类似于初始化函数,在开始产生数据之前,做的一些初始化,conf是一些配置文件,在main函数中的ToplogyBuilder中通过put方法设置进去,属于用户自定义数据。StreamChannel是流数据发送接口。在调用NextTuple的时候,会用到。本函数仅会调用一次。

public void declareOutputFields(FieldsDeclarer declarer)

用于声明本次产生数据的格式,declarer.declare(new String[]{"word", "defaultCount"}, new Integer[]{0, 1}); 表示输出数据有两个字段,第一个是word本身,第二个是默认的数量,后面new Integer[]{0}非常重要,是数据分发的分组,0表示,按照“word”本身进行分组。这样不同的word就会分发到不同的处理节点,本函数仅会调用一次。

public void nextTuple()

是数据的真正产生源,此方法会不断被调用,直到产生数据完成,产生完成是通过_channel.emitEnd();方法来表示完成,是需要手动调用,否则程序将会一直无休止运行下去,产生数据之后,一定要记得将数据发送出去:_channel.emit(word, count);。

public void end(HashMap<String, Object> conf)

当在nextTuple中调用_channel.emitEnd();之后,会调用此方法,此方法类似于程序推出前的清理工作,此方法仅调用一次。

public void toOutput()

此方法表示声明数据输出到的下一步流程。例如_channel.addOutputTo(new WordOutputA());表示输出的数据将会被WordOutoutA继续处理。当然可以addOutputTo到更多的IOutput。

第四,IOutput

在IOutput中,大部分均和IInput中类似,不同的在于IInput中有一个nextTuple,而在IOutput中,是 public void execute(Tuple tuple),此方法的调用方式为接收到数据之后才会触发。IOutput依然是可以输出到多个IOutput中去。

第五,main函数

main函数是程序的执行入口,对于我们的程序依然是,main函数中,包含两种模式,在这两种模式下,代码略有不同。

调试模式

用于本地调试,不需要部署Iveely Computing,可断点,跟调试一个普通程序一样,此刻main函数应该是这样。

public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder("WordCount");
        builder.setInput(new WordInput(), 1);
        builder.setSlave(2);
        builder.isLocalMode = true;
        builder.setOutput(new WordOutputB(), 2);
        builder.setOutput(new WordOutputA(), 2);
        TopologySubmitter.submit(builder, args);
    }

在builder参数中的isLocalMode设置为true即可。

部署模式

用于将程序提交到Iveely Computing,在调试模式下,确定程序无误之后,提交给Iveely Computing运行。

 public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder("WordCount");
        builder.setInput(new WordInput(), 1);
        builder.setSlave(2);
        builder.isLocalMode = false;
        builder.setOutput(new WordOutputB(), 2);
        builder.setOutput(new WordOutputA(), 2);
        TopologySubmitter.submit(builder, args);
    }

其余代码并无太大区别。其中setSlave表示期望给多个少机器节点运行,此处是两个,当不能满足这么多节点的时候,会根据当前最多的节点给予分配。 builder.setInput(new WordInput(), 1); 中的第二个参数表示给予1个线程读取数据。

编译文件到jar,参考这里的提交应用程序到Iveely Computing,并查看运行情况。

总结:上述是利用WordCount示例,大致对Iveely Computing的API做了一个介绍,如果有任何疑问,请邮件我:[email protected]。

背景:开源搜索引擎Iveely 0.8.0发布,终见天日

时间: 2024-10-25 22:00:19

如何快速写一个分布式实时应用程序的相关文章

JStorm 是一个分布式实时计算引擎

alibaba/jstorm JStorm 是一个分布式实时计算引擎. JStorm 是一个类似Hadoop MapReduce的系统, 用户按照指定的接口实现一个任务,然后将这个任务递交给JStorm系统,Jstorm将这个任务跑起来,并且按7 * 24小时运行起来,一旦中间一个worker 发生意外故障, 调度器立即分配一个新的worker替换这个失效的worker. 因此,从应用的角度,JStorm 应用是一种遵守某种编程规范的分布式应用.从系统角度, JStorm一套类似MapReduc

快速写一个babel插件

es6/7/8的出现,给我们带来了很多方便,但是浏览器并不怎么支持,目前chrome应该是支持率最高的,所以为了兼容我们只能把代码转成es5,这应该算是我们最初使用babel的一个缘由,随着业务的开发,我们会有很多自己定制化的需求,单纯的bebel并不能解决我们所有的问题,所以babel插件应用而来,本文将会采用较为通俗的语言来描述如何快速写一个babel插件. 一.babel的作用 babel的作用其实就是一个转换器,把我们的代码转成浏览器可以运行的代码,类似于加工厂的概念.解析代码都是一个文

思路:如何快速写一个全面的数据库 增备+全备+删除,自动化的脚本?

1.数据作用: 数据对我们来说是非常非常的重要,如果你是管理数据员.运维人员的,数据丢失了,找不回来,也就意味着,你的职业生涯就结束了,为了避免数据丢失,我们也做了很多的维护.备份,比如做主从复制.做全备.增备等等.这个可以参考我的另一篇文章Mysql主从复制. 我这里就不多说,今天我们要怎么写一个比较好的数据备份自动化脚本,让我们更好的管理数据库. 2.mysql备份脚本思路 首先要有一个思路,我的要求是每个的周一到周五做增量备份,周六做全备,对前两天的增备的日志删除,对两个星期前的全备也进行

如何部署Iveely.Computing分布式实时计算系统

Iveely.Computing是参考Storm的分布式实时计算系统的部分原理,用纯Java实现的轻量级.迷你型,适合于搜索引擎的实时计算系统, Iveely 搜索引擎是一款基于Iveely.Computing的搜索引擎,因此部署Iveely.Computing是使用Iveely搜索的关键,通过验证,Iveely搜索稳定在Iveely.Computing上运行了一个月. 一个完整的部署文件包含以下内容: Zookeeper 是必要的协调服务. Master 是Iveey.Computing的任务

用聚合数据API快速写出小程序(苏州实时公交)

利用聚合数据API快速写出小程序,过程简单. 1.申请小程序账号 2.进入开发 3.调用API.比如"苏州实时公交"小程序,选择的是苏州实时公交API. 苏州实时公交API文档:https://www.juhe.cn/docs/api/id/31 如下,是"苏州实时公交"小程序调用代码:  var url = "https://apis.juhe.cn/szbusline/bus";    //为了您的密钥安全,建议使用服务端代码中转请求,事例代

Elasticsearch是一个分布式可扩展的实时搜索和分析引擎,elasticsearch安装配置及中文分词

http://fuxiaopang.gitbooks.io/learnelasticsearch/content/  (中文) 在Elasticsearch中,文档术语一种类型(type),各种各样的类型存在于一个索引中.你也可以通过类比传统的关系数据库得到一些大致的相似之处: 关系数据库 ⇒ 数据库 ⇒ 表 ⇒ 行 ⇒ 列(Columns) Elasticsearch ⇒ 索引 ⇒ 类型 ⇒ 文档 ⇒ 字段(Fields)一个Elasticsearch集群可以包含多个索引(数据库),也就是说其

如何快速写出一个陌生人推荐系统

如何快速写出一个陌生人推荐系统 在社交游戏中,除了和好友互动,经常还会设计陌生人互动的游戏环节.下面两张图分别是QQ水浒和全民农场的陌生人推荐界面. QQ水浒陌生人界面 全民农场陌生人界面 那么,陌生人推荐系统一般是怎么做的呢?下面以全民农场的陌生人推荐系统为例来阐述如何快速构建一个陌生人推荐系统,由于采用了boost::multi_index库,整个推荐系统代码在400行左右,非常简洁. 首先,我们简单介绍一下全民农场的陌生人推荐系统规则: 1. 等级相近 2. 城市相近 3. 性别相反 4.

【整理学习Hadoop】H D F S 一个分布式文件系统

Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统.它和现有的分布式文件系统有很多共同点.但同时,它和其他的分布式文件系统的区别也是很明显的.HDFS是一个高度容错性的系统,适合部署在廉价的机器上.HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用.HDFS放宽了一部分POSIX约束,来实现流式读取文件系统数据的目的.HDFS在最开始是作为Apache Nutch搜索引擎项目的基础架构而开发的.HDFS是Apac

【整理学习HDFS】Hadoop Distributed File System 一个分布式文件系统

Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统.它和现有的分布式文件系统有很多共同点.但同时,它和其他的分布式文件系统的区别也是很明显的.HDFS是一个高度容错性的系统,适合部署在廉价的机器上.HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用.HDFS放宽了一部分POSIX约束,来实现流式读取文件系统数据的目的.HDFS在最开始是作为Apache Nutch搜索引擎项目的基础架构而开发的.HDFS是Apac