天天看點

Nifi之ExecuteScript使用方法 (二)

本文是一系列文章中的第二篇,描述了如何使用ExecuteScript完成某些任務的各種"方法".

本文介紹了如何使用NiFi處理器ExecuteScript完成某些任務的各種"recipes",以及 Groovy,Jython,Javascript(Nashorn)和JRuby中給出的示例。這是本系列的第2部分,我将讨論讀取和寫入流檔案内容以及錯誤處理。

第1部分- NiFi API和FlowFiles簡介

  • 從傳入隊列擷取流檔案
  • 建立新的流檔案
  • 使用流檔案屬性
  • 傳輸流檔案
  • 記錄

第2部分 - FlowFile I / O和錯誤處理

  • 從流檔案中讀取
  • 寫入流檔案
  • 讀取和寫入流檔案
  • 錯誤處理

第3部分- 進階功能

  • 使用動态屬性
  • 添加子產品
  • 國家管理
  • 通路控制器服務

FlowFile I / O簡介

NiFi中的流檔案由兩個主要元件構成,即屬性和内容。屬性是關于内容/流檔案的中繼資料,我們在本系列的第一部分看到了如何使用ExecuteScript來操作它們.

流檔案的内容本質上隻是一個位元組集合,沒有固有的結構,模式,格式等。各種NiFi處理器假設傳入的流檔案具有特定的模式/格式(或者從屬性中确定它作為“mime.type”或以其他方式推斷它)。然後,這些處理器可以基于檔案确實具有該格式的假設來對内容起作用(并且如果它們不這樣,則經常轉移到“Failure”關系隊列)。處理器也可以輸出指定格式的流檔案,這在處理器中有描述。NiFi文檔。

流檔案内容的輸入和輸出(I/O)通過ProcessSession API提供(有關更多資訊,請參閱第1部分)同時對外開放了"session 變量"以供調用。我們可以通過調用session.read()獲得FlowFile的輸入流對象或調用session.write()獲得FlowFile的輸出流對象.這兩個方法會調用相應的回調接口和回調函數,并傳回InputStream和/或OutputStream引用以供回調使用。有三個主要的回調接口,每個接口都有自己的用例:

InputStreamCallback

session.read(flowFile,inputStreamCallback)方法會調用輸入流回調接口,該方法會傳回一個從FlowFile中讀取到資料的輸入流.這個回調函數隻有一個方法

void process(InputStream in) throws IOException
           

這個接口提供了一個managed輸入流供調用者使用,這個流會自動的打開和關閉,也支援手動關閉.如果您隻是從特定的流檔案中讀取資料,而不是将資料寫回該檔案,那麼您将使用這種辦法.

比如我們想做的事情是接收到一條流資料,将其切分成多條輸出出去.這是一個很常用也很簡單的場景,此時我們就可以使用上面的辦法.

OutputStreamCallback

輸出流回調接口在調用session.write(FlowFile,outputStreamCallback)的時候被調用,它會傳回一個向FlowFile中寫資料的OutputSteam.這個接口也隻有一個方法

void process(OutputStream out) throws IOException
           

此接口提供輸出流以供使用。盡管可以手動關閉流,但輸出流會自動打開和關閉 - 如果包含這些流的任何流打開應該清除的資源,這一點就變得非常重要了。

使用方法 : 比如ExecuteScript将從内部或外部檔案生成資料,但不生成流檔案。然後你将使用session.create()建立一個新的FlowFile,再使用session.write(flowFile,outputStreamCallback)來插入内容。

StreamCallback

當調用session.write(FlowFile,StreamCallback)的時候,會傳回InputStream和OutputStream,從中讀寫流檔案的内容.接口方法如下:

void process (InputStream in ,OutputStream out )抛出IOException 
           

同上邊的接口相同,改接口也會自動關閉流也支援手動關閉.

使用場景 : 當您想要處理傳入的流檔案并用新的東西覆寫其内容時,例如 EncryptContent處理器。

由于這些回調是Java對象,是以腳本必須建立并将其傳遞給會話方法,這些recipes将針對各種腳本語言進行說明。其他讀取和寫入流檔案的方法:

  • 使用session.read(flowFile)傳回一個InputStream。這減輕了對InputStreamCallback的需求,而是傳回可以讀取的InputStream。作為交換,您必須手動管理(關閉,例如)InputStream。
  • 使用session.importFrom(inputStream,flowFile)從InputStream寫入FlowFile。這取代了傳遞了OutputStreamCallback的session.write()的需要。

代碼樣例

應用場景1 :使用回調讀取傳入流檔案的内容,您有連接配接到ExecuteScript的傳入連接配接,并希望從隊列中檢索流檔案的内容以進行處理。

方案:使用來自session對象的方法read(flowFile,inputStreamCallback)。read()方法需要傳入一個InputStreamCallback對象。請注意,因為InputStreamCallback是一個對象,是以預設情況下内容隻對該對象可見。如果需要使用read()方法之外的資料,請使用全局範圍的變量。這些示例将傳入流檔案的完整内容存儲到String中(使用Apache Commons的IOUtils類)。注意:對于大流量檔案,這不是最好的技術; 相反,您應該隻讀取您需要的資料,并根據需要進行處理。對于像SplitText這樣的東西,你可以一次讀取一行并在InputStreamCallback中處理它,或者使用前面提到的session.read(flowFile)方法來獲得在回調之外使用的InputStream引用。

Groovy

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
 
flowFile = session.get()
if(!flowFile)return
def text = ''
// Cast a closure with an inputStream parameter to InputStreamCallback
session.read(flowFile, {inputStream ->
  text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  // Do something with text here
} as InputStreamCallback)
           

Jython

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback
 
# Define a subclass of InputStreamCallback for use in session.read()
class PyInputStreamCallback(InputStreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    # Do something with text here
# end class
flowFile = session.get()
if(flowFile != None):
    session.read(flowFile, PyInputStreamCallback())
# implicit return at the end
           

 JavaScript

var InputStreamCallback =  Java.type("org.apache.nifi.processor.io.InputStreamCallback")
var IOUtils = Java.type("org.apache.commons.io.IOUtils")
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets")
 
var flowFile = session.get();
if(flowFile != null) {
  // Create a new InputStreamCallback, passing in a function to define the interface method
  session.read(flowFile,
    new InputStreamCallback(function(inputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        // Do something with text here
    }));
}
           

JRuby

java_import org.apache.commons.io.IOUtils
java_import org.apache.nifi.processor.io.InputStreamCallback
 
# Define a subclass of InputStreamCallback for use in session.read()
class JRubyInputStreamCallback
  include InputStreamCallback
  def process(inputStream)
    text = IOUtils.toString(inputStream)
    # Do something with text here
  end
end
jrubyInputStreamCallback = JRubyInputStreamCallback.new
flowFile = session.get()
if flowFile != nil
  session.read(flowFile, jrubyInputStreamCallback)
end
           

應用場景2 : 使用回調函數将内容寫入傳出流檔案,我們希望對傳入的流檔案除了讀取内容,還要填充新的内容.

方案 : 使用會話對象中的write(flowFile,outputStreamCallback)方法。傳遞給write()方法需要一個OutputStreamCallback對象。請注意,因為OutputStreamCallback是一個對象,是以預設情況下内容隻對該對象可見。如果需要使用write()方法之外的資料,請使用更全局範圍的變量。這些示例将示例String寫入flowFile。

Groovy

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
 
flowFile = session.get()
if(!flowFile) return
def text = 'Hello world!'
// Cast a closure with an outputStream parameter to OutputStreamCallback
flowFile = session.write(flowFile, {outputStream ->
  outputStream.write(text.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
           

Jython

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback
 
# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
  def __init__(self):
        pass
  def process(self, outputStream):
    outputStream.write(bytearray('Hello World!'.encode('utf-8')))
# end class
flowFile = session.get()
if(flowFile != None):
    flowFile = session.write(flowFile, PyOutputStreamCallback())
# implicit return at the end
           

JavaScript

var OutputStreamCallback =  Java.type("org.apache.nifi.processor.io.OutputStreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
 
var flowFile = session.get();
if(flowFile != null) {
  // Create a new OutputStreamCallback, passing in a function to define the interface method
  flowFile = session.write(flowFile,
    new OutputStreamCallback(function(outputStream) {
        outputStream.write("Hello World!".getBytes(StandardCharsets.UTF_8))
    }));
}
           

JRuby

java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.OutputStreamCallback
 
# Define a subclass of OutputStreamCallback for use in session.write()
class JRubyOutputStreamCallback
  include OutputStreamCallback
  def process(outputStream)
    outputStream.write("Hello World!".to_java.getBytes(StandardCharsets::UTF_8))
  end
end
jrubyOutputStreamCallback = JRubyOutputStreamCallback.new
flowFile = session.get()
if flowFile != nil
  flowFile = session.write(flowFile, jrubyOutputStreamCallback)
end
           

應用場景3 : 使用回調覆寫帶有更新内容的傳入流檔案,您希望重用傳入的流檔案,但希望修改其傳出流檔案的内容。

方案 : 使用來自session對象的write(flowFile,streamCallback)方法。傳遞給write()方法需要StreamCallback對象。StreamCallback提供InputStream(來自傳入流檔案)和outputStream(用于該流檔案的下一個版本),是以您可以使用InputStream擷取流檔案的目前内容,然後修改它們并将它們寫回到流檔案。這會覆寫流檔案的内容,是以對于追加,您必須通過附加讀入内容來處理它,或者使用不同的方法(使用session.append()而不是session.write())。請注意,由于StreamCallback是一個對象,是以預設情況下内容僅對該對象可見。如果需要使用write()方法之外的資料,請使用更全局範圍的變量。

Groovy

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
 
flowFile = session.get()
if(!flowFile) return
def text = 'Hello world!'
// Cast a closure with an inputStream and outputStream parameter to StreamCallback
flowFile = session.write(flowFile, {inputStream, outputStream ->
  text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
  outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
           

Jython

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
 
# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    outputStream.write(bytearray('Hello World!'[::-1].encode('utf-8')))
# end class
flowFile = session.get()
if(flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
# implicit return at the end
           

JavaScript

var StreamCallback =  Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
 
var flowFile = session.get();
if(flowFile != null) {
  // Create a new StreamCallback, passing in a function to define the interface method
  flowFile = session.write(flowFile,
    new StreamCallback(function(inputStream, outputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        outputStream.write(text.split("").reverse().join("").getBytes(StandardCharsets.UTF_8))
    }));
}
           

JRuby

java_import org.apache.commons.io.IOUtils
java_import java.nio.charset.StandardCharsets
java_import org.apache.nifi.processor.io.StreamCallback
 
# Define a subclass of StreamCallback for use in session.write()
class JRubyStreamCallback
  include StreamCallback
  def process(inputStream, outputStream)
    text = IOUtils.toString(inputStream)
    outputStream.write((text.reverse!).to_java.getBytes(StandardCharsets::UTF_8))
  end
end
jrubyStreamCallback = JRubyStreamCallback.new
flowFile = session.get()
if flowFile != nil
  flowFile = session.write(flowFile, jrubyStreamCallback)
end
           

應用場景4 : 處理腳本處理過程中的錯誤.腳本中發生錯誤(通過資料驗證或抛出異常),并且您希望腳本正常處理它。

方案 : 對于異常,使用腳本語言的異常處理機制(通常是try / catch塊)。對于資料驗證,您可以使用類似的方法,但定義一個布爾變量,如“valid”和if / else子句。ExecuteScript定義“成功”和“失敗”關系; 通常,您的處理将“好”流檔案轉移到成功,“壞”流檔案轉換為失敗(在後一種情況下記錄錯誤).

Groovy

flowFile = session.get()
if(!flowFile) return
try {
  // Something that might throw an exception here
 
  // Last operation is transfer to success (failures handled in the catch block)
  session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
  log.error('Something went wrong', e)
  session.transfer(flowFile, REL_FAILURE)
}
           

Jython

flowFile = session.get()
if(flowFile != None):
    try:
        # Something that might throw an exception here
       
        # Last operation is transfer to success (failures handled in the catch block)
        session.transfer(flowFile, REL_SUCCESS)
    except:
        log.error('Something went wrong', e)
        session.transfer(flowFile, REL_FAILURE)
# implicit return at the end
           

JavaScript

var flowFile = session.get();
if(flowFile != null) {
  try {
    // Something that might throw an exception here
 
    // Last operation is transfer to success (failures handled in the catch block)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
  log.error('Something went wrong', e)
  session.transfer(flowFile, REL_FAILURE)
}
}
           

JRuby

flowFile = session.get()
if flowFile != nil
  begin
    # Something that might raise an exception here
    
    # Last operation is transfer to success (failures handled in the rescue block)
    session.transfer(flowFile, REL_SUCCESS)
  rescue Exception => e 
    log.error('Something went wrong', e)
    session.transfer(flowFile, REL_FAILURE)
  end
end
           

希望本文描述了FlowFile I / O的基礎知識和錯誤處理,但歡迎提出建議和改進!在本系列的下一篇文章中,我将讨論一些更進階的功能,例如動态屬性,子產品,狀态管理以及通路/使用Controller Services.

原文連接配接:https://community.hortonworks.com/articles/75545/executescript-cookbook-part-2.html