No output fields defined for component:xxx::defaul

学习jstorm过程中,碰到一问题:



ERROR com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent  - Failed Sync Process

java.lang.IllegalArgumentException: No output fields defined for component:stream wordNormalizer_componentId:default

at backtype.storm.task.GeneralTopologyContext.getComponentOutputFields(GeneralTopologyContext.java:114)

at backtype.storm.task.TopologyContext.getThisOutputFields(TopologyContext.java:157)

at com.alibaba.jstorm.cluster.Common.outbound_components(Common.java:600)

at com.alibaba.jstorm.task.Task.makeSendTargets(Task.java:133)

at com.alibaba.jstorm.task.Task.echoToSystemBolt(Task.java:162)

at com.alibaba.jstorm.task.Task.execute(Task.java:244)

at com.alibaba.jstorm.task.Task.mk_task(Task.java:289)

at com.alibaba.jstorm.daemon.worker.Worker.createTasks(Worker.java:123)

at com.alibaba.jstorm.daemon.worker.Worker.execute(Worker.java:218)

at com.alibaba.jstorm.daemon.worker.Worker.mk_worker(Worker.java:258)

at com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent.launchWorker(SyncProcessEvent.java:402)

at com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent.startNewWorkers(SyncProcessEvent.java:828)

at com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent.run(SyncProcessEvent.java:157)

at com.alibaba.jstorm.event.EventManagerImpExecute.run(EventManagerImpExecute.java:38)

at java.lang.Thread.run(Thread.java:745)



最后折腾了一下,改变了一下数据流分组就不报这个错误了。

情况如下:

1. spout产生的数据流带了streamId,

即:

collector.emit(Chapter2CommonConstant.wordProducer_streamId, new Values(random));

	declarer.declareStream(Chapter2CommonConstant.wordProducer_streamId, new Fields(Chapter2CommonConstant.wordProducer_fields));

2.bolt产生的数据流也定义了入spout的数据流定义。

3. 拓扑数据流流向配置:

topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
				.fieldsGrouping(Chapter2CommonConstant.wordProducer_componentId, new
						Fields(Chapter2CommonConstant.wordProducer_fields));

		topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
				.fieldsGrouping(Chapter2CommonConstant.wordNormalizer_componentId, new
						Fields(Chapter2CommonConstant.wordNormalizer_fields));

这种情况下会报本文给出的异常。

解决办法:

3中拓扑数据流流向配置改为:就不会出现异常。

topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
				.shuffleGrouping(Chapter2CommonConstant.wordProducer_componentId, Chapter2CommonConstant.wordProducer_streamId);

		topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
				.shuffleGrouping(Chapter2CommonConstant.wordNormalizer_componentId, Chapter2CommonConstant.wordNormalizer_streamId);

完整代码:

package com.doctor.ebook.getting_started_with_storm;

/**
 * @author doctor
 *
 * @time 2015年4月25日 下午10:59:20
 */
public final class Chapter2CommonConstant {

	public static final String wordProducer_componentId = "wordProducer_componentId";
	public static final String wordProducer_streamId = "wordProducer_streamId";
	public static final String wordProducer_fields = "wordProducer_randomString";

	public static final String wordNormalizer_componentId = "wordNormalizer_componentId";
	public static final String wordNormalizer_streamId = "wordNormalizer_streamId";
	public static final String wordNormalizer_fields = "wordNormalizer_fields";

	public static final String wordCounter_componentId = "wordCounter_componentId";
}
package com.doctor.ebook.getting_started_with_storm;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.RandomStringUtils;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import com.doctor.common.ContextBaseRichSpout;

/**
 * @author doctor
 *
 * @time 2015年4月25日 下午10:39:21
 */
public class WordProducerSpout extends ContextBaseRichSpout {
	private static final long serialVersionUID = -930888930597360858L;
	private String content = "A spout emits a list of defined fields. This architecture allows you to have" +
			"different kinds of bolts reading the same spout stream, which can then" +
			"define fields for other bolts to consume and so on";

	/**
	 * open is the first method called in any spout.
	 * 
	 * The parameters it receives are the TopologyContext, which contains all our topology data; the conf object, which is created
	 * in the topology definition; and the SpoutOutputCollector, which enables us to emit the data that will be processed by the
	 * bolts.
	 */
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		super.open(conf, context, collector);
	}

	/**
	 * from this method, we’ll emit values to be processed by the bolts.
	 */
	@Override
	public void nextTuple() {
		String random = RandomStringUtils.random(6, content);
		try {
			TimeUnit.SECONDS.sleep(1);
			collector.emit(Chapter2CommonConstant.wordProducer_streamId, new Values(random));
			log.info("WordProducerSpout:" + random);
		} catch (InterruptedException e) {
			log.error("TimeUnit.SECONDS.sleep.error", e);
		}

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream(Chapter2CommonConstant.wordProducer_streamId, new Fields(Chapter2CommonConstant.wordProducer_fields));

	}

}
package com.doctor.ebook.getting_started_with_storm;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import com.doctor.common.ContextBaseRichBolt;

/**
 * @author doctor
 *
 * @time 2015年4月25日 下午11:14:27
 */
public class WordNormalizerBolt extends ContextBaseRichBolt {
	private static final long serialVersionUID = -1244951787400604294L;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
		super.prepare(stormConf, context, collector);
	}

	/**
	 * The bolt will receive the line from the words file and process it to Normalize this line
	 * 
	 * The normalize will be put the words in lower case
	 */
	@Override
	public void execute(Tuple input) {
		if (Chapter2CommonConstant.wordProducer_componentId.equals(input.getSourceComponent()) &&
				Chapter2CommonConstant.wordProducer_streamId.equals(input.getSourceStreamId())) {
			String field = input.getStringByField(Chapter2CommonConstant.wordProducer_fields);
			log.info("WordNormalizer.execute:" + field);
			field = field.toLowerCase();
			collector.emit(Chapter2CommonConstant.wordNormalizer_streamId, new Values(field));
		}

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream(Chapter2CommonConstant.wordNormalizer_streamId, new Fields(Chapter2CommonConstant.wordNormalizer_fields));

	}

}
package com.doctor.ebook.getting_started_with_storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import com.doctor.common.ContextBaseRichBolt;

/**
 * @author doctor
 *
 * @time 2015年4月25日 下午11:35:05
 */
public class WordCounterBolt extends ContextBaseRichBolt {
	private static final long serialVersionUID = 8157872805076023917L;
	private Map<String, Integer> counters;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
		super.prepare(stormConf, context, collector);
		counters = new HashMap<>();
	}

	@Override
	public void execute(Tuple input) {
		if (Chapter2CommonConstant.wordNormalizer_componentId.equals(input.getSourceComponent()) &&
				Chapter2CommonConstant.wordNormalizer_streamId.equals(input.getSourceStreamId())) {

			String field = input.getStringByField(Chapter2CommonConstant.wordNormalizer_fields);
			if (counters.containsKey(field)) {
				Integer num = counters.get(field);
				counters.put(field, num + 1);
				log.info("WordCounterBolt.execute:" + field + ":" + num + 1);
			} else {
				counters.put(field, 1);
				log.info("WordCounterBolt.execute:" + field + ":" + 1);

			}
		}

	}

	@Override
	public void cleanup() {
		counters.clear();
		super.cleanup();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub

	}

}
package com.doctor.ebook.getting_started_with_storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

/**
 * @author doctor
 *
 * @time 2015年4月25日 下午10:34:14
 */
public class Chapter2TopologyMain {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		TopologyBuilder topologyBuilder = new TopologyBuilder();
		topologyBuilder.setSpout(Chapter2CommonConstant.wordProducer_componentId, new WordProducerSpout(), 1);

		// topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
		// .fieldsGrouping(Chapter2CommonConstant.wordProducer_componentId, new
		// Fields(Chapter2CommonConstant.wordProducer_fields));
		//
		// topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
		// .fieldsGrouping(Chapter2CommonConstant.wordNormalizer_componentId, new
		// Fields(Chapter2CommonConstant.wordNormalizer_fields));

		topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
				.shuffleGrouping(Chapter2CommonConstant.wordProducer_componentId, Chapter2CommonConstant.wordProducer_streamId);

		topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
				.shuffleGrouping(Chapter2CommonConstant.wordNormalizer_componentId, Chapter2CommonConstant.wordNormalizer_streamId);

		LocalCluster localCluster = new LocalCluster();

		Config conf = new Config();
		conf.setDebug(true);
		conf.setNumWorkers(1);
		localCluster.submitTopology("Chapter2TopologyMain", conf, topologyBuilder.createTopology());

	}

}

注释掉的数据流配置会出现本文给出的异常。

这个由于对jstorm源码没研究,也不知道是怎么回事。

时间: 2024-08-28 04:34:14

No output fields defined for component:xxx::defaul的相关文章

storm运行异常之No output fields defined for component:stream XxxBolt:null

错误log: 2015-01-05 17:22:49 [Thread-38-log] ERROR backtype.storm.daemon.executor - java.lang.RuntimeException: java.lang.IllegalArgumentException: No output fields defined for component:stream XxxBolt:null at backtype.storm.utils.DisruptorQueue.consum

storm运行异常之No output fields defined for component:stream XxxBolt:null疑案追踪

前言 上一篇写了 storm运行异常之No output fields defined for component:stream XxxBolt:null 发现是多线程导致的,但是也有可能是其他原因,今天就来追踪一下. 反查蛛丝马迹 错误log: Caused by: java.lang.IllegalArgumentException: No output fields defined for component:stream XxxBolt:null at backtype.storm.ta

struts2的一个异常:No result defined for action XXX and result input

同事在使用struts2的时候,出现两个问题: 1.No result defined for action XXX and result input: 2.前端js使用jQuery的i18n一直显示的是英语: 而且关键是,在大部分的浏览器上都是正常的,只有在个别电脑上的浏览器会出现这个问题. 网上很多资料显示是因为前端form表单提交的数据类型和后台action中的字段类型不匹配引起的,可是检查了发现了后台action中字段的类型除了String就是Date,没有什么int等,这就奇怪了.于是

Can&#39;t create component &#39;xxx.xxx.xxx&#39; as it has dependencies to be satisfied

问题描述: Can't create component 'xxx.xxx.xxx' as it has dependencies to be satisfied. 问题原由: 没有对新建的实体映射类创建对应的数据库表. 执行add-migration  xxx      update-database 在add-migration前一定要在DbContext中去配置即将映射的实体.搞忘了 Can't create component 'xxx.xxx.xxx' as it has depend

Unable to instantiate Action, xxxAction, defined for &amp;#39;xxx&amp;#39; in namespace &amp;#39;/&amp;#39;xxxAction解决方式

出现这个问题的解决办法主要有两个 1.假设项目没有使用Spring,则struts.xml配置文件里,这个action的class属性的路径没有写完整,应该是包名.类名 2.假设项目使用了Spring.那就是applicationContext.xml里面没有为这个action定义bean.这样strus.xml中的相应action的class属性的值就是Spring配置文件里bean的id.比方: applicationContext.xml <bean id="adminAction&

SQL Server get SP parameters and get output fields type information

Summary 本文主要介绍一下,SQL里面的两个很实用的两个操作: 获取存储过程的参数信息 SELECT * FROM INFORMATION_SCHEMA.PARAMETERS WHERE SPECIFIC_NAME='proc_name' ORDER BY ORDINAL_POSITION 获取SQL 语句的执行结果元数据 对应的存储过程是: sp_describe_first_result_set 例子: sp_describe_first_result_set @tsql = N'SE

详解User Defined Java Class步骤(二)

 详解User Defined Java Class步骤(二) kettle中的"user defined java class"步骤,也称UDJC步骤,从4.0版本就有,功能非常强大,无所不能:可以在其中写任意代码,却不影响效率.本文将详细介绍在不同场景中用示例展示如果使用该步骤,由于内容非常多,便于阅读方便,把内容分成三部分,请完整看完全部内容,示例代码在这里下载. 如果没有从第一部分开始,请访问第一部分. 使用步骤参数(Step Parameter) 如果你写了一段代码,如果

详解User Defined Java Class步骤(三)

 详解User Defined Java Class步骤(三) kettle中的"user defined java class"步骤,也称UDJC步骤,从4.0版本就有,功能非常强大,无所不能:可以在其中写任意代码,却不影响效率.本文将详细介绍在不同场景中用示例展示如果使用该步骤,由于内容非常多,便于阅读方便,把内容分成三部分,请完整看完全部内容,示例代码在这里下载. 如果没有看第二部分,请先访问第二部分. 错误处理 udjc步骤支持kettle的错误处理特性,从udjc步骤拖动

[Angular] Test component template

Component: import { Component, Input, ChangeDetectionStrategy, EventEmitter, Output } from '@angular/core'; @Component({ selector: 'stock-counter', changeDetection: ChangeDetectionStrategy.OnPush, template: ` <div class="stock-counter"> &l