kettle processRow

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { 

    meta = (LookupStepMeta) smi;
    data = (LookupStepData) sdi; 

    Object[] r = getRow(); // get row, blocks when needed!
    if (r == null) // no more input to be expected...
    {
        setOutputDone();
        return false;
    } 

    if (first) { 

        first = false; 

        // the size of the incoming rows
        data.inputSize = getInputRowMeta().size(); 

        // determine output field structure
        data.outputRowMeta = (RowMetaInterface) getInputRowMeta().clone();
        meta.getFields(data.outputRowMeta, getStepname(), null, null, this); 

        // stores default values in correct format
        data.defaultObjects = new Object[meta.getKeyField().length]; 

        // stores the indices where to look for the key fields in the input rows
        data.keyFieldIndex = new int[meta.getKeyField().length];
        data.conversionMeta = new ValueMetaInterface[meta.getKeyField().length]; 

        for (int i=0;i<meta.getKeyField().length;i++){ 

            // get output and from-string conversion format for each field
            ValueMetaInterface returnMeta = data.outputRowMeta.getValueMeta(i+data.inputSize); 

            ValueMetaInterface conversionMeta = returnMeta.clone();
            conversionMeta.setType(ValueMetaInterface.TYPE_STRING);
            data.conversionMeta[i] = conversionMeta; 

            // calculate default values
            if (!Const.isEmpty(meta.getOutputDefault()[i])){
                data.defaultObjects[i] = returnMeta.convertData(data.conversionMeta[i], meta.getOutputDefault()[i]);
            }
            else{
                data.defaultObjects[i] = null;
            } 

            // calc key field indices
            data.keyFieldIndex[i] = data.outputRowMeta.indexOfValue(meta.getKeyField()[i]);
            if (data.keyFieldIndex[i]<0)
            {
                throw new KettleStepException(BaseMessages.getString(PKG, "VoldemortStep.Error.UnableFindField",meta.getKeyField()[i],""+(i+1)));
            } 

        } 

    } 

    // generate output row, make it correct size
    Object[] outputRow = RowDataUtil.resizeArray(r, data.outputRowMeta.size()); 

    // fill the output fields with look up data
    for (int i = 0, outi=data.inputSize;i<meta.getKeyField().length;outi++, i++){ 

        // try to get the value from voldemort
        String value = data.voldemortClient.getValue(r[data.keyFieldIndex[i]]); 

        // if nothing is there, return the default
        if (value == null){
            outputRow[outi] = data.defaultObjects[i];
        }
        // else convert the value to desired format
        else{
            outputRow[outi] = data.outputRowMeta.getValueMeta(outi).convertData(data.conversionMeta[i], value);
        } 

    } 

    // copy row to possible alternate rowset(s)
    putRow(data.outputRowMeta, outputRow);  

    // Some basic logging
    if (checkFeedback(getLinesRead())) {
        if (log.isBasic()) logBasic("Linenr " + getLinesRead());
    } 

    return true;
}

http://type-exit.org/adventures-with-open-source-bi/2010/06/developing-a-custom-kettle-plugin-looking-up-values-in-voldemort/

kettle processRow,布布扣,bubuko.com

时间: 2024-10-11 23:23:38

kettle processRow的相关文章

Kettle变量和自定义java代码的实例应用

1  kettle.properties参数配置数据源连接和FTP连接 由于测试环境和生产环境中数据库连接FTP等配置会在部署过程中变更,所以预先定义成配置项,在配置文件中修改,这样测试和发布将会变得简单,下面以数据库为例说明这类配置的使用. (1)      首先要找到配置文件,不同的操作系统路径也不一样,本人用win7进行开发,配置文件的路径为"C:\Users\chenpeng\.kettle\kettle.properties",如下: (2)      配置文件中的具体配置如

用 Kettle UDJC 组件实现用户行为组序列

用户登录APP,然后浏览了一些列页面.如果获取用户行为数据时没有对每次登陆分组,那么想知道用户每次登陆的行为数据就有点难了.下面使用 Kettle UDJC 组件给用户行为序列分组添加序列. private String diffListId = ""; private String diffFuncNo = ""; private int fieldSeq = 0; public boolean processRow(StepMetaInterface smi,

关于NLPIR应用在KETTLE中的探索

一:什么是NLPIR? NLPIR汉语分词系统(自然语言处理与信息检索共享平台),主要功能包括中文分词:词性标注:命名实体识别:用户词典功能:支持GBK编码.UTF8编码.BIG5编码.新增微博分词.新词发现与关键词提取:张华平博士先后倾力打造十余年,内核升级10次. 全球用户突破20万,先后获得了2010年钱伟长中文信息处理科学技术奖一等奖,2003年国际SIGHAN分词大赛综合第一名,2002年国内973评测综合第一名. 二:我们使用NLPIR可以做什么? 三:如何把分词算法用到我们的ETL

用Kettle的一套流程完成对整个数据库迁移

需求: 1.你是否遇到了需要将mysql数据库中的所有表与数据迁移到Oracle. 2.你是否还在使用kettle重复的画着:表输入-表输出.创建表,而烦恼. 下面为你实现了一套通用的数据库迁移流程. 技术引导: 实现之初,在kettle提供的例子中找到了一个类似的(samples\jobs\process all tables). 通过相关改造,终于达到目标. 实现过程解剖: 整套流程分为:2个job,4个trans. 使用到的Trans插件:表输入.字段选择.复制记录到结果.从结果获取记录.

Kettle plugin 插件开发

Kettle本身提供了很多组件,多个组件一起构成一个transformation(转换),多个转换一起构成一个job(任务).kettle的组件已经非常丰富,在组件不满足需求时可以在kettle上面开发自己的组件,kettle支持的组件开发如下: Kettle中的插件包含两部分: 一是系统本身就已经实现的功能点,在源码目录src中说明,如kettle-steps.xml: 二是系统之外开发的插件,在plugins目录对应插件目录下的plugins.xml说明,如plugins/steps/S3C

hbase0.96数据导入以及Kettle操作hbase问题

版本: cdh5.0.0+hadoop2.3.0+hbase0.96.1.1+Spoon5.0.1 一.HBase数据导入 HBase数据导入使用org.apache.hadoop.hbase.mapreduce.ImportTsv 的两种方式,一种是直接导入,一种是转换为HFile,然后再次导入. 1. HDFS数据为(部分): [[email protected] data]# hadoop fs -ls /input Found 1 items -rwxrwxrwx 1 hdfs supe

【Kettle】4、SQL SERVER到SQL SERVER数据转换抽取实例

1.系统版本信息 System:Windows旗舰版 Service Pack1 Kettle版本:6.1.0.1-196 JDK版本:1.8.0_72 2.连接数据库 本次实例连接数据库时使用全局变量. 2.1 创建新转换:spoon启动后,点击Ctrl+N创建新转换 2.2 在新转换界面中,右键点击DB连接,系统会弹出[数据库连接]界面. windows系统环境下,可用${}获取变量的内容. 说明: 连接名称:配置数据源使用名称.(必填) 主机名称:数据库主机IP地址,此处演示使用本地IP(

在Linux下部署kettle的Job

关于如何用kettle创建job以及如何部署kettle到linux上,我就不细说了,大家应该都会,下面重点说一下,如何让job执行起来先将创建好的脚本上传到kettle指定目录下面,创建调用job执行的脚本文件, export JAVA_HOME=/usr/java/jre1.6.0_23 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar /opt

kettle 安装mysql 驱动

错误连接数据库 [mysql] : org.pentaho.di.core.exception.KettleDatabaseException: Error occurred while trying to connect to the database Driver class 'org.gjt.mm.mysql.Driver' could not be found, make sure the 'MySQL' driver (jar file) is installed.org.gjt.mm