storm 事务和DRPC结合

示例代码:

package com.lky.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * @Title: ReachTopology.java
 * @Package com.lky.topology
 * @Description: 计算一个包含特定url的微博,最终能被多少人看到
 * @author lky
 * @date 2015年10月23日 下午9:09:22
 * @version V1.0
 */
public class ReachTopology {
    private static Log log = LogFactory.getLog(ReachTopology.class);
    public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {
        {
            put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
            put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
            put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
        }
    };

    public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {
        {
            put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
            put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
            put("tim", Arrays.asList("alex"));
            put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
            put("adam", Arrays.asList("david", "carissa"));
            put("mike", Arrays.asList("john", "bob"));
            put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
        }
    };

    /**
     * @Title: ReachTopology.java
     * @Package com.lky.topology
     * @Description: 获取包含该特定url的所有用户,随机发放到下游bolt中
     * @author lky
     * @date 2015年10月23日 下午11:46:19
     * @version V1.0
     */
    @SuppressWarnings("serial")
    public static class GetTweeters extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Object id = null;
            String url = null;
            try {
                id = input.getValue(0);
                url = input.getString(1);
                List<String> tweeters = new ArrayList<String>();//获取包含该url的所有用户

                if (null != id && null != url) {
                    tweeters = TWEETERS_DB.get(url);
                    if (null != tweeters) {
                        for (String tweeter : tweeters) {
                            log.info("execute1------>[id = " + id + " ]["+url+"---->tweeter=" + tweeter + "]");
                            collector.emit(new Values(id, tweeter));
                        }
                    }
                }
            } catch (Exception e) {
                log.error("execute 发射消息错误!!!!!");
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "tweeter"));
        }

    }

    /**
     * @Title: ReachTopology.java
     * @Package com.lky.topology
     * @Description:获取每一个用户的粉丝,然后按字段分组(id,fllower)到下游bolt中,保证同一类url的相同用户被分到相同的批次
     * @author lky
     * @date 2015年10月23日 下午11:47:45
     * @version V1.0
     */
    @SuppressWarnings("serial")
    public static class GetFollowers extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Object id = null;
            String _follower = null;

            try {
                id = input.getValue(0);
                _follower = input.getString(1);
                List<String> followers = new ArrayList<String>();

                if (null != id && null != _follower) {
                    followers = FOLLOWERS_DB.get(_follower);//获取该用户的所有粉丝
                    if (null != followers) {
                        for (String follower : followers) {
                            log.info("execute2------>[id = " + id + " ]["+_follower+"------>follower=" + follower + "]");
                            collector.emit(new Values(id, follower));
                        }
                    }
                }

            } catch (Exception e) {
                log.error("execute 发射消息异常!!!");
            }

        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "follower"));
        }

    }

    /**
     * @Title: ReachTopology.java
     * @Package com.lky.topology
     * @Description: 按批次统计粉丝数量
     * @author lky
     * @date 2015年10月23日 下午11:50:51
     * @version V1.0
     */
    @SuppressWarnings({ "serial", "rawtypes" })
    public static class PartialUniquer extends BaseBatchBolt {
        private BatchOutputCollector collector;
        private Object id;
        private Set<String> _followerSet = new HashSet<String>();

        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            this.collector = collector;
            this.id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            String uname = null;

            try {
                uname = tuple.getString(1);
                if (null != uname) {
                    log.info("execute3------>[id = " + tuple.getValue(0) + " ][ uname=" + uname + "]");
                    _followerSet.add(uname);
                }
            } catch (Exception e) {
                log.error("execute 接收消息异常!!!");
            }
        }

        @Override
        public void finishBatch() {
            log.info("execute4------>[id = " + id + " ][ size=" + _followerSet.size() + "]");
            collector.emit(new Values(id, _followerSet.size()));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "count"));
        }

    }

    /**
     * @Title: ReachTopology.java
     * @Package com.lky.topology
     * @Description: 按相同的id汇总批次
     * @author lky
     * @date 2015年10月23日 下午11:51:49
     * @version V1.0
     */
    @SuppressWarnings({ "serial", "rawtypes" })
    public static class CountAggregator extends BaseBatchBolt {
        private BatchOutputCollector collector;
        private Object id;
        private int _count = 0;

        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            this.collector = collector;
            this.id = id;
        }

        @Override
        public void execute(Tuple tuple) {
            Integer count = null;
            try {
                count = tuple.getInteger(1);
                log.info("execute5------>[id = " + tuple.getValue(0) + " ][ count=" + count + "]");
                _count += count;
            } catch (Exception e) {
                log.error("execute 接收消息异常");
            }
        }

        @Override
        public void finishBatch() {
            collector.emit(new Values(id, _count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }
    }

    @SuppressWarnings("deprecation")
    public static LinearDRPCTopologyBuilder construct() {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
        builder.addBolt(new GetTweeters(), 4);
        builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
        builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
        builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
        return builder;
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        LinearDRPCTopologyBuilder builder = construct();

        Config conf = new Config();
        conf.setMaxTaskParallelism(3);
        LocalDRPC drpc = new LocalDRPC();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));

        String[] urlsToTry = new String[] { "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
        for (String url : urlsToTry) {
            System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
        }

        Utils.sleep(1000 * 10);
        cluster.shutdown();
        drpc.shutdown();
    }
}
时间: 2024-10-21 14:37:34

storm 事务和DRPC结合的相关文章

storm事务

1. storm 事务 对于容错机制,Storm通过一个系统级别的组件acker,结合xor校验机制判断一个msg是否发送成功,进而spout可以重发该msg,保证一个msg在出错的情况下至少被重发一次.但是在一些事务性要求比较高的场景中,需要保障一次只有一次的语义,比如需要精确统计tuple的数量等等,torm0.7.0实现了一个新特性---事务性拓扑,这一特性使消息在语义上确保你可以安全的方式重发消息,并保证它们只会被处理一次.在不支持事务性拓扑的情况下,你无法在准确性,可扩展性,以空错性上

Storm事务Topology的接口介绍

ITransactionalSpout 基本事务Topology的Spout接口,内含两部分接口:协调Spout接口以及消息发送Blot接口. TransactionalSpoutBatchExecutor Bolt类型,用于执行ITransactionalSpout中的消息发送Bolt节点. TransactionalSpoutCoordinator Spout类型,用于执行ITransactionalSpout中的协调Spout节点,是系统中唯一的Spout节点,具体功能为初始化事务以及产生

Storm入门(十二)Twitter Storm: DRPC简介

作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明网址: http://xumingming.sinaapp.com/756/twitter-storm-drpc/ 本文翻译自: https://github.com/nathanmarz/storm/wiki/Distributed-RPC . Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算.DRPC的storm topology以函数的参数

Storm高级原语(二) -- DRPC详解

Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU密集型(CPU intensive)的计算任务.DRPC的stormtopology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流. DRPC其实不能算是storm本身的一个特性, 它是通过组合storm的原语stream.spout.bolt. topology而成的一种模式(pattern).本来应该把DRPC单独打成一个包的, 但是DRPC实在是太有用了,所以我们我们把它和storm捆绑

1 storm基本概念 + storm编程规范及demo编写

本博文的主要内容有 .Storm的单机模式安装 .Storm的分布式安装(3节点)   .No space left on device .storm工程的eclipse的java编写 http://storm.apache.org/ 分布式的一个计算系统,但是跟mr不一样,就是实时的,实时的跟Mr离线批处理不一样. 离线mr主要是做数据挖掘.数据分析.数据统计和br分析. Storm,主要是在线的业务系统.数据像水一样,源源不断的来,然后,在流动的过程中啊,就要把数据处理完.比如说,一些解析,

storm transaction

storm transaction storm的事务主要用于对数据准确性要求非常高的环境中,尤其是在计算交易金额或笔数,数据库同步的场景中. storm 事务逻辑是挺复杂的,而且坦白讲,代码写的挺烂的. JStorm下一步将重新设计基于Meta 1 和Meta3 的事务模型,让使用者更简便,代码更清晰. 一个基本的例子 你可以通过使用TransactionalTopologyBuilder来创建transactional topology. 下面就是一个transactional topolog

2 storm的topology提交执行

本博文的主要内容有 .storm单机模式,打包,放到storm集群 .Storm的并发机制图 .Storm的相关概念 .附PPT 打包,放到storm集群去.我这里,是单机模式下的storm. weekend110-storm  ->   Export   ->   JAR file   -> 当然,这边,肯定是,准备工作已经做好了.如启动了zookeeper,storm集群. 上传导出的jar sftp> cd /home/hadoop/ sftp> put c:/d de

storm项目实战开发 Storm视频教程下载

Storm流计算从入门到精通-技术篇 课程分类:大数据 适合人群:初级 课时数量:25课时 用到技术:Storm集群.Zookeeper集群等 涉及项目:网站PV.UV案例实战.其他案例 更新程度:40% 持续更新中 storm项目实战开发 Storm视频教程下载地址:http://pan.baidu.com/s/1hq7Sqag Storm是什么? 为什么学习Storm? Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop. 随着越来越多的场景对Hadoop

Storm学习入门视频教程

Storm流计算从入门到精通之技术篇(高并发策略.批处理事务.Trident精解.运维监控.企业场景)课程讲师:Cloudy课程分类:大数据适合人群:初级课时数量:28课时用到技术:Storm集群.Zookeeper集群等涉及项目:网站PV.UV案例实战.其他案例咨询qq:1840215592课程亮点:1.Storm全面.系统.深入讲解,采用最新的稳定版本Storm 0.9.0.1 :2.注重实践,对较抽象难懂的技术点如Grouping策略.并发度及线程安全.批处理事务.DRPC.Storm T