storm 中的Python bolt的注意事项

Storm可支持多种语言,其中就有python .

首先需要创建一个类,

    public static class BasieCalculateBolt extends ShellBolt implements
            IRichBolt {

        public BasieCalculateBolt() {
            super("python", "bolt_base_calculate.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
引用的bolt_base_calculate.py放置的目录必须为本项目的resources目录,本项目的py文件放置于mutilang/resources目录下,则要在maven的pom.xml文件中将其设置为resource目录。
    <build>
        <sourceDirectory>src/jvm</sourceDirectory>
        <testSourceDirectory>test/jvm</testSourceDirectory>
        <resources>
            <resource>
                <directory>${basedir}/multilang</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

一个最简单的Python bolt如下所示:

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])

SplitSentenceBolt().run()

在resources目录下还需放置在官网上下载的最新storm.py文件,https://github.com/apache/storm/blob/master/bin/storm.py。

python的bolt中不可有print语句,因为storm中Python bolt和其他bolt之间数据的传递的便是通过监控console输出的数据。但是我们在Python中需要打印一些log来查看程序的运行,此时可使用log,即创建一个log.py

    import logging
    import logging.config
    import os

    logging.config.fileConfig(‘logging.conf‘)
    # create logger 下面是你工程的名称
    logger = logging.getLogger(‘calculateEngine‘) 

logging.conf的配置可设置为

    [loggers]
    keys=root,calculateEngine

    [handlers]
    keys=fileHandler,consoleHandler

    [formatters]
    keys=simpleFormatter

    [logger_root]
    level=DEBUG
    handlers=consoleHandler

    [logger_calculateEngine]
    level=INFO
    handlers=fileHandler
    qualname=calculateEngine
    propagate=0

    [handler_consoleHandler]
    class=StreamHandler
    level=WARN
    formatter=simpleFormatter
    args=(sys.stdout,)

    [handler_fileHandler]
    class=FileHandler
    level=DEBUG
    maxBytes=10485760
    backupCount=20
    encoding=utf8
    formatter=simpleFormatter
    args=(os.path.join(os.path.dirname(__file__),‘asien_calculate.log‘),‘a‘)

    [formatter_simpleFormatter]
    format=%(asctime)s - %(name)s - %(levelname)s - %(message)s

在python中的使用,只需from log import logger ,log.info("")即可

时间: 2024-12-13 11:11:48

storm 中的Python bolt的注意事项的相关文章

storm中的一些概念

1.topology 一个topolgy是spouts和bolts组成的图,通过stream groupings将图中的spout和bolts连接起来:如图所示: 一个topology会一直运行知道你手动kill掉,Storm自动重新分配执行失败的任务,并且Storm可以保证你不会有数据丢失(如果开启了高可靠性的话).如果一些机器意外停机它上面的所有任务会被转移到其他机器上: 运行一个toplogy很简单,首先,把你所有的代码以及所依赖的jar打进一个jar中.然后运行类似下面的命令: stor

Storm中Spout使用注意事项小结

Storm中Spout用于读取并向计算拓扑中发送数据源,最近在调试一个topology时遇到了系统qps低,处理速度达不到要求的问题,经过排查后发现是由于对Spout的使用模式不当导致的多线程同步等待.这里罗列几点个人觉得编写Spout代码时需要特别注意的地方: 1. 最常用的模式是使用一个线程安全的queue,如BlockingQueue,spout主线程从queue中读取数据:另外的一个或多个线程负责从数据源(如各种消息中间件.db等)读取数据并放入queue中. 2. 如果不关心数据是否丢

转:storm中一个Bolt发emit多次相同类型消息

在storm中的Bolt中可以处理完成逻辑后,向后面的Blot继续发送消息. 可以发送多个不同的消息,如: collector.emit("update-delivered-status",new Values(emailDeliverStatus)); collector.emit("save-request",new Values(udsn)); 也可以同一个类型的消息发送多个不同内容如; for (int i = 0; i < emailParamVo.

storm源码之理解Storm中Worker、Executor、Task关系【转】

[原]storm源码之理解Storm中Worker.Executor.Task关系 Storm在集群上运行一个Topology时,主要通过以下3个实体来完成Topology的执行工作:1. Worker(进程)2. Executor(线程)3. Task 下图简要描述了这3者之间的关系:                                                    1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服

Storm中的可靠性

我们知道Storm有一个很重要的特性,那就是Storm API能够保证它的一个Tuple能够被完全处理,这一点尤为重要,其实storm中的可靠性是由spout和bolt组件共同完成的,下面就从spout和bolt两个方便给大家介绍一下storm中的可靠性,最后会给出一个实现了可靠性的例子. 1.Spout的可靠性保证 在Storm中,消息处理可靠性从Spout开始的.storm为了保证数据能正确的被处理, 对于spout产生的每一个tuple,storm都能够进行跟踪,这里面涉及到了ack/fa

在C#中实现Python的切割技术

在C#中实现Python的切割技术 前言 之前在学习Python的时候发现Python中的切割技术超好玩的,本人也是正则表达式热爱狂,平时用C#比较多,所以决定把Python中的切割技术在C#中实现,添加到个人类库中,以便日后在写C#代码的时候能舔一舔Python的味道. 效果展示   Python版:  C#版: 切割技术讲解 这里先简要讲解一下Python中的切割技术,其他Python前辈也对此技术有丰富多彩的讲解文章,这里只是简要说明一下,好让读者们能知道下怎么回事,如果想更深入了解Pyt

storm中的基本概念

Storm是一个流计算框架,处理的数据是实时消息队列中的,所以需要我们写好一个topology逻辑放在那,接收进来的数据来处理,所以是通过移动数据平均分配到机器资源来获得高效率. Storm的优点是全内存计算,因为内存寻址速度是硬盘的百万倍以上,所以Storm的速度相比较Hadoop非常快(瓶颈是内存,cpu).其缺点就是不够灵活:必须要先写好topology结构来等数据进来分析. Storm 关注的是数据多次处理一次写入,而 Hadoop 关注的是数据一次写入,多次查询使用.Storm系统运行

eclipse中集成python开发环境

Eclipse简介 Eclipse是java开发最常用的IDE,功能强大,可以在MAC和Windos上运行,学习简单. pydev简介 一个功能强大的 Eclipse插件,用户可以完全利用 Eclipse 来进行 Python 应用程序的开发和调试.它提供了一些很好的功能,如:语法错误提示.源代码编辑助手.Quick Outline.Globals Browser.Hierarchy View.运行和调试等等.拥有诸多强大的功能,同时也非常易于使用. 详细步骤 1.安装python环境 下载地址

Python 3.X 调用多线程C模块,并在C模块中回调python函数的示例

由于最近在做一个C++面向Python的API封装项目,因此需要用到C扩展Python的相关知识.在此进行简要的总结. 此篇示例分为三部分.第一部分展示了如何用C在Windows中进行多线程编程:第二部分将第一部分的示例进行扩展,展示了如何在python中调用多线程的C模块:第三部分扩展了第二部分,增加了在C模块的线程中回调python的演示. 本文所用的环境为:64位Win7 + python 3.4 x86 + vs2010 一.windows下的C语言多线程程序 windows下多线程编程