jstorm在使用kafka作为spout的时候多线程问题

  jstorm在使用kafka作为spout的时候,高并发情况下会出现多线程报错问题需要对这两个类进行适当的修改来避免上述问题:  storm.kafka.PartitionManager  storm.kafka.ExponentialBackoffMsgRetryManager1.storm.kafka.PartitionManager的修改
//将变量
private SortedMap<Long, Long> _pending = new TreeMap();
//改为:
private SortedMap<Long, Long> _pending = Collections.synchronizedSortedMap(new TreeMap<Long, Long>());

/**----------------------------------------------------------------------------------------------------**/

//将方法
public long lastCompletedOffset() {
    return this._pending.isEmpty()?this._emittedToOffset.longValue():((Long)this._pending.firstKey()).longValue();
}
//改为:
public long lastCompletedOffset() {
    synchronized (_pending) {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.firstKey();
        }
    }
}
2.storm.kafka.ExponentialBackoffMsgRetryManager的修改
//将
private Queue<ExponentialBackoffMsgRetryManager.MessageRetryRecord> waiting = new PriorityQueue(11, new ExponentialBackoffMsgRetryManager.RetryTimeComparator());
private Map<Long, ExponentialBackoffMsgRetryManager.MessageRetryRecord> records = new ConcurrentHashMap();
//改为:
private Queue<MessageRetryRecord> waiting = new PriorityBlockingQueue<MessageRetryRecord>(11, new RetryTimeComparator());
private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
时间: 2024-07-30 11:04:44

jstorm在使用kafka作为spout的时候多线程问题的相关文章

jstorm集成kafka

本人是spark的拥趸,因为工作中需要用到jstorm,作记录如下. pom.xml <dependencies> <dependency> <groupId>com.alibaba.jstorm</groupId> <artifactId>jstorm-core</artifactId> <version>2.1.1</version> <scope>provided</scope>

(二) kafka-jstorm集群实时日志分析 之 ---------jstorm集成spring

后面为了引入Dubbo RPC框架(用spring配置),先把spring 引入jstorm中,请先了解一下jsorm多线程方面的文档:http://storm.apache.org/documentation/Understanding-the-parallelism-of-a-Storm-topology.html . A worker process executes a subset of a topology. A worker process belongs to a specifi

storm消费kafka实现实时计算

大致架构 * 每个应用实例部署一个日志agent * agent实时将日志发送到kafka * storm实时计算日志 * storm计算结果保存到hbase storm消费kafka 创建实时计算项目并引入storm和kafka相关的依赖 <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.

trident教程

(一)理论基础更多理论以后再补充,或者参考书籍1.trident是什么?Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low latency

storm-kafka教程

一.原理介绍 ? ? 本文内容参考:https://github.com/apache/storm/tree/master/external/storm-kafka#brokerhosts (一)使用storm-kafka的关键步骤 1.创建ZkHosts 当storm从kafka中读取某个topic的消息时,需要知道这个topic有多少个分区,以及这些分区放在哪个kafka节点(broker)上, ZkHosts就是用于这个功能. 关于kafka信息在zk中的内容请参考:http://blog

编写脚本实用工具

1.查看哪个文件占用最大 查看前十名磁盘空间用户,到第11行,sed会删除列表的剩余部分,然后给列表中每行一个行号.要让行号和磁盘空间文本位于同一行,用N命令将文本行合并在一行.然后用gawk命令清理,在行号后,加一个冒号(:),还给每行文本的输出行中的每个字段放了一个制表符.这样就生成了一个格式精致的前十名磁盘空间用户列表了 [[email protected] qingyun]# du -Sh /home/*| sort -rn | sed '{11,$D;=}' | sed 'N;s/\n

实时分析之客户画像项目实践

客户画像的背景描写叙述 原来的互联网,以解决用户需求为目的.衍生出众多的网联网产品,以及产生呈数量级递增的海量数据.当用户需求基本得到满足的时候,须要分析这些海量的数据.得以达到最高效的需求实现,最智能的功能服务.以及最精准的产品推荐,最后提升产品的竞争力.简言之,产品由原来的需求驱动转换成数据驱动. 客户画像就是数据驱动的代表作之中的一个.详细点讲,客户画像就是用户的标签(使用该产品的群体),程序能自己主动调整.组合.生成这些标签,最后再通过这些标签.达到精准营销的目的. 当前流行的实时分析框

trident介绍

(一)理论基础 很多其它理论以后再补充,或者參考书籍 1.trident是什么? Trident is a high-level abstraction for doing realtime computing on top of Storm. It allows you to seamlessly intermix high throughput (millions of messages per second), stateful stream processing with low lat

storm的trident编程模型

storm的基本概念别人总结的, https://blog.csdn.net/pickinfo/article/details/50488226 编程模型最关键最难就是实现局部聚合的业务逻辑聚合类实现Aggregator接口重写方法aggregate,聚合使用存储中间聚合过程状态的类,本地hashmap的去重逻辑还有加入redis后进行的一些去重操作,数据的持久(判断三天内的带播控量) public class SaleSum implements Aggregator<SaleSumState