storm实时分析-示例2

一个更复杂的例子

上面的DRPC例子只是为了介绍DRPC概念的一个简单的例子。下面让我们看一个复杂的、确实需要storm的并行计算能力的例子, 这个例子计算twitter上面一个url的reach值。

首先介绍一下什么是reach值,要计算一个URL的reach值,我们需要:

  • 获取所有微薄里面包含这个URL的人
  • 获取这些人的粉丝
  • 把这些粉丝去重
  • 获取这些去重之后的粉丝个数 — 这就是reach

一个简单的reach计算可能会有成千上万个数据库调用,并且可能设计到百万数量级的微薄用户。这个确实可以说是CPU intensive的计算了。你会看到的是,在storm上面来实现这个是非常非常的简单。在单台机器上面, 一个reach计算可能需要花费几分钟。而在一个storm集群里面,即时是最男的URL, 也只需要几秒。

一个reach topolgoy的例子可以在这里找到(storm-starter)。reach topology是这样定义的:

[java] view plaincopy

  1. LinearDRPCTopologyBuilder builder
  2. =newLinearDRPCTopologyBuilder("reach");
  3. builder.addBolt(newGetTweeters(), 3);
  4. builder.addBolt(newGetFollowers(), 12)
  5. .shuffleGrouping();
  6. builder.addBolt(newPartialUniquer(), 6)
  7. .fieldsGrouping(newFields("id","follower"));
  8. builder.addBolt(newCountAggregator(), 2)
  9. .fieldsGrouping(newFields("id"));

这个topology分四步执行:

  • GetTweeters获取所发微薄里面包含制定URL的所有用户。它接收输入流: [id, url], 它输出:[id, tweeter]. 没一个URL tuple会对应到很多tweetertuple。
  • GetFollowers 获取这些tweeter的粉丝。它接收输入流: [id, tweeter], 它输出: [id, follower]
  • PartialUniquer 通过粉丝的id来group粉丝。这使得相同的分析会被引导到统一个task。因此不同的task接收到的粉丝是不同的 — 从而起到去重的作用。它的输出流:[id, count] 即输出这个task上统计的粉丝个数。
  • 最后,CountAggregator 接收到所有的局部数量, 把它们加起来就算出了我们要的reach值。

我们来看一下PartialUniquer的实现:

[java] view plaincopy

  1. publicstatic class PartialUniquer
  2. implementsIRichBolt, FinishedCallback {
  3. OutputCollector _collector;
  4. Map<Object, Set<String>> _sets
  5. =newHashMap<Object, Set<String>>();
  6. publicvoid prepare(Map conf,
  7. TopologyContext context,
  8. OutputCollector collector) {
  9. _collector = collector;
  10. }
  11. publicvoid execute(Tuple tuple) {
  12. Object id = tuple.getValue(0);
  13. Set<String> curr = _sets.get(id);
  14. if(curr==null) {
  15. curr = newHashSet<String>();
  16. _sets.put(id, curr);
  17. }
  18. curr.add(tuple.getString(1));
  19. _collector.ack(tuple);
  20. }
  21. publicvoid cleanup() {
  22. }
  23. publicvoid finishedId(Object id) {
  24. Set<String> curr = _sets.remove(id);
  25. intcount;
  26. if(curr!=null) {
  27. count = curr.size();
  28. }else{
  29. count = 0;
  30. }
  31. _collector.emit(newValues(id, count));
  32. }
  33. publicvoid declareOutputFields(OutputFieldsDeclarer declarer) {
  34. declarer.declare(newFields("id","partial-count"));
  35. }
  36. }

PartialUniquerexecute方法里面接收到一个粉丝tuple的时候, 它把这个tuple添加到当前request-id对应的Set里面去。

PartialUniquer同时也实现了FinishedCallback接口, 实现这个接口是告诉LinearDRPCTopologyBuilder 它想在接收到某个request-id的所有tuple之后得到通知,回调函数则是,code>finishedId方法。在这个回调函数里面PartialUniquer发射当前这个request-id在这个task上的粉丝数量。

在这个简单接口的背后,我们是使用CoordinatedBolt来检测什么时候一个bolt接收到某个request的所有的tuple的。CoordinatedBolt是利用direct stream来实现这种协调的。

这个topology的其余部分就非常的明了了。我们可以看到的是reach计算的每个步骤都是并行计算出来的,而且实现这个DRPC的topology是那么的简单。

非线性DRPC Topology

LinearDRPCTopologyBuilder只能搞定"线性"的DRPC topology。所谓的线性就是说你的计算过程是一步接着一步, 串联。我们不难想象还有其它的可能 -- 并联(回想一下初中物理里面学的并联电路吧), 现在你如果想解决这种这种并联的case的话, 那么你需要自己去使用CoordinatedBolt来处理所有的事情了。如果真的有这种use case的话, 在mailing list上大家讨论一下吧。

LinearDRPCTopologyBuilder的工作原理

  • DRPCSpout发射tuple: [args, return-info]。 return-info包含DRPC服务器的主机地址,端口以及当前请求的request-id
  • DRPC Topology包含以下元素:
    • DRPCSpout
    • PrepareRequest(生成request-id, return info以及args)
    • CoordinatedBolt
    • JoinResult -- 组合结果和return info
    • ReturnResult -- 连接到DRPC服务器并且返回结果
  • LinearDRPCTopologyBuilder是利用storm的原语来构建高层抽象的很好的例子。

高级特性

    • 如何利用KeyedFairBolt来同时处理多个请求
    • 如何直接使用CoordinatedBolt
时间: 2024-10-12 09:31:41

storm实时分析-示例2的相关文章

storm实时分析——示例1

Spout: spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的 spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来). open是当task起来后执行的初始化动作 close是当task被shutdo

storm trident 示例

Storm Trident的核心数据模型是一批一批被处理的“流”,“流”在集群的分区在集群的节点上,对“流”的操作也是并行的在每个分区上进行. Trident有五种对“流”的操作: 1.      不需要网络传输的本地批次运算 2.      需要网络传输的“重分布”操作,不改变数据的内容 3.      聚合操作,网络传输是该操作的一部分 4.      “流”分组(grouby)操作 5.      合并和关联操作 批次本地操作: 批次本地操作不需要网络传输,本格分区(partion)的运算

Storm Trident示例partitionBy

如下代码使用partitionBy做repartition, partitionBy即根据相应字段的值按一定算法,把tuple分配到目标partition当中(Target Partition = hash(fields) % (number of target partition)), 相同值会被分配到同一个partition当中,由于不同值有可能出现相同的hash, 根据上面的算法,不同的值,也可能分配到同一个partition中. 省略部分代码,省略部分可参考:https://blog.c

Storm Trident示例function, filter, projection

以下代码演示function, filter, projection的使用,可结合注释 省略部分代码,省略部分可参考:https://blog.csdn.net/nickta/article/details/79666918 FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3, new Values("nickt1", 4), new Values(

Storm Trident示例Aggregator

Aggregator首先在输入流上运行全局重新分区操作(global)将同一批次的所有分区合并到一个分区中,然后在每个批次上运行的聚合功能,针对Batch操作.与ReduceAggregator很相似. 省略部分代码,省略部分可参考:https://blog.csdn.net/nickta/article/details/79666918 static class State { int count = 0; } FixedBatchSpout spout = new FixedBatchSpo

【JAVA版】Storm程序整合Kafka、Mongodb示例及部署

一.环境 一台Centos6.5主机 Mongo 3.0 Kafka_2.11-0.8.2.1 Storm-0.9.5 Zookeeper-3.4.6 java 1.7 (后因在mac上打包的jar由1.8编译没法儿运行,改为java 1.8) 其余环境暂略 二.运行启动 启动zookeeper 确认配置正确,配置相关可自行搜索. [[email protected] zookeeper-3.4.6]#pwd /data0/xxx/zookeeper-3.4.6 [[email protecte

Apache Storm 1.1.0 中文文档 | ApacheCN

前言  Apache Storm 是一个免费的,开源的,分布式的实时计算系统. 官方文档: http://storm.apache.org 中文文档: http://storm.apachecn.org ApacheCN 最近组织了翻译 Storm 1.1.0 中文文档 的活动,整体 翻译进度 为 96%. 感谢大家参与到该活动中来 感谢无私奉献的 贡献者,才有了这份 Storm 1.1.0 中文文档 感谢一路有你的陪伴,我们才可以做的更好,走的更快,走的更远,我们一直在努力 ... 网页地址:

实时分析之客户画像项目实践

客户画像的背景描写叙述 原来的互联网,以解决用户需求为目的.衍生出众多的网联网产品,以及产生呈数量级递增的海量数据.当用户需求基本得到满足的时候,须要分析这些海量的数据.得以达到最高效的需求实现,最智能的功能服务.以及最精准的产品推荐,最后提升产品的竞争力.简言之,产品由原来的需求驱动转换成数据驱动. 客户画像就是数据驱动的代表作之中的一个.详细点讲,客户画像就是用户的标签(使用该产品的群体),程序能自己主动调整.组合.生成这些标签,最后再通过这些标签.达到精准营销的目的. 当前流行的实时分析框

轻松用Storm进行实时大数据分析【翻译】

原文地址 简单易用,Storm让大数据分析变得轻而易举. 如今,公司在日常运作中经常会产生TB(terabytes)级的数据.数据来源包括从网络传感器捕获的,到Web,社交媒体,交易型业务数据,以及其他业务环境中创建的数据.考虑到数据的生成量,实时计算(real-time computation )已成为很多组织面临的一个巨大挑战.我们已经有效地使用了一个可扩展的实时计算系统--开源的 Storm 工具,它是有 Twitter 开发,通常被称为"实时 Hadoop(real-time Hadoo