天天看點

java kettle_Kettle轉換中的Java代碼步驟應用入門與實戰

編輯推薦:

本文來自于個人微信公衆号,本文通過JMS實戰,示範了如何通過Java代碼來擴充Kettle的功能,希望對您的學習有所幫助。

一、問題背景

在使用Kettle的過程中,有可能遇到現有步驟無法滿足需求的情況。解決此類問題,有諸如購買第三方插件、開發插件、自定義Java類等辦法。最後一種辦法因其代價小且門檻較低而成為最為常用的定制方法。本文将解釋Java代碼步驟的原理,并通過一個實際案例,快速掌握相關入門知識。

二、原理剖析

Java代碼步驟,位于Kettle轉換的核心對象/腳本類别中,屬于典型的需要程式設計基礎才能掌控的步驟類型。而Java代碼步驟,适用于熟悉Java語言的開發人員,用好這個步驟,需要對類、接口、多線程等語言相關知識有所掌握,并且需要對Kettle的基礎架構有所了解。Java語言的基礎知識不在本文讨論範圍,下面将着重對Kettle架構的核心部分進行解釋。

Kettle轉換的執行,有以下三個核心的生命周期節點:

1、初始化

Kettle轉換在執行前,會有一個各步驟的初始化動作,為步驟執行前的準備工作創造機會。為提高初始化的性能,Kettle為每個步驟啟用一個初始化線程,進而并行完成所有步驟的初始化。初始化的主要内容就是調用一次步驟的以下方法:

public boolean init( StepMetaInterface meta, StepDataInterfacedata)

此方法包含兩個參數。其中,meta為中繼資料,data為資料。如果傳回true,那麼代表初始化成功,否則代表初始化失敗。任何一個步驟初始化失敗,都會導緻整個轉換停止執行(在停止前,會調用每一個轉換的資源釋放方法dispose)。

2、執行

執行階段是每一個步驟實作特定價值的時候。為提高效率,Kettle為每一個步驟單獨啟動一個工作線程來執行任務。Java程式員都了解,線程的核心代碼是覆寫run方法。為簡化起見,我将不重要的代碼删除,得到工作線程run方法核心代碼:

java kettle_Kettle轉換中的Java代碼步驟應用入門與實戰

可以看出,線程一直在執行步驟的processRow方法,直到出現以下情況之一:

· processRow方法傳回false

· isStopped方法傳回true

· processRow方法執行過程中出現異常,

其中,第一種情況代表工作已經正常完成;第二種情況,代表步驟被強制停止;第三種情況,代表執行過程中出現錯誤,Kettle将調用stopAll方法,進而導緻整個轉換的所有工作線程停止執行。

執行方法的聲明如下:

public boolean processRow( StepMetaInterface meta,StepDataInterfacedata

) throws KettleException;

每一個步驟,都會在processRow方法中各顯神通。一般的過程是,從輸入行集中拿出一行,進行特定處理,然後将新的行放入輸出行集中。從輸入行集中取資料可以調用getRow方法。如果getRow方法傳回值不為null,則步驟應将該行資料進行處理,并調用putRow方法将處理結果存入輸出行集,然後傳回true,以繼續為下一行輸入資料處理提供機會。如果getRow方法傳回null,代表輸入行集已經處理完畢,這時可以調用setOutputDone,辨別本步驟執行完畢,并傳回false,以結束本工作線程的執行。

3、資源釋放

從上述工作線程的核心代碼可以看出,不管工作線程是正常執行完畢還是異常執行完畢,最終會調用dispose方法。該方法聲明如下:

public void dispose( StepMetaInterface meta, StepDataInterfacedata);

步驟應該在需要時覆寫此方法,并釋放相關資源。

了解上述原理後,撰寫Java類步驟中的代碼時将胸有成竹。綜上所述,一般情況下重寫processRow方法即可滿足需求,如果用到了一些重量級的資源,最好在init方法中初始化,并在dispose方法中釋放。

由于Kettle使用Janino架構為自定義Java轉換步驟類動态定義了類名,并指定父類為TransformClassBase,是以在撰寫代碼時,隻需要提供類的内容即可,無需class聲明。

既然自動建立了父類,那麼父類的成員、方法都可以在代碼中重用。父類常用的成員包括以下三個執行個體:

parent:代表容器對象

meta:代表容器中繼資料對象

data:代表容器資料對象

常用的方法包括:

getRow:從輸入行集中取一行資料

putRow:存銀行資料到輸出行集

stopAll:停止所有工作線程

setOutputDone:标記本步驟工作完成

logBasic:輸出基本日志

logError:輸出錯誤日志

getInputRowMeta:得到輸入行的中繼資料

createOutputRow:建立一個輸出行資料

其實,常用的方法(如下圖1所示),基本上都在步驟屬性對話框左側Code Snippits中。一般情況下,可以輕按兩下其中的Main節點,從processRow方法的重寫開始,需要其他代碼時,在左側找到對應代碼塊,輕按兩下即可加入。

java kettle_Kettle轉換中的Java代碼步驟應用入門與實戰

三、案例分享

本文使用一個Kettle內建JMS的案例來進行實戰演練。假設需要兩個轉換:一個轉換名為Send,實作從文本檔案輸入流讀取資料,并發送到ActiveMQ的隊列;另外一個轉換名為Receive,實作從隊列讀取資料,并發送到文本檔案輸出流。兩個轉換截圖如下:

java kettle_Kettle轉換中的Java代碼步驟應用入門與實戰

由于兩個轉換中用到的文本檔案輸入、輸出都非常簡單,這裡隻做簡單描述。Send轉換中,S01讀取本地文本檔案,包含兩個String類型的字段ID、MENU_NAME。Receive轉換中,S02輸出檔案到轉換所在目錄,僅包含一個名為MENU_NAME的String類型字段。

下文着重描述兩個Java代碼步驟。第一個步驟是Send轉換中的S02,其主要代碼注釋如下:

java kettle_Kettle轉換中的Java代碼步驟應用入門與實戰

第二個步驟是Receive轉換中的S01步驟。主要代碼注釋如下:

java kettle_Kettle轉換中的Java代碼步驟應用入門與實戰

注意,由于本文使用了ActiveMQ作為JMS伺服器,是以為保證執行個體能夠正常運作,需要自行下載下傳伺服器安裝程式,并将對應jar檔案拷貝到Kettle的lib目錄下(本例使用activemq-all-5.8.0.jar)。代碼中,需要的import指令如下:

import java.util.

* ;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.MapMessage;

import javax.jms.MessageProducer;

import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

四、總結

本文在具備程式員背景知識的資料工程師在運用Kettle進行定制開發時,可以參考。