Storm supports multiple languages, Python among them.
First, create a Java class that wraps the Python script:
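import java.util.Map;

// Storm 1.0+ packages; releases before 1.0 used backtype.storm.* instead
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;

// Declared as a nested (static) class inside the topology class
public static class BasieCalculateBolt extends ShellBolt implements IRichBolt {
    public BasieCalculateBolt() {
        // Launches "python bolt_base_calculate.py" as a child process
        super("python", "bolt_base_calculate.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}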
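Storm looks for the referenced script (bolt_base_calculate.py) in the resources directory of the topology jar. In this project the .py files are kept under multilang/resources, so the multilang directory must be declared as a resource directory in Maven's pom.xml; its resources/ subfolder then ends up as resources/ inside the jar.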
<build>
    <sourceDirectory>src/jvm</sourceDirectory>
    <testSourceDirectory>test/jvm</testSourceDirectory>
    <resources>
        <resource>
            <directory>${basedir}/multilang</directory>
        </resource>
    </resources>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
        </plugin>
    </plugins>
</build>
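After mvn package, it is worth checking that the scripts actually landed where Storm will look for them. The sketch below assumes, following the storm-starter layout, that each script should appear as resources/<name> inside the jar; missing_multilang_files is a hypothetical helper, and its default script names are the two files used in this article.

```python
import zipfile


def missing_multilang_files(jar, scripts=("bolt_base_calculate.py", "storm.py")):
    """Return the scripts that are NOT packaged under resources/ in the jar.

    `jar` may be a path or a binary file object. Assumption: the topology
    expects multilang scripts at resources/<name> inside the jar, which is
    what declaring multilang/ as a Maven resource directory produces when
    the scripts live in multilang/resources/.
    """
    with zipfile.ZipFile(jar) as archive:
        entries = set(archive.namelist())
    return [name for name in scripts if "resources/" + name not in entries]
```

For example, missing_multilang_files("target/your-topology.jar") (a placeholder path) should return an empty list when both files were packaged.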
The simplest possible Python bolt looks like this:
import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

SplitSentenceBolt().run()
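Because this bolt only touches storm.BasicBolt, storm.emit and tup.values, its process() logic can be exercised without a cluster by registering a stand-in storm module before the bolt code is loaded. The stub below is a hypothetical test double, not the real storm.py: it records emitted tuples instead of writing protocol messages to stdout. The final run() call is left out because the real one blocks waiting for Storm on stdin.

```python
import sys
import types

# Hypothetical stand-in for storm.py with only the pieces the example bolt uses.
fake_storm = types.ModuleType("storm")
fake_storm.emitted = []


class _BasicBolt(object):
    def run(self):
        raise RuntimeError("run() needs a real Storm parent process")


def _emit(tup):
    # Record the tuple instead of sending an "emit" message to Storm.
    fake_storm.emitted.append(tup)


fake_storm.BasicBolt = _BasicBolt
fake_storm.emit = _emit
sys.modules["storm"] = fake_storm

import storm  # resolves to the stub registered above


class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])


class FakeTuple(object):
    """Minimal tuple exposing only .values, the attribute the bolt reads."""

    def __init__(self, values):
        self.values = values


SplitSentenceBolt().process(FakeTuple(["the cow jumped"]))
print(storm.emitted)  # [['the'], ['cow'], ['jumped']]
```

Note that split(" ") keeps empty strings when words are separated by several spaces; split() with no argument would drop them.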
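The resources directory also needs the latest storm.py from the Storm project: https://github.com/apache/storm/blob/master/bin/storm.py. Make sure the file you copy is the multilang module the bolt imports (the one that defines BasicBolt and emit); in recent Storm releases it ships in the storm-multilang/python module, whereas bin/storm.py is the script behind the storm command-line client.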
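Since the bolt imports BasicBolt and emit from storm.py, a quick sanity check is to confirm that the copied file defines both at module level. This sketch only parses the file with the standard ast module and never executes it; looks_like_multilang_module is a hypothetical helper name.

```python
import ast


def looks_like_multilang_module(path):
    """Return True if the file defines a class BasicBolt and a function emit.

    These are the two names the example bolt uses from storm.py, so a file
    lacking either one cannot be the module the bolt needs.
    """
    with open(path, encoding="utf-8") as f:
        tree = ast.parse(f.read(), filename=path)
    classes = {node.name for node in tree.body if isinstance(node, ast.ClassDef)}
    functions = {node.name for node in tree.body if isinstance(node, ast.FunctionDef)}
    return "BasicBolt" in classes and "emit" in functions
```

Usage: looks_like_multilang_module("multilang/resources/storm.py").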
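A Python bolt must not contain print statements. Storm's ShellBolt exchanges data with the Python process over its standard input and output, and the bolt's stdout carries the multilang protocol messages back to the JVM, so anything printed gets mixed into that stream. To still log what the program is doing, use Python's logging module instead, for example by creating a log.py: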
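To see why a stray print is fatal, here is a simplified model of the framing storm.py uses on stdout: each message is one JSON document followed by a line containing only end. This is a sketch of that framing, not the real storm.py; encode_msg and read_msg are hypothetical names, and real emit messages carry more fields (id, stream, anchors) than shown here.

```python
import io
import json


def encode_msg(msg):
    # One JSON document, then a line containing only "end".
    return json.dumps(msg) + "\nend\n"


def read_msg(stream):
    # Collect lines until the "end" marker, then parse them as one JSON document.
    lines = []
    while True:
        line = stream.readline()
        if not line:
            raise EOFError("stream closed before 'end'")
        if line.rstrip("\n") == "end":
            break
        lines.append(line)
    return json.loads("".join(lines))


# A well-formed (simplified) emit message parses cleanly.
ok = io.StringIO(encode_msg({"command": "emit", "tuple": ["word"]}))
print(read_msg(ok))  # {'command': 'emit', 'tuple': ['word']}

# Output from print() lands in the same stream and corrupts the next message.
corrupted = io.StringIO("debug: got tuple\n" + encode_msg({"command": "emit", "tuple": ["word"]}))
try:
    read_msg(corrupted)
except json.JSONDecodeError as exc:
    print("parent cannot parse the message:", exc.msg)
```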
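import logging
import logging.config
import os

# Load logging.conf from the directory containing this file, not from the current working directory
logging.config.fileConfig(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logging.conf'))

# Create the logger; the name below is your project's name and must match a logger defined in logging.conf
logger = logging.getLogger('calculateEngine')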
logging.conf can be configured as follows:
[loggers]
keys=root,calculateEngine

[handlers]
keys=fileHandler,consoleHandler

[formatters]
keys=simpleFormatter

[logger_root]
level=DEBUG
handlers=consoleHandler

[logger_calculateEngine]
level=INFO
handlers=fileHandler
qualname=calculateEngine
propagate=0

[handler_consoleHandler]
class=StreamHandler
level=WARN
formatter=simpleFormatter
# stderr, not stdout: stdout is reserved for Storm's multilang messages
args=(sys.stderr,)

[handler_fileHandler]
# fileConfig ignores extra keys such as maxBytes/backupCount/encoding, and FileHandler
# does not rotate, so use RotatingFileHandler and pass them in args:
# (filename, mode, maxBytes, backupCount, encoding)
class=handlers.RotatingFileHandler
level=DEBUG
formatter=simpleFormatter
# relative path: resolved against the bolt process's working directory
args=('asien_calculate.log', 'a', 10485760, 20, 'utf8')

[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
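The Python logging documentation states that fileConfig evaluates each handler's args in the namespace of the logging package, so a path built from __file__ inside logging.conf points at the logging package's own directory rather than at log.py. One way around that is to build the configuration in Python with logging.config.dictConfig, swapped in here for fileConfig. The sketch keeps the same logger name, levels and format, rotates at 10 MB with 20 backups, and sends console output to stderr so stdout stays free for Storm. configure_logging and its log_dir parameter are hypothetical names.

```python
import logging
import logging.config
import os

FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"


def configure_logging(log_dir):
    """Hypothetical dictConfig counterpart of logging.conf; returns the project logger."""
    logging.config.dictConfig({
        "version": 1,
        "formatters": {"simple": {"format": FORMAT}},
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "level": "WARNING",
                "formatter": "simple",
                "stream": "ext://sys.stderr",  # never stdout in a multilang bolt
            },
            "file": {
                "class": "logging.handlers.RotatingFileHandler",
                "level": "DEBUG",
                "formatter": "simple",
                "filename": os.path.join(log_dir, "asien_calculate.log"),
                "maxBytes": 10485760,  # rotate at 10 MB
                "backupCount": 20,
                "encoding": "utf8",
            },
        },
        "loggers": {
            "calculateEngine": {
                "level": "INFO",
                "handlers": ["file"],
                "propagate": False,  # keep these records away from the root handler
            },
        },
        "root": {"level": "DEBUG", "handlers": ["console"]},
    })
    return logging.getLogger("calculateEngine")
```

In log.py this would become logger = configure_logging(os.path.dirname(os.path.abspath(__file__))), which writes the log file next to log.py.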
To use it in a Python bolt, import the logger with from log import logger and call logger.info("...").
Date: 2024-10-08 15:44:41