When JStorm uses Kafka as a spout, multithreading errors can occur under high concurrency. To avoid them, two classes need small changes: storm.kafka.PartitionManager and storm.kafka.ExponentialBackoffMsgRetryManager.

1. Changes to storm.kafka.PartitionManager
//Change the field
private SortedMap<Long, Long> _pending = new TreeMap();
//to (requires import java.util.Collections):
private SortedMap<Long, Long> _pending = Collections.synchronizedSortedMap(new TreeMap<Long, Long>());

/**----------------------------------------------------------------------------------------------------**/

//Change the method
public long lastCompletedOffset() {
    return this._pending.isEmpty()?this._emittedToOffset.longValue():((Long)this._pending.firstKey()).longValue();
}
//to (isEmpty() and firstKey() now run under a single lock on _pending):
public long lastCompletedOffset() {
    synchronized (_pending) {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.firstKey();
        }
    }
}
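The explicit synchronized block matters because a synchronized map only locks individual calls. Another thread could empty the map between isEmpty() and firstKey(), and firstKey() would then throw NoSuchElementException. The following is a minimal standalone sketch of the same pattern, not the actual PartitionManager code: the class PendingOffsets and its emit/ack methods are hypothetical names used only for illustration.

```java
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for the offset bookkeeping in PartitionManager.
class PendingOffsets {
    // Each single call on this map is thread-safe; compound operations are not.
    private final SortedMap<Long, Long> pending =
            Collections.synchronizedSortedMap(new TreeMap<Long, Long>());
    private long emittedToOffset;

    PendingOffsets(long startOffset) {
        this.emittedToOffset = startOffset;
    }

    // Record an emitted offset and advance the "emitted up to" marker together.
    void emit(long offset) {
        synchronized (pending) {
            pending.put(offset, System.currentTimeMillis());
            emittedToOffset = Math.max(emittedToOffset, offset + 1);
        }
    }

    // A single map call, so the map's own lock is enough.
    void ack(long offset) {
        pending.remove(offset);
    }

    // Check-then-act: hold the map's lock across both calls so that
    // no other thread can empty the map between isEmpty() and firstKey().
    long lastCompletedOffset() {
        synchronized (pending) {
            return pending.isEmpty() ? emittedToOffset : pending.firstKey();
        }
    }
}
```

Locking on the map returned by Collections.synchronizedSortedMap is the documented way to guard compound operations, since the wrapper uses itself as its mutex.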
2. Changes to storm.kafka.ExponentialBackoffMsgRetryManager
//Change
private Queue<ExponentialBackoffMsgRetryManager.MessageRetryRecord> waiting = new PriorityQueue(11, new ExponentialBackoffMsgRetryManager.RetryTimeComparator());
private Map<Long, ExponentialBackoffMsgRetryManager.MessageRetryRecord> records = new ConcurrentHashMap();
//to (requires import java.util.concurrent.PriorityBlockingQueue):
private Queue<MessageRetryRecord> waiting = new PriorityBlockingQueue<MessageRetryRecord>(11, new RetryTimeComparator());
private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
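java.util.PriorityQueue is not thread-safe, while PriorityBlockingQueue applies the same ordering and guards each operation with an internal lock. The sketch below shows the swap in isolation. It is not the real ExponentialBackoffMsgRetryManager: RetryQueueSketch, RetryRecord, failed and nextDue are hypothetical names chosen for illustration.

```java
import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical, simplified retry queue ordered by retry time.
class RetryQueueSketch {
    static final class RetryRecord {
        final long offset;
        final long retryTimeMs;

        RetryRecord(long offset, long retryTimeMs) {
            this.offset = offset;
            this.retryTimeMs = retryTimeMs;
        }
    }

    // Thread-safe priority queue: the earliest retry time sits at the head.
    private final Queue<RetryRecord> waiting =
            new PriorityBlockingQueue<RetryRecord>(11, new Comparator<RetryRecord>() {
                @Override
                public int compare(RetryRecord a, RetryRecord b) {
                    return Long.compare(a.retryTimeMs, b.retryTimeMs);
                }
            });

    // Schedule a failed offset for retry at the given time.
    void failed(long offset, long retryTimeMs) {
        waiting.offer(new RetryRecord(offset, retryTimeMs));
    }

    // Returns the offset whose retry time has passed, or null if none is due.
    // remove(head) returns true only for the thread that actually removed it,
    // so two threads cannot both claim the same record.
    Long nextDue(long nowMs) {
        RetryRecord head = waiting.peek();
        if (head != null && head.retryTimeMs <= nowMs && waiting.remove(head)) {
            return head.offset;
        }
        return null;
    }
}
```

As with the synchronized map, only single calls are atomic here. The peek-then-remove sequence stays safe in this sketch because the result of remove is checked before the record is used.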
Time: 2024-10-17 11:51:11