高并发计算总数和去重

spout:

package com.storm.WordCount;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class MyRandomSentenceSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

      SpoutOutputCollector _collector;
      Random _rand;
      String[] sentences = new String[]{ "a b c d","b c d e f","a d e f"};

  @Override
  public void nextTuple() {
    for (String sen : sentences) {
        _collector.emit(new Values(sen));
    }
    Utils.sleep(1000*5);
  }

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      _collector = collector;
      _rand = new Random();
  }

  @Override
  public void ack(Object id) {
  }

  @Override
  public void fail(Object id) {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

}

splitBolt

package com.storm.WordCount;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MySplit  extends BaseBasicBolt{
    private static final long serialVersionUID = 1L;

    //切割符
    String patton;
    public MySplit(String patton){
        this.patton = patton;
    }

    /**
     * ask 被管理 ,不需要显式调
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {

        try {
            String sen = input.getStringByField("word");
            if (sen != null) {
                for (String word : sen.split(patton)) {
                    //必须Values List类型
                    collector.emit(new Values(word));
                }
            }
        } catch (Exception e) {
            throw new FailedException("split fail!");
        }
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void cleanup() {

    }

}

wordCountBolt:

package com.storm.WordCount;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordCount  extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {

      String word = tuple.getString(0);
      Integer count = counts.get(word);
      if (count == null)
        count = 0;

      count++;
      counts.put(word, count);
      collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
}

sumBolt:

package com.storm.WordCount;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class SumBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {

        try {
            //总数个数
            Long words_um =0L;
            //去重总数个数
            long word_count =0L;

            String word = input.getString(0);
            Integer count = input.getInteger(1);
            counts.put(word, count);

            Iterator<Integer> i = counts.values().iterator();

            //获取总数,遍历counts的values,进行sum
            while (i.hasNext()) {
                words_um += i.next();
            }

            //获取word去重的个数,遍历counts的keySet,取count
            Iterator<String> i2 = counts.keySet().iterator();
            while (i2.hasNext()) {

                String oneWord =i2.next();
                if(oneWord != null){
                    word_count ++;
                }
            }

            System.err.println("  总数  =  "+words_um +" 去重总数   = "+word_count);
        } catch (Exception e) {
            throw new FailedException("split fail!");
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {

    }
    @Override
    public void cleanup() {

    }

}

Topo:

package com.storm.WordCount;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
  public static void main(String[] args) throws Exception {

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new MyRandomSentenceSpout(), 1);

    //分割
    builder.setBolt("split", new MySplit(" "), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    //全局汇总必须开单线程
    builder.setBolt("sum", new SumBolt(), 1).shuffleGrouping("count");

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {
      conf.setMaxTaskParallelism(3);

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());

//      Thread.sleep(10000);
//
//      cluster.shutdown();
    }
  }
}
时间: 2024-08-01 10:29:00

高并发计算总数和去重的相关文章

高并发计算服务器数量

每秒查询率QPS:对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准,即每秒请求数,即最大谈吐能力. 并发数:并发数和QPS是不同的概念,一般说QPS会说多少并发用户下QPS,当QPS相同时,并发用户数越大,网站并发处理能力越好.当并发用户数过大时,会造成进程(线程)频繁切换,反正真正用于处理请求的时间变少,每秒能够处理的请求数反而变少,同时用户的请求等待时间也会变大. 找到最佳线程数能够让web系统更稳定,效率更高. 通过QPS和pv计算部署服务器台数:  单台服务器每天PV计算 公式

Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景)

对这个课程有兴趣的可以加我qq2059055336和我联系 Storm是什么? 为什么学习Storm? Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop. 随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,比如网站统计.推荐系统.预警系统.金融系统(高频交易.股票)等等, 大数据实时处理解决方案(流计算)的应用日趋广泛,目前已是分布式技术领域最新爆发点,而Storm更是流计算技术中的佼佼者和主流. 按照storm作者的说法,Storm对于实

探索 ConcurrentHashMap 高并发性的实现机制

简介 ConcurrentHashMap 是 util.concurrent 包的重要成员.本文将结合 Java 内存模型,分析 JDK 源代码,探索 ConcurrentHashMap 高并发的具体实现机制. 由于 ConcurrentHashMap 的源代码实现依赖于 Java 内存模型,所以阅读本文需要读者了解 Java 内存模型.同时,ConcurrentHashMap 的源代码会涉及到散列算法和链表数据结构,所以,读者需要对散列算法和基于链表的数据结构有所了解. Java 内存模型 由

【转载】千万级规模高性能、高并发的网络架构经验分享

在开始谈我对架构本质的理解之前,先谈谈对今天技术沙龙主题的个人见解,千万级规模的网站感觉数量级是非常大的,对这个数量级我们战略上 要重视它,战术上又要藐视它.先举个例子感受一下千万级到底是什么数量级?现在很流行的优步(Uber),从媒体公布的信息看,它每天接单量平均在百万左右, 假如每天有10个小时的服务时间,平均QPS只有30左右.对于一个后台服务器,单机的平均QPS可以到达800-1000,单独看写的业务量很简单 .为什么我们又不能说轻视它?第一,我们看它的数据存储,每天一百万的话,一年数据

【并发与负载】千万级规模高性能、高并发的网络架构经验分享

架构以及我理解中架构的本质 在开始谈我对架构本质的理解之前,先谈谈对今天技术沙龙主题的个人见解,千万级规模的网站感觉数量级是非常大的,对这个数量级我们战略上 要重 视 它 , 战术上又 要 藐 视 它.先举个例子感受一下千万级到底是什么数量级?现在很流行的优步(Uber),从媒体公布的信息看,它每天接单量平均在百万左右, 假如每天有10个小时的服务时间,平均QPS只有30左右.对于一个后台服务器,单机的平均QPS可以到达800-1000,单独看写的业务量很简单 .为什么我们又不能说轻视它?第一,

每天近百亿条用户数据,携程大数据高并发应用架构涅槃

互联网二次革命的移动互联网时代,如何吸引用户.留住用户并深入挖掘用户价值,在激烈的竞争中脱颖而出,是各大电商的重要课题.通过各类大数据对用户进行研究,以数据驱动产品是解决这个课题的主要手段,携程的大数据团队也由此应运而生;经过几年的努力,大数据的相关技术为业务带来了惊人的提升与帮助. 以基础大数据的用户意图服务为例,通过将广告和栏位的"千人一面"变为"千人千面",在提升用户便捷性,可用性,降低费力度的同时,其转化率也得到了数倍的提升,体现了大数据服务的真正价值. 在

ConcurrentHashMap高并发性的实现原理

ConcurrentHashMap是Java5中新增加的一个线程安全的Map集合,可以用来替代HashTable.HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁

如何在高并发分布式系统中生成全局唯一Id

我了解的方案如下-------------------------- 1.  使用数据库自增Id 优势:编码简单,无需考虑记录唯一标识的问题. 缺陷: 1)         在大表做水平分表时,就不能使用自增Id,因为Insert的记录插入到哪个分表依分表规则判定决定,若是自增Id,各个分表中Id就会重复,在做查询.删除时就会有异常. 2)         在对表进行高并发单记录插入时需要加入事物机制,否则会出现Id重复的问题. 3)         在业务上操作父.子表(即关联表)插入时,需要

如何在高并发分布式系统中生成全局唯一Id(转)

http://www.cnblogs.com/heyuquan/p/global-guid-identity-maxId.html 又一个多月没冒泡了,其实最近学了些东西,但是没有安排时间整理成博文,后续再奉上.最近还写了一个发邮件的组件以及性能测试请看 <NET开发邮件发送功能的全面教程(含邮件组件源码)> ,还弄了个MSSQL参数化语法生成器,会在9月整理出来,有兴趣的园友可以关注下我的博客. 分享原由,最近公司用到,并且在找最合适的方案,希望大家多参与讨论和提出新方案.我和我的小伙伴们也