加入收藏 | 设为首页 | 会员中心 | 我要投稿 瑞安网 (https://www.ruian888.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

开源ETL工具kettle系列之在应用程序中集成

发布时间:2021-02-07 13:24:10 所属栏目:大数据 来源:网络整理
导读:摘要 本文主要讨论如何在你自己的Java应用程序中集成Kettle 集成 如果你需要在自己的Java应用程序中集成Kettle,一般来说有两种应用需求,一种是通过纯设计器来设计ETL转换任务,然后保存成某种格式,比如xml或者在数据库中都可以,然后自己调用程序解析这个

执行Job任务之前还是会读取Job任务的描述文件,然后把这个描述文件(kettle的 .ktr文件)变成一个xml文档的dom :

org.w3c.dom.Document doc = XmlW3CHelper.getDomFromString(jobXmlStr);

之后也是初始化对应的元数据对象JobMeta
jobMeta = new JobMeta(logWriter,doc.getFirstChild(),repository);

得到了jobMeta 之后就可以执行这个Job了,这里跟trans是一样的。
job = new Job(logWriter,StepLoader.getInstance(),jobMeta);
由于Job一般都没有什么返回值,所以Job不需要初始化它下面的对象,直接开始运行就可以了

job.start();
job.waitUntilFinished(5000000);

连接资源库

连接资源库使用的是connectToRepository()方法,先取得RepositoriesMeta对象,然后根据你在setting.xml文件里面定义的repository的名字来连接对应的repository.理论上来说我们一般都只使用一个 repository,但如果在产品中需要使用多个repository的话,你需要自己配置多个repository的名字和对应的用户名和密码。只列出几行关键代码,

repositoriesMeta = new RepositoriesMeta(logWriter);
//从$HOME/.kettle/repositories.xml 读数据.
repositoriesMeta.readData(); 
repositoryMeta = repositoriesMeta.findRepository(repositoryName);
repository = new Repository(logWriter,repositoryMeta,userInfo);
userInfo = new UserInfo(repository,username,password);

从资源库读取Trans
连接到资源库之后自然是想办法读取数据库的表,把里面的记录转换成为Trans 对象,使用的是loadTransformFromRepository,这个方法的函数原型需要解释一下:

TransMetaloadTransformFromRepository(String directoryName,String transformationName,Repository repository,LogWriter logWriter)

第一个参数String directoryName 代表是你储存转换的目录,当你使用kettle 图形界面的时候,点击repository菜单的explorer repository,你会发现你所有的东西都是存储在一个虚拟的类似与目录结构的地方,其中包括database connections,transformations,job,users 等,所以你需要的是指定你连接的目录位置,你也可以在目录里面再创建目录。
String transformationName 自然指的就是你转换的名字.
Repository repository 指的是你连接的资源库。
LogWriter logWriter 指定你的日志输出,这个log 指的是你kettle 转换的日志输出,不是应用程序本身的输出。
读取TransMeta的步骤也相对比较简单

repositoryDirectory=repository.getDirectoryTree().findDirectory(directoryName);
transMeta = new TransMeta(repository,transformationName,repositoryDirectory);

从资源库读取Job
从资源库读取Job跟Trans 的步骤基本是一样的,同样需要指定你存储Job的目录位置.

JobMeta loadJobFromRepository(String directoryName,String jobName,LogWriter logWriter)

读取结果集
一般Job都是不会返回任何结果集的,大部分Trans也不会返回结果集,应为结果集一般都会直接从一个数据库到另一个数据库了,但是如果你需要返回转换的结果集,那么这一小结将会向你解释如何从一个Trans里面读取这些结果集
首先,你需要一个容纳Result的容器,就是类似与JDBC里面的resultSet,resultSet当然会有一个resultSetMetadata跟它相关联,在本文所举的实例中,使用的是pentaho私有的memoryResultSet,
你可以不用关心它的细节,并且它的类型正如它的名字一样是存在与Memory的,所以它不能被持久化,这个里面储存的是一个二维的Object数组,里面的数据就是从kettle转化之后来的。
要从kettle的转换中读取结果集,要实现RowListener 接口,Row 是kettle里面表示一行数据的一个类,RowListener 自然是指在转换数据转换的时候发生的事件,它有三个方法需要实现,
void rowReadEvent(Row)
void rowWrittenEvent(Row)
void errorRowWrittenEvent(Row)
分别对应读数据时的事件,写数据事的时间,出错时的时间,我们需要取得结果集,所以只需要实现rowWrittenEvent(Row)就可以了,Row对象是通过TransMeta取得的,
Row row = transMeta.getStepFields(stepName);
下面给出具体实现取Row转换成resultSet的代码:

Object pentahoRow[] = new Object[results.getColumnCount()];
    for (int columnNo = 0; columnNo < results.getColumnCount(); columnNo++) {
        Value value = row.getValue(columnNo);
        switch (value.getType()) {
            case Value.VALUE_TYPE_BIGNUMBER:
            pentahoRow[columnNo] = value.getBigNumber();
            break;
    ........
results.addRow(pentahoRow);

(编辑:瑞安网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读