java 调度 Kettle 时使用 jndi 连接数据库

本贴最后更新于 2003 天前,其中的信息可能已经时移世异

看了 Kettle 好久了,不得不说,国内资料真的是少啊,又是买书,又是自己翻译官方文档,又是看源码,好歹算是有些眉目了,今天有解决一大难题,在此记录下来,算是为国内 Kettle 圈奉献点力量吧。

Java 调度 Kettle

关于 Java 调度 Kettle,在网上还是能搜到一些内容的,其实还是比较简单的,Kettle 在 Spoon 中设计以后,会生成.ktr 和.kjb 文件,分别对应 Kettle 的转换和作业。Java 正是通过 Kettle 提供的 API 调用这些文件的。

准备工作

在通过 Java 调用 Kettle 时,需要下面几个 jar 包:

<dependency>
		  <groupId>pentaho-kettle</groupId>
		  <artifactId>kettle-engine</artifactId>
		  <version>${kettle.version}</version>
	  </dependency>
	  <dependency>
		  <groupId>pentaho</groupId>
		  <artifactId>metastore</artifactId>
		  <version>${kettle.version}</version>
	  </dependency>
	  <dependency>
		  <groupId>pentaho-kettle</groupId>
		  <artifactId>kettle-core</artifactId>
		  <version>${kettle.version}</version>
		  <exclusions>
			  <exclusion>
				  <groupId>jug-lgpl</groupId>
				  <artifactId>jug-lgpl</artifactId>
			  </exclusion>
			  <exclusion>
				  <groupId>secondstring</groupId>
				  <artifactId>secondstring</artifactId>
			  </exclusion>
			  <exclusion>
				  <artifactId>xercesImpl</artifactId>
				  <groupId>xerces</groupId>
			  </exclusion>
			  <exclusion>
				  <groupId>org.apache.xmlgraphics</groupId>
				  <artifactId>batik-js</artifactId>
			  </exclusion>
		  </exclusions>
	  </dependency>

这些 jar 包在 maven 仓库是没有的,所以还需要配上 kettle 的 maven 仓库地址:

 <!-- kettle中央仓库 -->
    <repositories>
        <repository>
            <id>pentaho-public</id>
            <name>Pentaho Public</name>
            <url>http://nexus.pentaho.org/content/groups/omni</url>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
            </snapshots>
        </repository>
    </repositories>

配置好 maven 依赖以后,就可以通过 Java 来操作 kettle 的.kjb 和.ktr 文件了。

连接资源库

在 kettle 中,有一个很重要的概念就是资源库,资源库通俗理解就是管理 kettle 元数据,在多人开发的时候尤为重要,可以把资源库放在公共的地方供大家访问,目前 kettle 支持三种资源库:

  • 数据库资源库
  • 文件资源库
  • Pentaho 资源库

本文主要记录 Java 调用时使用 jndi 连接数据库的问题,Kettle 概念就不多赘述了如果需要了解,可以查看本人呢还未翻译完全的非官方文档.

在 Java 调用时,首先就是连接资源库,本人为了方便 svn 版本控制,使用的是文件资源库。

获取资源库

以下代码仅仅是获取文件资源库的:

    /**
     *
     * @param basePath 主路径
     * @return KettleFileRepository
     */
    public static KettleFileRepository getFileRepository(String basePath){

        KettleFileRepositoryMeta meta = new KettleFileRepositoryMeta();
        URL url = KettleUtils.class.getClassLoader().getResource(basePath);
        String baseDirectory = url == null ? null : url.getPath();
        meta.setBaseDirectory(baseDirectory);
        KettleFileRepository kettleFileRepository = new KettleFileRepository();
        kettleFileRepository.init(meta);
        return kettleFileRepository;
    }

其中 ,通过

URL url = KettleUtils.class.getClassLoader().getResource(basePath);
        String baseDirectory = url == null ? null : url.getPath();

获取到资源库路径,我的资源库是直接放在项目目录的,也可以使用绝对路径。

连接资源库

在连接资源库进行操作之前,需要先进行初始化,而想要使用 jndi 连接数据库,也就需要在这个地方做文章了,首先,kettle 初始化的代码是
KettleEnvironment.init(true); 就这么简单一句,即可,kettle 会帮你完成初始化。
在这个 init 方法,需要一个 boolean 类型的参数,这个参数就是告诉 kettle 需要初始化 jndi 数据库配置文件。
首先看以下这个方法的源码:

 /**
   * Initializes the Kettle environment. This method performs the following operations:
   * <p/>
   * - Creates a Kettle "home" directory if it does not already exist - Reads in the kettle.properties file -
   * Initializes the logging back-end - Sets the console log level to debug - If specified by parameter, configures
   * Simple JNDI - Registers the native types and the plugins for the various plugin types - Reads the list of variables
   * - Initializes the Lifecycle listeners
   *
   * @param simpleJndi true to configure Simple JNDI, false otherwise
   * @throws KettleException Any errors that occur during initialization will throw a KettleException.
   */
  public static void init( boolean simpleJndi ) throws KettleException {
    init( Arrays.asList(
      RowDistributionPluginType.getInstance(),
      StepPluginType.getInstance(),
      StepDialogFragmentType.getInstance(),
      PartitionerPluginType.getInstance(),
      JobEntryPluginType.getInstance(),
      JobEntryDialogFragmentType.getInstance(),
      LogTablePluginType.getInstance(),
      RepositoryPluginType.getInstance(),
      LifecyclePluginType.getInstance(),
      KettleLifecyclePluginType.getInstance(),
      ImportRulePluginType.getInstance(),
      CartePluginType.getInstance(),
      CompressionPluginType.getInstance(),
      AuthenticationProviderPluginType.getInstance(),
      AuthenticationConsumerPluginType.getInstance(),
      EnginePluginType.getInstance()
    ), simpleJndi );
  }

可以看到,这个参数被传递到下个方法,我们继续往下看:
下面这个方法代码有点多,我只取主要的部分:

// Configure Simple JNDI when we run in stand-alone mode (spoon, pan, kitchen, carte, ... NOT on the platform
        //
        if ( simpleJndi ) {
          JndiUtil.initJNDI();
        }

可以看到,在代码里,判断了前面那个参数的值,如果是 true,就调用 JndiUtil.initJNDI(); 进行 jndi 的初始化,JndiUtil 类的代码:

public class JndiUtil {

  public static void initJNDI() throws KettleException {
    String path = Const.JNDI_DIRECTORY;

    if ( path == null || path.equals( "" ) ) {
      try {
        File file = new File( "simple-jndi" );
        path = file.getCanonicalPath();
      } catch ( Exception e ) {
        throw new KettleException( "Error initializing JNDI", e );
      }
      Const.JNDI_DIRECTORY = path;
    }

    System.setProperty( "java.naming.factory.initial", "org.osjava.sj.SimpleContextFactory" );
    System.setProperty( "org.osjava.sj.root", path );
    System.setProperty( "org.osjava.sj.delimiter", "/" );
  }

}

这里就是初始化 jndi 的部分了,这里 String path = Const.JNDI_DIRECTORY; 是 kettle 默认的 jndi 配置文件的位置,所以我们只需要将这个 值 修改了,就可以让 kettle 加载我们的配置文件,也就可以使用 jndi 数据源了。

于是我进行了尝试,直接在初始化之前,给 Const.JNDI_DIRECTORY 赋值,证明猜想确实是对的。

URL url = KettleUtils.class.getClassLoader().getResource(JNDI_PATH);
Const.JNDI_DIRECTORY = url == null ? null : url.getPath();

JNDI_PATH 是我 jndi 配置文件的目录名。

下面,将 jndi 连接名作为变量传给 kettle,就可以实现通过 通过变量连接 jndi 数据库了。

job.setVariable("jndiName","ODS");

之后,在 kettle 中,只需要获取变量名对应的连接即可:
image.png

这样即可实现多个数据源通过变量切换。

在获取 到资源库对象,并且已经 初始化 kettle 以后,直接通过 connect 方法就可以进行连接了,文件资源库默认是没有密码的,如果是数据库资源库,默认的账户和密码是 admin/admin

//连接资源库
repository.connect(null,null);

下面,就可以开心的调用 kettle 文件了:

获取作业

 RepositoryDirectoryInterface directoryInterface = repository.loadRepositoryDirectoryTree();
            JobMeta jobMeta = repository.loadJob(jobName,directoryInterface,null,null);

设置变量并执行 kettle 作业

Job job = new Job(repository,jobMeta);
            job.setVariable("jndiName","ODS");
            //接口参数设置为变量
            for(Map.Entry<String,String> entry:params.entrySet()){
                job.setVariable(entry.getKey(),entry.getValue());
                System.out.println("正在设置变量,key:"  + entry.getKey() + ",值为:" + entry.getValue());
            }
            //日志级别
            job.setLogLevel(LogLevel.ROWLEVEL);
            //执行作业

现在主要做大数据,kettle 只作为入门的工具,后面可能还会设计 Hadoop、spark 之类的,欢迎一起沟通交流。

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3190 引用 • 8214 回帖 • 1 关注
  • Kettle

    Kettle 是一款国外开源的 ETL 工具,纯 java 编写,可以在 Windows、Linux、Unix 上运行,数据抽取高效稳定。
    Kettle 中文名称叫水壶,该项目的主程序员 MATT 希望把各种数据放到一个壶里,然后以一种指定的格式流出。

    7 引用 • 13 回帖 • 1 关注
  • JNDI
    3 引用 • 7 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • EricTao2

    赞一个!并且提一嘴,Java 可以直接写代码动态生成 Kettle 作业执行的,我们使用的是这种。当初也是资料找不到没使用你这种方法。

  • 其他回帖
  • someone


  • someone 1 评论

    ppp

    someone9891
  • someone
    作者

    附上全部的核心调用代码:

     /**
         *
         * @param basePath 主路径
         * @return KettleFileRepository
         */
        public static KettleFileRepository getFileRepository(String basePath){
    
            KettleFileRepositoryMeta meta = new KettleFileRepositoryMeta();
            URL url = KettleUtils.class.getClassLoader().getResource(basePath);
            String baseDirectory = url == null ? null : url.getPath();
            meta.setBaseDirectory(baseDirectory);
            KettleFileRepository kettleFileRepository = new KettleFileRepository();
            kettleFileRepository.init(meta);
            return kettleFileRepository;
        }
    
        /**
         * 执行作业
         * @param params 参数
         * @param repository 仓库
         * @param jobName 作业名
         */
        public static void executeJob(KettleParams params, KettleFileRepository repository , String jobName){
            try {
    
                URL url = KettleUtils.class.getClassLoader().getResource(JNDI_PATH);
    
                Const.JNDI_DIRECTORY = url == null ? null : url.getPath();
    
                KettleEnvironment.init(true);
                //连接资源库
                repository.connect(null,null);
                RepositoryDirectoryInterface directoryInterface = repository.loadRepositoryDirectoryTree();
                JobMeta jobMeta = repository.loadJob(jobName,directoryInterface,null,null);
    
                Job job = new Job(repository,jobMeta);
                job.setVariable("jndiName","ODS");
                //接口参数设置为变量
                for(Map.Entry<String,String> entry:params.entrySet()){
                    job.setVariable(entry.getKey(),entry.getValue());
                    System.out.println("正在设置变量,key:"  + entry.getKey() + ",值为:" + entry.getValue());
                }
                //日志级别
                job.setLogLevel(LogLevel.ROWLEVEL);
                //执行作业
                job.run();
            }catch (KettleException ke){
                logger.error("执行Kettle作业发生错误:",ke);
            }
        }
    

  • 查看全部回帖