There are several ways to run a Kettle Job or transformation:
I. From the command line
Use Kitchen (kitchen.sh on Linux/macOS, kitchen.bat on Windows) to run a given Job, for example:
```
kitchen.bat -file="../work/start.kjb"
```
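To launch Kitchen from a Java program instead of a shell, the same command can be assembled as an argument list for `ProcessBuilder`. This is a sketch under stated assumptions: `KitchenCommand` is a hypothetical helper, the Kitchen script is assumed to sit in `kettleDir`, and named parameters are passed with Kitchen's `-param:NAME=VALUE` option. Running the process with `kettleDir` as its working directory keeps relative job paths such as `../work/start.kjb` resolving the same way as in the command above.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: builds and runs a Kitchen command line.
final class KitchenCommand {

    private KitchenCommand() {
    }

    /**
     * Builds the argument list for Kitchen.
     * Assumes kitchen.bat (Windows) or kitchen.sh (elsewhere) lives in kettleDir.
     */
    static List<String> build(String kettleDir, String jobFile,
                              Map<String, String> params, boolean windows) {
        List<String> cmd = new ArrayList<>();
        String separator = windows ? "\\" : "/";
        String script = windows ? "kitchen.bat" : "kitchen.sh";
        cmd.add(kettleDir + separator + script);
        // ProcessBuilder does not go through a shell, so the path needs no quotes
        cmd.add("-file=" + jobFile);
        // Each named parameter becomes -param:NAME=VALUE
        for (Map.Entry<String, String> e : params.entrySet()) {
            cmd.add("-param:" + e.getKey() + "=" + e.getValue());
        }
        return cmd;
    }

    /** Starts Kitchen in kettleDir, waits for it, and returns its exit code. */
    static int run(List<String> cmd, String kettleDir)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(cmd)
                .directory(new File(kettleDir)) // relative job paths resolve from here
                .inheritIO()                    // show Kitchen's log in this console
                .start();
        return process.waitFor();
    }
}
```

The `windows` flag is passed in rather than detected so the builder stays testable; in practice it could come from `System.getProperty("os.name").startsWith("Windows")`.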
Use Pan (pan.sh on Linux/macOS, Pan.bat on Windows) to run a given transformation, for example:
```
Pan.bat -file="../work/start.ktr"
```
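When Pan or Kitchen is called from a script or another program, the outcome is reported through the process exit code. The mapping below follows the exit codes listed in Pentaho's Pan documentation (Kitchen uses the same values except 3); treat it as an assumption to verify against your PDI version. `PanExitCodes` is a hypothetical helper name.

```java
// Hypothetical helper: turns a Pan exit code into a readable message.
final class PanExitCodes {

    private PanExitCodes() {
    }

    static String describe(int exitCode) {
        switch (exitCode) {
            case 0: return "The transformation ran without a problem";
            case 1: return "Errors occurred during processing";
            case 2: return "An unexpected error occurred during loading or running of the transformation";
            case 3: return "Unable to prepare and initialize the transformation";
            case 7: return "The transformation could not be loaded from XML or the repository";
            case 8: return "Error loading steps or plugins";
            case 9: return "Command line usage printing";
            default: return "Unknown exit code " + exitCode;
        }
    }

    /** Only exit code 0 means success. */
    static boolean succeeded(int exitCode) {
        return exitCode == 0;
    }
}
```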
II. Through the Java API, in code
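Running a Job:

```java
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.Repository;

public class RunJob {
    public static void main(String[] args) throws Exception {
        KettleEnvironment.init();
        EnvUtil.environmentInit();
        // Load the job from the .kjb file (no repository)
        JobMeta jobMeta = new JobMeta("start.kjb", null);
        Repository repository = null;
        Job job = new Job(repository, jobMeta);
        job.setVariable("key", "value");
        job.start();
        job.waitUntilFinished();
    }
}
```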
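Running a transformation:

```java
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

public class RunTransformation {
    public static void main(String[] args) throws Exception {
        KettleEnvironment.init();
        EnvUtil.environmentInit();
        TransMeta transMeta = new TransMeta("start.ktr");
        Trans trans = new Trans(transMeta);
        trans.setMonitored(true);
        // Status flags; Trans also manages these itself while it executes
        trans.setInitializing(true);
        trans.setPreparing(true);
        trans.setRunning(true);
        trans.setSafeModeEnabled(true);
        trans.setVariable("key", "value");
        trans.execute(null); // null: no command-line arguments
        trans.waitUntilFinished();
        if (trans.getErrors() > 0) {
            throw new IllegalStateException("Transformation finished with errors");
        }
    }
}
```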
III. Remote execution
1. We configured a number of Jobs and transformations in Kettle's graphical designer and saved them in XML format.
2. Initialize the environment
```java
/*
 * When running the Kettle application itself:
 *   the kettle.properties file is located in
 *   [home dir of user running kettle]/.kettle/kettle.properties
 *   On a Mac: /Users/slawo/.kettle/kettle.properties
 *   On Linux: /home/slawo/.kettle/kettle.properties
 *   On Windows XP: C:\Documents and Settings\slawo\.kettle\kettle.properties
 *   On other systems (especially flavors of Windows) just locate the user's home dir
 *   and look in the .kettle subfolder.
 *
 * In a Java application:
 *   Kettle looks for the kettle.properties file at ${user.dir}/.kettle/kettle.properties.
 *   If the KETTLE_HOME environment variable is set, Kettle looks for it at
 *   ${KETTLE_HOME}/.kettle/kettle.properties instead.
 *
 * For a web project, the user directory must be changed before initializing,
 * otherwise the first initialization takes a long time to respond; change it
 * back afterwards. For a web project, a .kettle folder containing a
 * kettle.properties file should then be created under WEB-INF.
 *
 * Other variables:
 *   1. PENTAHO_JAVA_HOME: the JRE Kettle uses, e.g.
 *      PENTAHO_JAVA_HOME=C:\Program Files\Java\jre7
 *   2. PENTAHO_INSTALLED_LICENSE_PATH: the path of the installed licenses, e.g.
 *      SET PENTAHO_INSTALLED_LICENSE_PATH=C:\pentaho\.pentaho\.installedLicenses.xml
 *   3. MONETDB_INSTALL_DIR: the MonetDB installation directory, e.g.
 *      MONETDB_INSTALL_DIR=C:\PE0A28~1\monetdb
 */
// Current working directory of the running process
String userDir = System.getProperty("user.dir");
// Home directory of the current user
String userHome = System.getProperty("user.home");
log.info("user.dir:{}, user.home:{}", userDir, userHome);
try {
    String kettleHome = PathUtil.WEBINF_PATH + "KettleHome";
    if (!KettleEnvironment.isInitialized()) {
        // Point Kettle at our own configuration directory before initializing
        System.setProperty("KETTLE_HOME", kettleHome);
        System.setProperty("user.dir", kettleHome);
        // Initialize the runtime (set the home directory, register required plugins, etc.)
        KettleEnvironment.init();
        EnvUtil.environmentInit();
    }
} catch (KettleException e) {
    log.error("--kettle-- failed to initialize the environment: {}", e.getMessage(), e);
} finally {
    // Kettle initialization is done: restore the original working directory
    System.setProperty("user.dir", userDir);
}
```
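The initialization code above changes the JVM-wide `user.dir` property and then restores it. A small helper that does the override and the restoration in `try`/`finally` makes the pattern reusable and guarantees the old value comes back even if initialization throws. This is a sketch; `SystemPropertyScope` and `ThrowingSupplier` are hypothetical names, and since system properties are global, other threads still see the temporary value while the action runs.

```java
// Hypothetical helper: temporarily overrides a system property.
final class SystemPropertyScope {

    /** A supplier that may throw a checked exception of type E. */
    @FunctionalInterface
    interface ThrowingSupplier<T, E extends Exception> {
        T get() throws E;
    }

    private SystemPropertyScope() {
    }

    static <T, E extends Exception> T withProperty(String key, String value,
            ThrowingSupplier<T, E> action) throws E {
        String previous = System.getProperty(key);
        System.setProperty(key, value);
        try {
            return action.get();
        } finally {
            // Restore the old value, or remove the key if it was not set before
            if (previous == null) {
                System.clearProperty(key);
            } else {
                System.setProperty(key, previous);
            }
        }
    }
}
```

For example, `SystemPropertyScope.withProperty("user.dir", kettleHome, () -> { KettleEnvironment.init(); return null; });` would run the initialization with the working directory temporarily pointed at `kettleHome`; the compiler infers `KettleException` as the thrown type, so the existing `catch (KettleException e)` still applies.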
3. Create the SlaveServer
```java
@Value("${kettleHostName}")
private String kettleHostName = "192.168.30.65";
@Value("${kettlePort:8081}")
private String kettlePort = "8081";
@Value("${kettleUserName:cluster}")
private String kettleUserName = "cluster";
@Value("${kettlePassword:cluster}")
private String kettlePassword = "cluster";

// volatile is required for double-checked locking to be safe
private volatile SlaveServer remoteServer;

private SlaveServer getRemoteServer() {
    // Create the connection to the slave server lazily, only once
    SlaveServer server = remoteServer;
    if (server == null) {
        synchronized (SlaveServerExecutor.class) {
            server = remoteServer;
            if (server == null) {
                try {
                    // Configure a local instance first and publish it only when complete
                    server = new SlaveServer();
                    server.setHostname(kettleHostName);
                    server.setPort(kettlePort);
                    server.setUsername(kettleUserName);
                    server.setPassword(kettlePassword);
                    remoteServer = server;
                } catch (Exception e) {
                    server = null;
                    log.error("Failed to create SlaveServer: {}", e.getMessage());
                }
            }
        }
    }
    return server;
}
```
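Outside Spring, the same four settings can be read from a plain `java.util.Properties` object. The sketch below is a hypothetical stand-in for the `@Value` fields above: it applies the same defaults (port `8081`, user name and password `cluster`), treats a missing `kettleHostName` as an error because that placeholder has no default, and checks the port early. The port stays a string because that is what `setPort` receives in the code above.

```java
import java.util.Properties;

// Hypothetical, Spring-free holder for the Carte connection settings.
final class CarteConnectionSettings {
    final String hostName;
    final String port;
    final String userName;
    final String password;

    private CarteConnectionSettings(String hostName, String port,
                                    String userName, String password) {
        this.hostName = hostName;
        this.port = port;
        this.userName = userName;
        this.password = password;
    }

    static CarteConnectionSettings from(Properties props) {
        String host = props.getProperty("kettleHostName");
        if (host == null || host.trim().isEmpty()) {
            // Like @Value("${kettleHostName}"): no default, so fail fast
            throw new IllegalArgumentException("kettleHostName is required");
        }
        String port = props.getProperty("kettlePort", "8081").trim();
        // Integer.parseInt throws NumberFormatException, an IllegalArgumentException
        int portNumber = Integer.parseInt(port);
        if (portNumber < 1 || portNumber > 65535) {
            throw new IllegalArgumentException("kettlePort out of range: " + port);
        }
        return new CarteConnectionSettings(host.trim(), port,
                props.getProperty("kettleUserName", "cluster"),
                props.getProperty("kettlePassword", "cluster"));
    }
}
```

The resulting fields would then be passed to `setHostname`, `setPort`, `setUsername` and `setPassword` exactly as in `getRemoteServer()`.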
4. Execute the Job
```java
/**
 * @return the carteObjectId assigned by the Carte server, or null on failure
 */
public String executeJob(String jobName, Map<String, String> variablesMap) {
    SlaveServer slaveServer = getRemoteServer();
    if (slaveServer == null) {
        return null;
    }
    /* Add global parameters (start). KettleGlobalVariable is our own class that
       reads the global parameter configuration file and caches the values. */
    // Work on a copy so the caller's map is not modified (it may be immutable)
    Map<String, String> variables = new HashMap<>(variablesMap);
    for (Map.Entry<String, String> entry : KettleGlobalVariable.kettleArgsMap.entrySet()) {
        // Values passed by the caller take precedence over global parameters
        variables.putIfAbsent(entry.getKey(), entry.getValue());
    }
    // Add global parameters (end)

    // Repository metadata
    String id = "", name = "", description = "No description";
    // Directory where the Job files are stored
    String localKettleBaseDirectory = PathUtil.WEBINF_PATH + "KettleHome";
    KettleFileRepositoryMeta repinfo =
            new KettleFileRepositoryMeta(id, name, description, localKettleBaseDirectory);
    // File-based repository
    KettleFileRepository rep = new KettleFileRepository();
    rep.init(repinfo);
    try {
        RepositoryDirectoryInterface tree;
        try {
            tree = rep.loadRepositoryDirectoryTree();
        } catch (KettleException e) {
            log.error("Failed to load the file repository directory: {}", e.getMessage());
            return null;
        }
        RepositoryDirectoryInterface rd = tree.findDirectory("");
        JobMeta jobMeta;
        try {
            jobMeta = rep.loadJob(jobName, rd, null, null);
        } catch (KettleException e) {
            log.error("Failed to load job, jobName: {}, error: {}", jobName, e.getMessage());
            return null;
        }
        JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
        jobExecutionConfiguration.setExecutingLocally(false);
        jobExecutionConfiguration.setExecutingRemotely(true);
        jobExecutionConfiguration.setLogLevel(LogLevel.BASIC);
        jobExecutionConfiguration.setPassingExport(false);
        jobExecutionConfiguration.setRemoteServer(slaveServer);
        jobExecutionConfiguration.setRepository(rep);
        // Set the variables (global + per-call)
        jobExecutionConfiguration.setVariables(variables);
        try {
            return Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, rep, null);
        } catch (KettleException e) {
            log.error("Failed to send the job to the slave server: {}", e.getMessage());
            return null;
        }
    } finally {
        // Release the repository on every path, including the early returns
        rep.disconnect();
    }
}
```
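The global-parameter step in `executeJob` gives values passed by the caller priority over those from `KettleGlobalVariable`. The same rule can be written as a small pure function that builds a new map instead of modifying either input, so it also works when the caller passes an immutable map. `VariableMerge` is a hypothetical name; this is a sketch, not part of Kettle.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: merges global defaults with per-call variables.
final class VariableMerge {

    private VariableMerge() {
    }

    static Map<String, String> merge(Map<String, String> globals, Map<String, String> perCall) {
        Map<String, String> merged = new HashMap<>();
        if (globals != null) {
            merged.putAll(globals);
        }
        // Later puts overwrite earlier ones, so per-call values win
        if (perCall != null) {
            merged.putAll(perCall);
        }
        return merged;
    }
}
```

Usage would look like `jobExecutionConfiguration.setVariables(VariableMerge.merge(KettleGlobalVariable.kettleArgsMap, variablesMap));`.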
The Kettle environment server
Its directory structure is as follows:
```
--kettle
    --data-integration
        --Carte.bat
        --carte.sh
        --...
    --work
        --*.ktr
        --*.kjb
```
Start the service with:
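```
carte.bat localhost 8081
```

Carte listens on the host name given as its first argument. With localhost it accepts connections only from the same machine, so for remote calls like the ones above, start it with the server's own IP address instead (for example carte.bat 192.168.30.65 8081).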
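Once Carte is running, its HTTP status pages offer a quick way to check that the server is up and to follow a job by the `carteObjectId` that `Job.sendToSlaveServer` returns. The sketch below assumes Carte's `/kettle/status/` and `/kettle/jobStatus/` pages (with `xml=Y` for XML output, and `name`/`id` identifying a job) and HTTP Basic authentication with the configured user name and password. `CarteUrls` is a hypothetical helper; verify the paths against your PDI version.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for Carte's HTTP status pages (paths are assumptions).
final class CarteUrls {

    private CarteUrls() {
    }

    /** Server-wide status page, as XML. */
    static String statusUrl(String host, String port) {
        return "http://" + host + ":" + port + "/kettle/status/?xml=Y";
    }

    /** Status page for one job, identified by its name and carteObjectId. */
    static String jobStatusUrl(String host, String port, String jobName, String carteObjectId) {
        return "http://" + host + ":" + port + "/kettle/jobStatus/?name=" + encode(jobName)
                + "&id=" + encode(carteObjectId) + "&xml=Y";
    }

    /** HTTP Basic auth header value for the Carte user name and password. */
    static String basicAuthHeader(String user, String password) {
        byte[] credentials = (user + ":" + password).getBytes(StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(credentials);
    }

    /** Returns true if the status page answers with HTTP 200 within 3 seconds. */
    static boolean isUp(String host, String port, String user, String password) {
        try {
            HttpURLConnection conn =
                    (HttpURLConnection) URI.create(statusUrl(host, port)).toURL().openConnection();
            conn.setRequestProperty("Authorization", basicAuthHeader(user, password));
            conn.setConnectTimeout(3000);
            conn.setReadTimeout(3000);
            try {
                return conn.getResponseCode() == HttpURLConnection.HTTP_OK;
            } finally {
                conn.disconnect();
            }
        } catch (IOException e) {
            return false;
        }
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

For example, `CarteUrls.isUp("192.168.30.65", "8081", "cluster", "cluster")` returns `true` only if the status page answers with HTTP 200; a wrong password or an unreachable server gives `false`.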
Notes on remote execution:
1. To run a Job, you only need to pass its entry file; all other files it depends on must already be in the work directory on the Kettle environment server.
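2. When the service is started with carte.bat, its working directory is the directory containing carte.bat, so dependent files may be looked up relative to that directory. The "Transformation filename" can therefore be configured like this: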
```
../work/dependency.ktr
```
3. When configuring a Job in the graphical designer, the "Transformation filename" may also look like this:
```
${Internal.Job.Filename.Directory}/dependency.ktr
```
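This is meant to look in the same directory as the Job file, but when the Job is run in the way described above, Kettle looks in the root directory instead; at least, that is what happened in my tests.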
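The two path notes can be illustrated with plain Java. Resolving `../work/dependency.ktr` against the directory that contains the Carte script lands in the `work` folder next to `data-integration`. For the `${Internal.Job.Filename.Directory}` case, one possible explanation (an unverified assumption) is that the variable ends up empty in this execution mode, which turns the path into `/dependency.ktr` at the file-system root. `KettlePathDemo` and its simplified `${...}` substitution are hypothetical and are not Kettle's own variable-resolution code; undefined variables are simply left untouched.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration; not Kettle's own path or variable handling.
final class KettlePathDemo {

    private static final Pattern VARIABLE = Pattern.compile("\\$\\{([^}]+)\\}");

    private KettlePathDemo() {
    }

    /** Resolves a relative path against a working directory and normalizes "..". */
    static String resolveAgainst(String workingDir, String relativePath) {
        Path resolved = Paths.get(workingDir).resolve(relativePath).normalize();
        // Use forward slashes so the result reads the same on every OS
        return resolved.toString().replace('\\', '/');
    }

    /** Replaces ${name} with its value; undefined variables are left untouched. */
    static String substitute(String text, Map<String, String> variables) {
        Matcher m = VARIABLE.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1);
            String replacement = variables.containsKey(name) ? variables.get(name) : m.group(0);
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With a hypothetical install at /opt/kettle, `resolveAgainst("/opt/kettle/data-integration", "../work/dependency.ktr")` gives `/opt/kettle/work/dependency.ktr`, while substituting an empty `Internal.Job.Filename.Directory` gives `/dependency.ktr`.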