Kettle 调用方式

在使用Kettle执行Job或者转换的时候有多种方式,例如:

一:通过命令行来执行

kitchen.sh来执行指定的Job,例如:

kitchen.bat -file="../work/start.kjb"

通过"Pan.bat"来执行指定的转换,例如:

Pan.bat -file="../work/start.ktr"

二:通过Java API 接口在代码中执行


KettleEnvironment.init();

EnvUtil.environmentInit();

JobMeta jodMeta = new JobMeta("start.kjb",null);

Repository repository = null ;

Job job = new Job(repository,jodMeta);

job.setVariable("key", "value");

job.start();

job.waitUntilFinished();


KettleEnvironment.init();

EnvUtil.environmentInit();

TransMeta transMeta = new TransMeta("start.ktr");

Trans trans = new Trans(transMeta);

trans.setMonitored(true);

trans.setInitializing(true);

trans.setPreparing(true);

trans.setRunning(true);

trans.setSafeModeEnabled(true);

trans.setVariable("key", "value");

trans.execute(null);

trans.waitUntilFinished();

三:远程调用

1,我们通过Kettle的可视化界面配置了许多的Job和转换任务,并保存为了xml格式的数据。

2,初始化环境


/*

* 运行kettle软件的时候:

* the kettle.properties file is located in

*         [home dir of user running kettle]/.kettle/kettle.properties

* On a Mac: /Users/slawo/.kettle/kettle.properties

* On Linux: /home/slawo/.kettle/kettle.properties

* On Windows XP: C:\Documents and Settings\slawo\.kettle\kettle.properties

* On other systems (especially flavors of windows) just locate the users home dir,

* and look into the .kettle subfolder.

*

* 在Java应用中:

* Kettle会在该路径下查找kettle.properteis文件:${user.dir}/.kettle/kettle.properties

* 如何设置了“KETTLE_HOME”环境变量,Kettle将会在该路径下查找kettle.properties文件:

* ${KETTLE_HOME}/.kettle/kettle.properties

*

* 对于Web项目,必须要更改一下用户目录,否则直接初始化第一次要加载很久才会有响应,

* 更改完之后要改回来。

* WEB项目应该是在 WEB-INF目录下,会产生一个 .kettle 的文件夹,

* 里面包含一个kettle.properties 配置文件

*

* 其他变量介绍

* 1,PENTAHO_JAVA_HOME : 指定kettle使用的jre,例如:

* PENTAHO_JAVA_HOME=C:\Program Files\Java\jre7

* 2,PENTAHO_INSTALLED_LICENSE_PATH : 指定包含了licenses的路径,例如:

* SET PENTAHO_INSTALLED_LICENSE_PATH=C:\pentaho\.pentaho\.installedLicenses.xml

* 3,MONETDB_INSTALL_DIR : MonetDB安装路径,例如:

* MONETDB_INSTALL_DIR=C:\PE0A28~1\monetdb

* */

//获得执行类的当前工作目录

String userDir = System.getProperty("user.dir") ;

//用户的主目录

String userHome = System.getProperty("user.home") ;

log.info("user.dir:{}, uer.home:{}",userDir,userHome);

try {

String kettleHome = PathUtil.WEBINF_PATH+"KettleHome" ;

if(!KettleEnvironment.isInitialized()){

//Kettle初始化需要修改相应的配置路径

System.setProperty("KETTLE_HOME",kettleHome);

System.setProperty("user.dir",kettleHome) ;

//运行环境初始化(设置主目录、注册必须的插件等)

KettleEnvironment.init();

EnvUtil.environmentInit();

}

} catch (KettleException e) {

e.printStackTrace();

log.info("--kettle--初始化环境失败:{}",e.getMessage());

}

//Kettle初始化完毕,还原执行类的当前路径

System.setProperty("user.dir",userDir);

3,创建SlaveServer


@Value("${kettleHostName}")

private String kettleHostName = "192.168.30.65";

@Value("${kettlePort:8081}")

private String kettlePort = "8081";

@Value("${kettleUserName:cluster}")

private String kettleUserName = "cluster" ;

@Value("${kettlePassword:cluster}")

private String kettlePassword = "cluster";

private SlaveServer getRemoteServer() {

// 创建工作服务器的连接

if(remoteServer == null){

synchronized(SlaveServerExecutor.class){

if(remoteServer != null){

return remoteServer ;

}

try {

remoteServer = new SlaveServer();

remoteServer.setHostname(kettleHostName);

remoteServer.setPort(kettlePort);

remoteServer.setUsername(kettleUserName);

remoteServer.setPassword(kettlePassword);

return remoteServer;

} catch (Exception e) {

remoteServer = null ;

log.error("创建SlaveServer失败:{}", e.getMessage());

}

return remoteServer;

}

}

return remoteServer;

}

4,执行Job


/**

* @return carteObjectId

* */

public String executeJob(String jobName,Map<String, String> variablesMap) {

SlaveServer slaveServer = getRemoteServer() ;

if(slaveServer == null){

return null ;

}

/*

添加全局参数 start

KettleGlobalVariable 自己创建的用于读取全局参数配置文件,并缓存值

*/

Iterator<Entry<String, String>> iter = KettleGlobalVariable.kettleArgsMap.entrySet().iterator() ;

Entry<String, String> entry ;

while(iter.hasNext()){

entry = iter.next() ;

if(variablesMap.containsKey(entry.getKey())){

continue ;

}

variablesMap.put(entry.getKey(),entry.getValue()) ;

}

//添加全局参数 end

// 资源库元对象

String id= "",name= "",description= "无描述" ;

//Job文件存放路径

String localKettleBaseDirectory = PathUtil.WEBINF_PATH + "KettleHome"  ;

KettleFileRepositoryMeta repinfo = new KettleFileRepositoryMeta(id,name,description,localKettleBaseDirectory );

// 文件形式的资源库

KettleFileRepository rep = new KettleFileRepository();

rep.init(repinfo);

RepositoryDirectoryInterface tree = null;

try {

tree = rep.loadRepositoryDirectoryTree();

} catch (KettleException e) {

log.error("加载文件仓库路径失败,{}",e.getMessage()) ;

return null ;

}

RepositoryDirectoryInterface rd = tree.findDirectory("");

JobMeta jobMeta = null ;

try {

jobMeta = rep.loadJob(jobName, rd, null, null);

} catch (KettleException e) {

log.error("加载job任务失败,jobName:{},error:{}",jobName,e.getMessage());

return null ;

}

JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();

jobExecutionConfiguration.setExecutingLocally(false);

jobExecutionConfiguration.setExecutingRemotely(true);

jobExecutionConfiguration.setLogLevel(LogLevel.BASIC);

jobExecutionConfiguration.setPassingExport(false);

jobExecutionConfiguration.setRemoteServer(slaveServer);

jobExecutionConfiguration.setRepository(rep);

//设置全局参数

jobExecutionConfiguration.setVariables(variablesMap);

String carteObjectId = null ;

try {

carteObjectId = Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, rep, null);

} catch (KettleException e) {

log.error("发送job到slaveServer失败,{}",e.getMessage());

}finally{

rep.disconnect();

}

return carteObjectId ;

}



Kettle环境服务器

目录结构如下:


--kettle

--data-integration

--Care.bat

--care.sh

-- .... ...

--work

--*.ktr

--*.kjb

执行指令启动服务:

carte.bat localhost 8081 

远程方式调用注意事项:

1,要执行某个Job的时候,只需要传递Job的入口文件即可,其他依赖的文件都应该在Kettle环境服务器的work路径下。

2,当使用carte.bat启动服务的时候,其工作路径其实就是carte.bat所在的路径,当寻找其依赖文件的时候,有可能就会在当前路径下寻找。所以我们在配置“转换文件名”的时候可以这样:

../work/dependency.ktr

3,在可视化界面配置Job的时候,“转换文件名”的路径还可能是这样:

${Internal.Job.Filename.Directory}/dependency.ktr

这本是说在job文件同路径下寻找,但是通过以上方式来执行的时候,会跑到根路径下去寻找,至少我在测试的时候是这样的。

时间: 2024-11-05 21:48:01

Kettle 调用方式的相关文章

线程安全的事件调用方式

通常事件调用方式为 //版本1 public event NewEventHandler NewEvent;protected virtual void OnNewEvent(EventArgs e){ if (NewEvent != null) NewEvent(this, e);} 但这种方式的问题在于,在做NewEvent != null 检测之后,NewEvent事件调用之前,事件取消了注册,即NewEvent重新变成null,此时再进行调用,将会抛出异常 线程安全的做法, //版本2

《C++反编译与逆向分析技术揭秘》之学习笔记03--函数的调用方式

※函数的调用方式 EBP:扩展基址指针寄存器(extended base pointer) 其内存放一个指针,该指针指向系统栈最上面一个栈帧的底部. ESP:(Extended stack pointer)是指针寄存器的一种,用于指向栈的栈顶. _cdecl:C/C++默认的调用方式,调用方平衡栈,不定参数的函数可以试用. 调用方:1.参数压栈.esp-=42.调用函数.3.实现栈平衡.esp+=4 此处的printf也是同样道理0x004010CB.0x004010CC两处压入参数,共8个字节

js实现类似于add(1)(2)(3)调用方式的方法

群里有人说实现类似add(1)(2)(3)调用方式的方法,结果马上有人回答: var add = function(a){ return function(b){ return function(c){ return a+b+c; }; }; }; add(1)(2)(3); //6 没错!那要是add(1)(2)(3)(4) 这样4个调用呢,那这个肯定不适用了. 这种就是类似于执行一个函数返回函数自身值: function add(x) { var sum = x; var tmp = fun

简述自己用过的几种异步调用方式

直接上代码 1.BeginInvoke和EndInvoke方式 private static void BeginInvoke1() { Func<int,string> fun = Todo; for (int i = 0; i < 5; i++) { //fun.BeginInvoke(i,TodoCallBack, fun); /* 异步调用委托BeginInvoke * handler.EndInvoke(x)为执行委托的结果 */ fun.BeginInvoke(i, x =&

同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)四种调用方式

1. 概念理解        在进行网络编程时,我们常常见到同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)四种调用方式:   同步/异步主要针对C端: 同步:      所谓同步,就是在c端发出一个功能调用时,在没有得到结果之前,该调用就不返回.也就是必须一件一件事做,等前一件做完了才能做下一件事.   例如普通B/S模式(同步):提交请求->等待服务器处理->处理完毕返回 这个期间客户端浏览器不能干任何事 异步:      异步的概念和同步相对.当c端一个

简单解析三种JAVA调用方式-同步,异步,回调

模块之间有三种调用方式: 1.同步调用 同步调用很简单,就是直接调用方法B,必须等到方法A执行完才会继续执行原程序. 2.异步调用 异步调用,就是在A程序中给程序B一个实现,当B运行的时候,在满足条件的情况下能够调用A程序中的实现.举例说明 public interface Food { void eat(); } public class Cow implements Food { @override void eat() { syso("eat cow"); } } public

Saltstack的API接口与调用方式

saltstack看起来是成为一个大规模自己主动化运维和云计算管理的一个框架,类似于SDK,并非像puppet仅仅成为一个工具.基于良好设计的API和清楚的思路,让salt的二次开发变得非常easy.写非常少的代码就能够将salt跟现有的运维系统结合.saltstack是用python语言实现的,假设对saltstack本身进行二次开发,就必须得会python语言. Saltstack是通过多个独立的模块构成的,这些都能够当做saltstack的api.然后在上层做基础开发能够通过调用这些sal

Webservice 调用方式整理

前一段时间搞webservice,简单的记录了一下几种经常使用的调用方式,供大家參考. Java proxy 1).用过eclipse的创建web service client来完毕 2).在eclipse里面new一个web service client,会提示输入一个wsdl地址. 3).输入地址后直接点完毕,会自己主动生成代码.该方式和手动编写是一样的. 眼下该方式还没通:会咨询一下RD. Wsdl: http://10.81.21.92:8080/MailAdmin/ws/mailBox

java实现WebService 以及客户端不同的调用方式

java 实现WebService 以及不同的调用方式 webservice:    就是应用程序之间跨语言的调用    wwww.webxml.com.cn    1.xml    2.    wsdl: webservice description language web服务描述语言        通过xml格式说明调用的地址方法如何调用,可以看错webservice的说明书        3.soap simple object access protoacl (简单对象访问协议)