NiFi deduplication design (Part 1): deduplication within a single queue
The official components don't include a deduplication processor, even though the need is fairly common.
This will be split across two posts on deduplicating FlowFiles within a NiFi queue; neither approach is perfect, but both are good enough for everyday use.
Suppose each FlowFile represents a task. To keep things concrete, take a scenario most engineers can easily picture: crawler tasks.
A FlowFile consists of two layers, attributes and content, much like a local file's metadata (name, permissions, size, modification time) versus its body (text or binary data).
In the crawler scenario, a FlowFile is one URL to download; the URL lives in an attribute and the FlowFile has no content at all.
The processor's job is to download that URL, so the FlowFiles in the upstream queue carry nothing but URL information.
A processor normally pulls one FlowFile from the queue per invocation, strictly 1:1, so identical URLs sitting in the queue each get processed, i.e. the same URL is downloaded multiple times.
So we implement a lightweight processor ourselves:
build a unique key from the FlowFile's attributes, deduplicate on that key, keep only one FlowFile per key (either the first or the last), and emit the survivors to the downstream queue.
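The retain-first vs. retain-last choice can be sketched in plain Java, independent of the NiFi API. The `dedup` helper below and its attribute maps are illustrative stand-ins for FlowFiles, not the processor's real code:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DedupSketch {

    /** Deduplicate attribute maps on keyAttr, keeping the first or last occurrence per key. */
    static List<Map<String, String>> dedup(List<Map<String, String>> flowFiles,
                                           String keyAttr, boolean retainFirst) {
        // LinkedHashMap keeps the surviving entries in arrival order
        Map<String, Map<String, String>> byKey = new LinkedHashMap<>();
        for (Map<String, String> ff : flowFiles) {
            String key = ff.get(keyAttr);
            if (retainFirst) {
                byKey.putIfAbsent(key, ff); // first occurrence wins
            } else {
                byKey.put(key, ff);         // later occurrence replaces the earlier one
            }
        }
        return new ArrayList<>(byKey.values());
    }

    public static void main(String[] args) {
        List<Map<String, String>> queue = List.of(
                Map.of("custom_id", "1", "custom_value", "123"),
                Map.of("custom_id", "1", "custom_value", "456"),
                Map.of("custom_id", "2", "custom_value", "789"));
        // retain first: ids 1 and 2 survive, id 1 keeps value 123
        System.out.println(dedup(queue, "custom_id", true).size());  // 2
        // retain last: id 1 keeps value 456 instead
        System.out.println(dedup(queue, "custom_id", false).get(0).get("custom_value")); // 456
    }
}
```

Note that either way the output order follows first arrival per key, which matches batching FlowFiles from a queue.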
Its applicability is limited: it only deduplicates FlowFiles within the same queue, over the small time window of a single batch. Treat it as an auxiliary measure rather than a complete solution; complete deduplication requires external storage, and the main value of this method is reducing the IO pressure on that external store.
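The division of labor between the in-queue filter and the external store can be illustrated with a small sketch. Here a plain `HashSet` stands in for the external store (e.g. a Redis set); the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TwoTierDedup {

    // Stand-in for external storage; in production each membership check is a network call
    final Set<String> externalStore = new HashSet<>();
    int externalLookups = 0; // counts the IO left after in-queue dedup

    /** Returns the keys from this batch that have never been seen before. */
    List<String> filterNew(List<String> batch) {
        // Tier 1: in-queue dedup collapses duplicates inside the batch for free
        Set<String> batchUnique = new LinkedHashSet<>(batch);
        List<String> fresh = new ArrayList<>();
        // Tier 2: each batch-unique key costs exactly one external lookup
        for (String key : batchUnique) {
            externalLookups++;
            if (externalStore.add(key)) { // add() is false if the key was already stored
                fresh.add(key);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        TwoTierDedup dedup = new TwoTierDedup();
        System.out.println(dedup.filterNew(List.of("a", "a", "a", "b"))); // [a, b]
        System.out.println(dedup.filterNew(List.of("a", "c")));           // [c]
        // 4 external lookups for 6 incoming keys
        System.out.println(dedup.externalLookups); // 4
    }
}
```

The denser the duplicates within a batch, the more external lookups the first tier saves; across batches the external store still does the real work.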
The main code is on git; the structure is deliberately simple, so it also works as a practice project for learning NiFi's custom processor development conventions.
https://github.com/cclient/nifi-unique-processor
Nifi Unique Processor
```
<custom_id:1,custom_value:123>                             <custom_id:1,custom_value:123>
<custom_id:1,custom_value:456>  -> unique by ${custom_id} ->
<custom_id:2,custom_value:789>                             <custom_id:2,custom_value:789>
```
nifi queued distinct/unique by 'custom key'
deploy

1 compile

```
mvn package
```

2 copy the nar into one of the nar directories configured in nifi.properties

```
nifi.nar.library.directory=./lib
nifi.nar.library.directory.custom=./lib_custom
nifi.nar.library.autoload.directory=./extensions
nifi.nar.working.directory=./work/nar/
```

```
cp nifi-unique-nar/target/nifi-unique-nar-0.1.nar nifi/lib_custom/
```

3 restart nifi if needed

```
nifi/bin/nifi.sh restart
```
```java
@Override
public void onTrigger(ProcessContext processContext, ProcessSession session) throws ProcessException {
    int bulkSize = processContext.getProperty(BULK_SIZE).asInteger();
    if (bulkSize == 0) {
        // 0 means "no limit": drain whatever is currently queued
        bulkSize = Integer.MAX_VALUE;
    }
    List<FlowFile> originalList = session.get(bulkSize);
    if (originalList == null || originalList.isEmpty()) {
        return;
    }
    boolean retainFirst = processContext.getProperty(RETAIN_FIRST).asBoolean();
    Map<String, FlowFile> map = new HashMap<>(originalList.size());
    List<FlowFile> needRemoveFlowFiles = new ArrayList<>(originalList.size());
    List<FlowFile> errorFlowFiles = new ArrayList<>(originalList.size());
    List<FlowFile> needNextFlowFiles = new ArrayList<>(originalList.size());
    originalList.forEach(flowFile -> {
        // Evaluate the unique-key expression (e.g. ${custom_id}) against this FlowFile's attributes
        String key = processContext.getProperty(UNIQUE_KEY).evaluateAttributeExpressions(flowFile).getValue();
        if (key == null || key.isEmpty()) {
            errorFlowFiles.add(flowFile);
            return;
        }
        if (map.containsKey(key)) {
            if (retainFirst) {
                // Keep the first occurrence, drop this duplicate
                needRemoveFlowFiles.add(flowFile);
            } else {
                // Keep the last occurrence: drop the previously retained FlowFile
                FlowFile oldSame = map.get(key);
                needRemoveFlowFiles.add(oldSame);
                needNextFlowFiles.remove(oldSame);
                needNextFlowFiles.add(flowFile);
                // Track the new survivor, otherwise a third duplicate would
                // target the already-removed FlowFile again
                map.put(key, flowFile);
            }
        } else {
            needNextFlowFiles.add(flowFile);
            map.put(key, flowFile);
        }
    });
    logger.info("distinct original size: {}, retain size: {}, remove size: {}, error size: {}",
            originalList.size(), needNextFlowFiles.size(), needRemoveFlowFiles.size(), errorFlowFiles.size());
    session.transfer(needNextFlowFiles, REL_SUCCESS);
    session.transfer(errorFlowFiles, REL_FAILURE);
    session.remove(needRemoveFlowFiles);
}
```