storm坑之---同步问题

  最近在做一个监控系统,用来监控网站上各个业务功能的调用量以及处理时间,以便及时发现问题,及时处理。做这种实时统计处理系统,自然首先想到了storm,于是现学现用,自然遇到了一些坑,而且不少是网上也难以找到的问题。在这里就做个记录,记录下这个最让我苦恼的错误。

  首先我的业务逻辑是按分钟统计一分钟中的调用次数的数据,所以我在bolt里跑了一个定时器,定时将统计数据发到下一个bolt入库。所在我在定时器执行的代码里调用了OutputCollector发射到下一个bolt。本地调试没啥问题,就部署到外网环境测试。通常也没发现问题,但是偶尔会出现这种错误,作为开发人员最讨厌的就是这种可复现率很低的错误 。

  这里是错误日志:

5675 [Thread-7-disruptor-executor[2 2]-send-queue] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.lang.NullPointerException
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:722) [na:1.7.0_15]
Caused by: java.lang.NullPointerException: null
	at clojure.lang.RT.intCast(RT.java:1087) ~[clojure-1.5.1.jar:na]
	at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
	... 6 common frames omitted

5697 [Thread-7-disruptor-executor[2 2]-send-queue] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
	at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
	at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
	at backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452) [storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:722) [na:1.7.0_15]

  如果你也遇到这个问题,相信你第一次看到这个错误一定很痛苦,因为错误日志中没有任何与自己的业务代码相关的记录。所以实在是无从定位问题的所在。痛苦至极的是复现还不那么容易。

  经过我多次猜测尝试,终于测出了问题的所在。下面我先贴出一个会报这个错误的例子代码:

public class Main {

	public static void main(String[] args) {
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("spout",new TestWordSpout());

		builder.setBolt("dispatch", new WordDispatchBolt()).shuffleGrouping("spout");
		builder.setBolt("print",new PrintBolt()).fieldsGrouping("dispatch", new Fields("word"));

		Config conf = new Config();

		conf.setDebug(false);
		conf.setNumWorkers(1);
		//conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("test-kafka-1", conf, builder.createTopology());
	}

}

  

public class TestWordSpout extends BaseRichSpout {

  private static final long serialVersionUID = 1L;
    boolean _isDistributed;
    SpoutOutputCollector _collector;
    String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    public TestWordSpout() {
        this(true);
    }

    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void close() {

    }

    public void nextTuple() {
        Utils.sleep(1000);
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word), word+new Random().nextDouble());
    }

    public void ack(Object msgId) {
    	System.out.println("### ack:"+msgId);
    }

    public void fail(Object msgId) {
        System.out.println("### fail:"+msgId);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

  

public class WordDispatchBolt extends BaseRichBolt{

	private OutputCollector collector;

	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;

		new Thread(new Runnable() {

			@Override
			public void run() {
				while(true){
					send();//不做sleep休眠,否则抛出此异常的几率太小,不容易观察到
				}
			}
		}).start();
	}

	public void send(){
		this.collector.emit(new Values(new Random().nextDouble()));
	}
	@Override
	public void execute(Tuple input) {
		String word = input.getStringByField("word");
		this.collector.emit(new Values(word));
		this.collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}

}

  

public class PrintBolt extends BaseRichBolt {

	private static final long serialVersionUID = 1L;

	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
	}

	@Override
	public void execute(Tuple input) {
		System.out.println(input.getValue(0));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

	}

}

  这个代码很简单,就不做详细介绍了。在WordDispatchBolt类里我启动了另一个线程来发射数据到下一个bolt。我的业务代码中与此类似,是通过Timer定时发送数据的(Timer底层其实也是线程,就不多说了)。但是Timer是按分钟调用的,所以出现问题的几率小的可怜,这里我故意零停顿的调用,让此异常发生的几率更大一些。

  如果运行以上例子代码,你也肯定遇到前边贴出的错误异常。如果不知道是OutputCollector的同步问题,相信解决起来绝对让人痛不欲生。既然知道了是同步问题,要么避免在别的线程里调用collector,要么改成同步的。以下是我简单想到的解决方案。(如果有大神还有更好的,希望留言指教)

  对WordDispatchBolt类做如下修改:

public class WordDispatchBolt extends BaseRichBolt{

	private OutputCollector collector;

	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;

		new Thread(new Runnable() {

			@Override
			public void run() {
				while(true){
					send(new Values(new Random().nextDouble()));//不做sleep休眠,否则抛出此异常的几率太小,不容易观察到
				}
			}
		}).start();
	}

	public synchronized void send(List<Object> tuple){
		this.collector.emit(tuple);
	}
	@Override
	public void execute(Tuple input) {
		String word = input.getStringByField("word");
		send(new Values(word));
		this.collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}

}

  到这里,这个坑算是基本得到解决了。之后可能还要大量使用到storm,遇到坑是再做记录。

  ”把遇到的坑记录下来,让后遇到者可以有更多的网络资源查询,以减少排查问题的时间和纠结“

  

时间: 2024-11-05 06:19:15

storm坑之---同步问题的相关文章

storm坑之---传递对象

继之前遇到的那个同步问题的坑之后(storm坑之---同步问题),最近对代码又做了调整和重构,并且又遇到了另一个storm开发中应该值得警惕的坑.接下来说说这个坑的大体情况. 在我的storm程序中,Abolt需要将数据封装成一个对象同时发送给Bbolt和Cbolt各一份,Bbolt和Cbolt分别对对象做一定的处理后,更新到数据库.在查看日志时,意外的发现有些数据是不正确的诡异的,我先是怀疑算法问题,但又发现有部分数据又是正确的.算法应该没啥问题.纠结之下之后打印了更详细的日志,通过观察诡异数

UiAutomator2.0升级填坑记

UiAutomator2.0升级填坑记 SkySeraph May. 28th 2017 Email:[email protected] 更多精彩请直接访问SkySeraph个人站点:www.skyseraph.com 啰嗦 Google Android Developers 在2015年3月就发布了UiAutomator 2.0版本(下文简称U2),而公司的核心产品中用到还是UiAutomator老版本(下文简称U1),业界用U2的也不是很多,虽然有诸多问题和不便(如高版本OS中不支持Remo

如何快速处理线上故障

概述 线上故障通常是指大规模的影响线上服务可用性的问题或者事件,通俗点讲就是:掉'坑'里了,这个'坑'就是线上故障!线上故障的处理过程可以形象地表达为:'踩坑'.'跳坑'.'填坑'.'避坑'. 线上故障的处理不仅是一项技术活,更是对技术人员/技术团队反应能力.决策能力.判定能力.组织能力的考验.面对突发的生产故障,需要快速定位问题,找到解决方案,快速实施解决方案并不是一件容易的事情.本文主要包括如下内容:线上故障处理的目标.思路.步骤.基础设施.本文是依据平时经历的生产故障排查和处理,总结一些肤

ajax同步与异步的坑

之前工作中一个需求,需要动态的添加一组下拉菜单并为这个菜单绑定一个插件,很明显获取数据用Ajax,这本身是没错的,坑就坑在我用了 同步请求,当服务器端正确返回数据时再去执行下一个方法,这逻辑本身没有问题.所以当我写完之后,我发现页面加载速度及其的慢,我删除了一些多余的和http请求,仍然不能解决这个问题,然后我突然想到了Ajax同步和异步的问题,改成异步之后页面数据加载就快了很多,希望能正确的使用同步和异步. 在Jquery中ajax方法中async用于控制同步和异步,当async值为true时

.Net4.6 Task 异步OA现金盘平台出租函数 比 同步函数 慢5倍 踩坑经历

异步Task简单介绍本标题有点 哗众取宠OA现金盘平台出租QQ2952777280[话仙源码论坛]hxforum.com[木瓜源码论坛]papayabbs.com ,各位都别介意(不排除个人技术能力问题) -- 接下来:我将会用一个小Demo 把 本文思想阐述清楚. .Net 4.0 就有了 Task 函数 -- 异步编程模型 .Net 4.6 给 Task 增加了好几个 特别实用的方法,而且引入了 await async 语法糖 当然,这是非常不错的技术,奈何我有自己的线程队列封装,也就没有着

Log4j2同步异步性能比较以及教程和问题(坑)汇总

线程数:500个   每个线程日志输出次数: 500次 log4j2其实有两个输出异步日志的方式:AsyncLogger和AsyncAppend 他两的区别在于: AsyncLogger使用的是无锁高性能队列disruptor,底层是依赖数组实现的RingBuffer和CAS改变下标实现,并且不会出现伪共享缓存,关于disruptor的详情可以看下面这篇 https://ifeve.com/disruptor/     此网站貌似有点不稳定:) AsyncAppend使用的则是ArrayBloc

Oracle DBLink跨数据库访问SQL server数据同步 踩坑实录

项目需求:这里暂且叫A公司吧,A公司有一套人事管理软件,需要与我们公司的软件做人员信息同步,A公司用的是SQL server数据库,我们公司用的Oracle,接口都不会开发(一万句"fuck you"),就单单给我们公司提供了一个SQL server的账户和密码,还有一个视图.后来百度一番,可以通过DBLink跨数据库访问,然后做数据信息同步功能. 安装过程中,踩了不少的坑,需要配置很多的东西,QQ群里也请教不少人,都很少人听说还有这玩意,现在做数据对接,都是走到接口,传JSON字符串

storm kafkaSpout 踩坑问题记录! offset问题!

整合kafka和storm例子网上很多,自行查找 问题描述: kafka是之前早就搭建好的,新建的storm集群要消费kafka的主题,由于kafka中已经记录了很多消息,storm消费时从最开始消费 问题解决: 下面是摘自官网的一段话: How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures As shown in the above KafkaConfig properties, you

Storm的ack机制在项目应用中的坑

正在学习storm的大兄弟们,我又来传道授业解惑了,是不是觉得自己会用ack了.好吧,那就让我开始啪啪打你们脸吧. 先说一下ACK机制: 为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪. 这里面涉及到ack/fail的处理,如果一个tuple处理成功是指这个Tuple以及这个Tuple产生的所有Tuple都被成功处理, 会调用spout的ack方法: 如果失败是指这个Tuple或这个Tuple产生的所有Tuple中的某一个tuple处理失败, 则会调