jstorm开发指南-写个简单的jstorm应用

jstorm开发指南-写个简单的jstorm应用

发表于 2015-07-18   |   分类于 大数据   |   暂无评论

jstorm 是阿里巴巴开源的基于storm采用Java重写的一套分布式实时流计算框架,使用简单,特点如下:

  • 开发非常迅速: 接口简单,容易上手,只要遵守Topology,Spout, Bolt的编程规范即可开发出一个扩展性极好的应用,底层rpc,worker之间冗余,数据分流之类的动作完全不用考虑。
  • 扩展性极好:当一级处理单元速度,直接配置一下并发数,即可线性扩展性能
  • 健壮:当worker失效或机器出现故障时, 自动分配新的worker替换失效worker
  • 数据准确性: 可以采用Acker机制,保证数据不丢失。 如果对精度有更多一步要求,采用事务机制,保证数据准确。

为什么要选择jstorm,而不采用twitter的storm呢?jstorm对比storm有如下优点:

  • Nimbus 实现HA
  • 彻底解决Storm雪崩问题:底层RPC采用netty + disruptor保证发送速度和接受速度是匹配的
  • 新增supervisor、Supervisor shutdown时、提交新任务,worker数不够时,均不自动触发任务rebalance
  • 新topology不影响现有任务,新任务无需去抢占老任务的cpu,memory,disk和net
  • 减少对ZK的访问量:去掉大量无用的watch;task的心跳时间延长一倍;Task心跳检测无需全ZK扫描
  • Worker 内部全流水线模式:Spout nextTuple和ack/fail运行在不同线程
  • 性能:采用ZeroMq, 比storm快30%;采用netty时, 和storm快10%,并且稳定非常多

总之,Jstorm 比Storm 更稳定,功能更强大,更快。而且Storm上跑的程序可以一行代码不变运行在Jstorm上,零成本,推荐所有使用storm的兄弟们搭建个jstorm集群缓过来。

jstorm 集群的搭建过程,可以参考另一篇文章:分布式实时日志系统(一)环境搭建之 Jstorm 集群搭建过程/Jstorm集群一键安装部署

jstorm 开发实例

上面也说过了,jstorm使用起来很简单,遵循Topology,Spout, Bolt的编程规范就可以,在下面的例子中将一步步完成这些。例子也很简单,在spout中不断产生自增的int数组,bolt接受到数值后打印出日志,并插入到hbase中。(如果没有hbase环境的,这一步可以继续注释掉,不用打开,只看到跑到日志打印的地方就好了)

spout 的开发只需要继承BaseRichSpout,实现继承的方法即可:

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
public class TestSpout extends BaseRichSpout {    private static final Logger LOGGER = LoggerFactory.getLogger(TestSpout.class);    static AtomicInteger sAtomicInteger = new AtomicInteger(0);    static AtomicInteger pendNum = new AtomicInteger(0);    private int sqnum;    SpoutOutputCollector collector;

    @Override    public void open(Map conf, TopologyContext context,                     SpoutOutputCollector collector) {        sqnum = sAtomicInteger.incrementAndGet();        this.collector = collector;    }

    @Override    public void nextTuple() {       while (true) {            int a = pendNum.incrementAndGet();            LOGGER.info(String.format("spount %d,pendNum %d", sqnum, a));            this.collector.emit(new Values("xxxxx:"+a));

            try {                Thread.sleep(10000);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }

    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(new Fields("log"));

    }

    /**     * 启用 ack 机制,详情参考:https://github.com/alibaba/jstorm/wiki/Ack-%E6%9C%BA%E5%88%B6     * @param msgId     */    @Override    public void ack(Object msgId) {        super.ack(msgId);    }

    /**     * 消息处理失败后需要自己处理     * @param msgId     */    @Override    public void fail(Object msgId) {        super.fail(msgId);        LOGGER.info("ack fail,msgId"+msgId);    }

}

bolt 同理,继承 BaseRichBolt 实现其相应的方法:

1234567891011121314151617181920212223242526272829
public  class TestBolt extends BaseRichBolt {

    private static final Logger LOGGER = CustomerLoggerFactory.LOGGER(TestBolt.class);    OutputCollector collector;

    @Override    public void prepare(Map stormConf, TopologyContext context,                        OutputCollector collector) {        this.collector = collector;    }

    @Override    public void execute(Tuple input) {        String xx = input.getString(0);        LOGGER.info(String.format("receive from spout ,num is : %d", xx));

        // 发送ack信息告知spout 完成处理的消息 ,如果下面的hbase的注释代码打开了,则必须等到插入hbase完毕后才能发送ack信息,这段代码需要删除        this.collector.ack(input);        try {            Thread.sleep(10000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }}

topology 的开发同理:

12345678910111213141516
public class TestTopology implements ILogTopology {    @Override    public void start(Properties properties) throws AlreadyAliveException, InvalidTopologyException, InterruptedException, IOException {

        TopologyBuilder builder = new TopologyBuilder();        builder.setSpout("testspout", new TestSpout(), 1);        builder.setBolt("testbolt", new TestBolt(), 2).shuffleGrouping("testspout");

        Config conf = ConfigUtils.getStormConfig(properties);        conf.setNumAckers(1);

        StormSubmitter.submitTopology("testtopology", conf, builder.createTopology());        System.out.println("storm cluster will start");    }

}

经过上面的三个步骤,一个最简单的jstorm应用就开发完成了,接下来通过编译、打包完后,生成jar文件 jstorm-hbase-demo-0.1.jar ,将此jar文件在jstorm集群的nimbus机器上提交即可:jstorm jar jstorm-hbase-demo-0.1.jar com.xirong.demo.BootStrap config.properties

demo运行效果

从jstorm集群的监控图赏可以看到,对应topology的运行情况:

bolt 的执行效率,及ack数量,占用机器内存等:

源码已经上传到github上面,喜欢研究的同学,可以fork后自己修改练习。地址为:https://github.com/xirong/jstorm-hbase-demo
源码中使用到的Phoenix组件,hbase上层的中间件,使得开发人员可以使用sql的方式来对hbase进行相应的操作,感兴趣的可以阅读:使用Phoenix通过sql语句更新操作hbase数据 ,此文中介绍了如何安装及使用。
另外想对hbase的有所了解的可以查看:列式存储hbase系统架构学习

原文http://www.ixirong.com/2015/07/18/develop-the-first-jstorm-demo/

时间: 2024-08-04 14:01:23

jstorm开发指南-写个简单的jstorm应用的相关文章

DuiVision开发教程(2)-如何写一个简单的界面程序

基于DuiVision界面库开发的界面程序主要包括如下几部分内容: 1.资源定义,包括图片资源.各个窗口界面的xml定义文件 2.事件处理类代码,用于处理界面响应消息 3.其他业务逻辑代码 下面举例说明如何写一个简单的界面程序. 第一步:使用VC向导创建一个有两个tab页面的DuiVision工程 向导生成的解决方案文件如下: 默认有两个工程,分别是DuiVision库和应用程序工程.自动生成的代码目录中bin目录下的内容那个如下,bkimg目录存放窗口背景图片,skins目录存放图片资源,xm

转:Oculus Unity Development Guide开发指南(2015-7-21更新)

http://forum.exceedu.com/forum/forum.php?mod=viewthread&tid=34175 Oculus Unity Development Guide开发指南转载请保留原始地   http://t.cn/RAblKoh Oculus/GearVR开发者群 302294234 Welcometo the Unity Development GuideIntroduction简介Welcometo the Oculus Unity Developer Gui

Jdon框架开发指南

Jdon框架快速开发指南 开发主要步骤如下: JdonFramework6.0以上两步开发见这里. 快速配置指南 新增/查询/修改/删除(CRUD); 批量查询和分页显示 本文Step By Step详细讲解如何使用Jdon框架基于领域模型快速开发这两个功能,通过Jdon框架的可以快速完成系统原型(ArcheType),使得开发者将真正精力集中在每个项目系统的特殊业务处理. 本案例源码下载 按这里查看更详细全面文档 快速配置指南 Jdon框架有一个配置文件叫jdonframework.xml,其

JNI/NDK开发指南(八)——调用构造方法和父类实例方法

转载请注明出处:http://blog.csdn.net/xyang81/article/details/44002089 在第6章我们学习到了在Native层如何调用Java静态方法和实例方法,其中调用实例方法的示例代码中也提到了调用构造函数来实始化一个对象,但没有详细介绍,一带而过了.还没有阅读过的同学请移步<JNI/NDK开发指南(六)--C/C++访问Java实例方法和静态方法>阅读.这章详细来介绍下初始一个对象的两种方式,以及如何调用子类对象重写的父类实例方法. 我们先回过一下,在J

Boost程序库完全开发指南——深入C++“准”标准库(第3版)

内容简介  · · · · · · Boost 是一个功能强大.构造精巧.跨平台.开源并且完全免费的C++程序库,有着“C++‘准’标准库”的美誉. Boost 由C++标准委员会部分成员所设立的Boost 社区开发并维护,使用了许多现代C++编程技术,内容涵盖字符串处理.正则表达式.容器与数据结构.并发编程.函数式编程.泛型编程.设计模式实现等许多领域,极大地丰富了C++的功能和表现力,能够使C++软件开发更加简捷.优雅.灵活和高效. <Boost程序库完全开发指南——深入C++“准”标准库(

HelloX操作系统网络功能简介及使用和开发指南

HelloX网络功能简介及使用和开发指南 HelloX网络功能简介 作为物联网操作系统,网络功能是必备的核心功能之一.按照规划,HelloX实现了两个不同类型的TCP/IP协议栈,一个面向资源受限的嵌入式应用,移植了业界成熟使用的lwIP协议栈.该协议栈简洁明了,功能相对简单,同时专门面向嵌入式领域进行设计和优化,对硬件资源要求很低.另外一个协议栈来自BSD操作系统的协议栈,面向复杂的网络功能丰富的应用场景,比如家庭网关,物联网网关等.为了适应HelloX本身的机制,对BSD协议栈做了一些更改和

关于《Swift开发指南》背后的那些事

时间轴(倒叙)2014年8月底在图灵出版社的大力支持下,全球第一本全面.系统.科学的,包含本人多年经验的呕心沥血之作<Swift开发指南>(配有同步视频课程和同步练习)全线重磅推出2014年7月5日苹果宣布Swift语言二十天后,<Swift开发指南>第一稿交予图灵出版社2014年6月9日苹果宣布Swift语言三天后,启动<Swift开发指南>撰写2014年6月2日凌晨1点(北京时间:)在苹果开发者大会WWDC 2014上,苹果宣布了全新的iOS及OS X平台开发语言S

iBATIS开发指南笔记

第一部分  概述 (一)目标和初衷 1. iBATIS的目标是:用少量的代码获得大量的数据访问功能 2. 初衷是让程序员将如下过程做的更好更简单: Separating SQL code from programming code 将SQL代码从程序代码中分离 Passing input parameters to the library classes and extracting the output 对类库传递输入参数来提取输出结果 Separating data access class

Spring Security 入门(1-14)Spring Security 开发指南(转)

开发指南:http://www.cnblogs.com/xingxueliao/p/5911292.html Spring OAuth2.0 提供者实现原理: Spring OAuth2.0提供者实际上分为: 授权服务 Authorization Service. 资源服务 Resource Service. 虽然这两个提供者有时候可能存在同一个应用程序中,但在Spring Security OAuth中你可以把 他它们各自放在不同的应用上,而且你可以有多个资源服务,它们共享同一个中央授权服 务