天天看點

nifi将mysql資料導到hdfs_用Nifi 從web api 取資料到HDFS(示例代碼)

import org.apache.commons.io.IOUtils

import java.nio.charset.*

import java.text.SimpleDateFormat;

import java.lang.StringBuilder;

import java.util.Calendar;

def flowFile = session.create()

flowFile = session.write(flowFile, {inputStream, outputStream ->

SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");

Calendar cal = Calendar.getInstance();

StringBuilder sb = new StringBuilder();

for(int i = 0; i < 9500; i++) {

cal.add(Calendar.DATE, -1);

sb.append(sdf.format(cal.getTime()) + "\n" );

}

//println(sb);

outputStream.write(sb.toString().getBytes(StandardCharsets.UTF_8))

} as StreamCallback)

//flowFile = session.putAttribute(flowFile, ‘filename‘, ‘get_date‘)

session.transfer(flowFile, REL_SUCCESS)

3. 用SplitText生成每行一個的日期

Line Split Count    1

4. 用ExtractText 取到日期參數

nifi将mysql資料導到hdfs_用Nifi 從web api 取資料到HDFS(示例代碼)

5. 用UpdateAttribute生成url及filename

nifi将mysql資料導到hdfs_用Nifi 從web api 取資料到HDFS(示例代碼)

這裡一定要設定filename,不然,所有的檔案名都一樣,最後隻能成功插入一個記錄到HDFS。

6.  用InvokeHttp擷取資料

nifi将mysql資料導到hdfs_用Nifi 從web api 取資料到HDFS(示例代碼)
nifi将mysql資料導到hdfs_用Nifi 從web api 取資料到HDFS(示例代碼)

7. 用PutHDFS把資料插入到HDFS

nifi将mysql資料導到hdfs_用Nifi 從web api 取資料到HDFS(示例代碼)

注意這裡的Directory 要加上/, 不然就插入到user/root/nifi下了,而不是files下在的nifi了。