
NiFi Deduplication Design (Part 1): Dedup Within a Single Queue

The official processor set does not include a deduplication processor, even though the need comes up fairly often.

This is the first of two posts on deduplicating FlowFiles within a NiFi queue. Neither approach is perfect, but both are good enough for everyday use.

Assume each FlowFile represents a task, and take a crawler job as the scenario, since most engineers find it easy to picture.

A FlowFile has two levels: attributes and content, analogous to a local file's metadata (name, permissions, size, modification time) and its content (text or binary).
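The two-level structure can be pictured as a plain data holder. This is an illustrative sketch only: NiFi's real org.apache.nifi.flowfile.FlowFile is an interface whose content is read and written through the ProcessSession, and the class and field names below are hypothetical.

```java
import java.util.Map;

// Hypothetical stand-in for a FlowFile, for explanation only:
// an attribute map (metadata) plus optional content (body).
class SimpleFlowFile {
    final Map<String, String> attributes; // e.g. {"url": "https://example.com/a"}
    final byte[] content;                 // null for our crawler-task FlowFiles

    SimpleFlowFile(Map<String, String> attributes, byte[] content) {
        this.attributes = attributes;
        this.content = content;
    }
}
```

In the crawler scenario below, only the attribute map is populated; the content stays null.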

In the crawler scenario, each FlowFile describes one URL to download: the URL is stored in an attribute, and the FlowFile has no content.

The processor's job is to download the URL; the FlowFiles in its upstream queue carry only URL information.

A processor normally pulls FlowFiles from the queue and processes them strictly 1:1, so identical URLs sitting in the same queue get processed multiple times.

So I implemented a lightweight processor myself.

It builds a unique key from FlowFile attributes, deduplicates on that key, keeps only one FlowFile per key (the first or the last), and routes the survivors to the downstream queue.
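Stripped of the NiFi plumbing, the keep-first / keep-last selection is just an ordered map over the key. This is a sketch, not the processor's code: keyOf stands in for evaluating the unique-key expression against a FlowFile's attributes, and survivors come out in the order each key was first encountered.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch of dedup-by-key: keep either the first or the last item per key.
class DedupSketch {
    static <T> List<T> unique(List<T> items, Function<T, String> keyOf, boolean retainFirst) {
        Map<String, T> byKey = new LinkedHashMap<>(); // remembers key encounter order
        for (T item : items) {
            if (retainFirst) {
                byKey.putIfAbsent(keyOf.apply(item), item); // first occurrence wins
            } else {
                byKey.put(keyOf.apply(item), item);         // last occurrence wins
            }
        }
        return new ArrayList<>(byKey.values());
    }
}
```

With the three-FlowFile example shown later and the key ${custom_id}, retain-first keeps custom_value 123 and 789, while retain-last would keep 456 and 789.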

The scope is limited: it only deduplicates FlowFiles that are in the same queue within a small time window. It is an auxiliary measure, not a complete solution; full deduplication requires external storage. The main value of this method is reducing I/O pressure on that external store.

The code is on Git. The structure is simple, so it also works as a practice project for learning NiFi's custom-processor development conventions.

https://github.com/cclient/nifi-unique-processor

Nifi Unique Processor

<custom_id:1,custom_value:123>                              <custom_id:1,custom_value:123>
<custom_id:1,custom_value:456> -> unique by ${custom_id} ->
<custom_id:2,custom_value:789>                              <custom_id:2,custom_value:789>

nifi queued distinct/unique by 'custom key'

deploy

1 compile

mvn package

2 copy the nar into one of the directories configured in nifi.properties

nifi.nar.library.directory=./lib
nifi.nar.library.directory.custom=./lib_custom
nifi.nar.library.autoload.directory=./extensions
nifi.nar.working.directory=./work/nar/


cp nifi-unique-nar/target/nifi-unique-nar-0.1.nar nifi/lib_custom/

3 restart nifi if needed

nifi/bin/nifi.sh restart

The core of the processor is the onTrigger method:

@Override
public void onTrigger(ProcessContext processContext, ProcessSession session) throws ProcessException {
    int bulkSize = processContext.getProperty(BULK_SIZE).asInteger();
    if (bulkSize == 0) {
        // 0 means "no limit": drain whatever is currently queued
        bulkSize = Integer.MAX_VALUE;
    }
    List<FlowFile> originalList = session.get(bulkSize);
    if (originalList == null || originalList.isEmpty()) {
        return;
    }
    boolean retainFirst = processContext.getProperty(RETAIN_FIRST).asBoolean();
    // key -> the FlowFile currently retained for that key
    Map<String, FlowFile> map = new HashMap<>(originalList.size());
    List<FlowFile> needRemoveFlowFiles = new ArrayList<>(originalList.size());
    List<FlowFile> errorFlowFiles = new ArrayList<>(originalList.size());
    List<FlowFile> needNextFlowFiles = new ArrayList<>(originalList.size());
    originalList.forEach(flowFile -> {
        // evaluate the expression-language key (e.g. ${custom_id}) against this FlowFile
        String key = processContext.getProperty(UNIQUE_KEY).evaluateAttributeExpressions(flowFile).getValue();
        if (key == null || key.isEmpty()) {
            errorFlowFiles.add(flowFile);
            return;
        }
        if (map.containsKey(key)) {
            if (retainFirst) {
                // keep the first occurrence, drop this duplicate
                needRemoveFlowFiles.add(flowFile);
            } else {
                // keep the last occurrence: drop the earlier one and take its place
                FlowFile oldSame = map.get(key);
                needRemoveFlowFiles.add(oldSame);
                needNextFlowFiles.remove(oldSame);
                needNextFlowFiles.add(flowFile);
                map.put(key, flowFile);
            }
        } else {
            needNextFlowFiles.add(flowFile);
            map.put(key, flowFile);
        }
    });
    logger.info("distinct original size: {}, retain size: {}, remove size: {}, error size: {}",
            originalList.size(), needNextFlowFiles.size(), needRemoveFlowFiles.size(), errorFlowFiles.size());
    session.transfer(needNextFlowFiles, REL_SUCCESS);
    session.transfer(errorFlowFiles, REL_FAILURE);
    session.remove(needRemoveFlowFiles);
}