天天看點

DolphinDB 流計算應用:引擎級聯監測門禁異常狀态

物聯網的發展為智能安防和自動化監控帶來了更多便利,與此同時,新型城鎮建設、智慧城市與智慧社群的發展也為門禁管理等安防問題智能化提出了更高的要求。在智能化發展的背景下,門禁不僅僅是門禁,更是一套內建了訪客、考勤、消費、巡更、梯控等更多功能的全面便捷的系統安全應用。目前門禁系統主要用于出入口管理,在大陸加速推動智慧城市、智慧工地、智慧社群等智慧化建設發展的前提下,門禁系統智能化更新的趨勢成為必然,其普及率和使用率也将更加廣泛,随着接入門禁系統裝置越來越多,對其産生的海量資料進行實時快速的處理也成為了日益重要的問題。

DolphinDB 提供了流資料表和流計算引擎用于實時資料處理,為智能安防提供了有力支援。本教程将介紹如何使用流計算引擎多級級聯實作對門禁裝置異常狀态的實時監測。

1. 背景介紹

1.1 行業背景

大陸新型城鎮化建設、智慧城市以及智慧社群的不斷發展,給智能安防産業提供了巨大的市場拓展空間。智能安防系統正在向安全、軍事、交通、政府、電力、通信、能源、金融、文博、倉庫、别墅、工廠等衆多行業領域延伸,涵蓋廣泛的智能化系列産品及解決方案,主要包括智能安防視訊分析系統、智能交通視訊監控系統、智慧城市智能監控系統、基于異常行為的定制化系統以及計算機視覺分析的前瞻性技術探索等。

根據人們對智能安防系統日益增長的需求,智能安防系統逐漸向名額性能優異、環境适應性強、運作可靠穩定和技術更加相容的方向發展。常見的智能安防系統一般包含有監控、報警、門禁和遠端控制 4 個主要功能,可以單獨運作也可統一管理。而門禁則是整個智能安防系統中的基礎應用,同時也關系到物聯網領域中的公共安全、城市管理、智能家居等多個方面。

1.2 真實場景

目前一般的門禁系統工作于用戶端/伺服器端方式,包含以下功能設定,可以完成事件監控、報警關聯。

①門禁、報警綜合管理系統伺服器:提供集中管理及監控,輸出,關聯功能。

②門禁工作站:門禁工作站提供功能設定及事件監控,可以接上發夾裝置作為發夾工作站。

③報警輸入:每個控制箱具有獨立的報警輸入接口,接報警輸入裝置,如紅外報警器等。

④報警輸出:每個控制箱具有獨立的報警輸出接口,接報警輸出裝置,如聲光報警器等。

⑤門禁控制器:是門禁管理系統的核心部分,對系統的卡直接管理及控制相關裝置,具有存儲功能,可存放持卡人資料及各種事件記錄。

⑥讀卡器:工作于射頻方式,采集感應卡的資料傳輸到門禁控制器,以便控制器進行各種管理及相應的控制。

⑦電鎖:電子方式開關,實作開門及鎖門,由門禁控制器直接控制。

⑧開門按鈕:提供友善的開門方式。

⑨門磁:檢測門的狀态資訊,然後傳輸到控制器。

⑩報警輸入輸出裝置:為加強系統的保安,可以将輸入輸出裝置接入門禁控器的輸入輸出接口,實作系統的報警及關聯。

DolphinDB 流計算應用:引擎級聯監測門禁異常狀态

圖1 工業中心門禁管理系統結構

上圖展示了一個常見的門禁管理系統結構,一般而言,報警系統是安防及門禁系統中保障安全問題的重要功能元件及環節,也可以實作與其他監控裝置的聯防聯控。随着接入智能門禁系統的終端越來越多,如何對海量資料進行實時高效計算,及時回報報警消息,成為智能門禁以及智慧社群建設的關鍵問題。

1.3 DolphinDB優勢

DolphinDB 是一款高性能分布式時序資料庫,內建了功能強大的程式設計語言和高容量高速度的流資料分析系統,為海量結構化資料的快速存儲、檢索、分析及計算提供一站式解決方案,适用于工業物聯網領域。DolphinDB 提供的流式計算架構具備高性能實時流資料處理能力,支援毫秒甚至微秒級别資料計算,非常适合用于門禁安防資料的處理和分析。

2. 需求

假定有一個監控系統,對所有門禁裝置每5秒鐘采集1次資料,同時開門或關門的事件會主動上報資料,采集後的資料以 json 格式寫入 mqtt 伺服器,本文使用到的資料示例如下:

| recordType | doorEventCode | eventDate           | readerType | sn    | doorNum | card     |
|------------|---------------|---------------------|------------|-------|---------|----------|
| 0          | 11            | 2022.12.01 00:00:00 | false      | a1008 | 1       | ic100000 |
| 1          | 65            | 2022.12.01 00:00:00 | false      | a1010 | 2       | ic100000 |
| 3          | 61            | 2022.12.01 00:00:53 | true       | a1004 | 1       | ic100044 |
| 2          | 66            | 2022.12.01 00:00:53 | true       | a1002 | 2       | ic100020 |
| 2          | 60            | 2022.12.01 00:19:54 | false      | a1008 | 1       | ic100000 |
| 3          | 11            | 2022.12.01 00:19:54 | true       | a1000 | 2       | ic100000 |
| 2          | 66            | 2022.12.01 00:23:21 | true       | a1009 | 1       | ic100082 |
| 2          | 61            | 2022.12.01 00:23:21 | false      | a1006 | 2       | ic100068 |
| 3          | 12            | 2022.12.01 00:45:26 | true       | a1003 | 1       | ic100000 |
| 1          | 11            | 2022.12.01 00:45:26 | false      | a1004 | 2       | ic100000 |           

本教程實作門禁異常狀态檢測需要用到的資料字段說明如下:

字段名 說明
doorEventCode 事件碼``11: 合法開門 12: 密碼開門 56: 按鈕開門 60: 開門 61: 關門 65: 軟體開門 66:軟體關門
eventDate 事件時間
doorNum 門号 0-4

保持門禁正常關閉狀态是保證社群或樓宇内居民安全的基礎需求之一,是以本案例需要實作的門禁異常狀态檢測需求是:開門狀态連續存在超過5分鐘報警。

3. 實驗環境

實驗環境的配置如下:

  • 伺服器環境:
    • CPU類型:Intel(R) Core(TM) i5-11500 @ 2.70GHz 2.71 GHz
    • 邏輯 CPU 總數:12
    • 記憶體:16 GB
    • OS:64位 Windows
  • DolphinDB server 部署
    • server 版本:2.00.8 Windows 64,社群版
    • 部署模式:單節點模式
  • DolphinDB GUI:1.30.14 版本
  • MQTT 伺服器:mosquitto-2.0.15

4. 設計思路

DolphinDB 的流計算架構目前已提供時序聚合引擎、橫截面聚合引擎、異常檢測引擎、會話視窗引擎和響應式狀态引擎等10餘種計算引擎應對不同計算場景。本文主要介紹如何用響應式狀态引擎和會話視窗引擎實作門禁異常狀态的實時監測。

4.1 使用 DolphinDB 内置流計算引擎監測門禁異常狀态

  • 響應式狀态引擎(createReactiveStateEngine)

DolphinDB 流資料引擎所計算的因子可分為無狀态因子與有狀态因子。無狀态因子僅根據最新一條資料即可完成計算,不需要之前的資料,亦不依賴之前的計算結果。有狀态因子計算除需要最新的資料,還需要曆史資料或之前計算得到的中間結果,統稱為“狀态”。是以有狀态因子計算需要存儲狀态,以供後續因子計算使用,且每次計算都會更新狀态。響應式狀态引擎每輸入一條資料都将觸發一條結果輸出,是以輸入和輸出資料量一緻。響應式狀态引擎的算子中隻能包含向量函數,DolphinDB 針對生産業務中的常見狀态算子(滑動視窗函數、累積函數、序列相關函數和 topN 相關函數等)進行了優化,大幅提升了這些算子在響應式狀态引擎中的計算效率。

  • 會話視窗引擎(creatSessionWindowEngine)

會話視窗可以了解為一個活動階段(資料産生階段),其前後都是非活動階段(無資料産生階段)。會話視窗引擎與時間序列引擎極為相似,它們計算規則和觸發計算的方式相同。不同之處在于時間序列引擎具有固定的視窗長度和滑動步長,但會話視窗引擎的視窗不是按照固定的頻率産生的,其視窗長度也不是固定的。會話視窗引擎以引擎收到的第一條資料的時間戳作為第一個會話視窗的起始時間。會話視窗收到某條資料之後,若在指定的等待時間内仍未收到下一條新資料,則(該資料的時間戳 + 等待時間)是該視窗的結束時間。視窗結束後收到的第一條新資料的時間戳是新的會話視窗的起始時間。

在物聯網領域應用場景中,由于裝置線上的時間段不同,可能某些時間段有大量資料産生,而某些時間段完全沒有資料。若對這類特征的資料進行滑動視窗計算,無資料的視窗會增加不必要的計算開銷。是以 DolphinDB 開發了會話視窗引擎,以解決此類問題。

4.2 設計思路與方案

對于本案例的需求,由于門禁監控裝置采用輪詢方式5秒采集一次資料,在沒有新事件上報的時間裡,會産生重複記錄的資料,是以需要首先對采集資料進行去重處理,再檢測出資料中狀态持續逾時的記錄。此時的記錄會包括所有狀态持續超過 5 分鐘的資料,是以仍需将資料接入下一級引擎去除關門告警,隻保留開門狀态逾時報警。根據 DolphinDB 各個引擎的特點,采用響應式狀态引擎完成第一個與第三個過濾篩選資料的任務,并通過會話視窗引擎檢測出逾時資料。将三個引擎級聯,實作多級引擎級聯檢測開門時間大于5分鐘的異常門禁狀态的流水線處理模式。

在 DolphinDB 中的處理流程如下圖所示:

DolphinDB 流計算應用:引擎級聯監測門禁異常狀态

圖2 門禁異常狀态資料處理流程

5. 實作步驟

5.1 定義并共享輸入輸出流資料表

首先定義一個用于實時接收門禁監控裝置資料的流資料表,表結構共包含七列,通過 enableTableShareAndPersistence函數共享流資料表并持久化到硬碟上。通過 cacheSize 參數将記憶體中可儲存的最大資料量設定為10萬行。代碼如下:

st=streamTable(
	array(INT,0) as recordype, //記錄類型
	array(INT,0) as doorEventCode, //事件碼
    array(DATETIME,0) as eventDate, //事件時間 
    array(BOOL,0) as readerType, //進出類型 1:入 0:出
   	array(SYMBOL,0) as sn, //裝置SN号
    array(INT,0) as doorNum, //門号
    array(SYMBOL,0) as card //卡号            
	)
enableTableShareAndPersistence(st,`doorRecord, false, true, 100000, 100, 0);           

其次定義異常狀态流資料表 outputSt1 ,用于響應式狀态引擎的輸出,并将其持久化到磁盤上。createReactiveStateEngine 響應式狀态引擎對輸出表的格式有嚴格要求,它的第一列必須是分組列,其中,根據 keyColumn 的設定,輸出表的前幾列必須和 keyColumn 設定的列及其順序保持一緻。在本例中,分組列為門号 doorNum ,資料類型為 INT 。之後的兩列分别為 DATETIME 類型和 INT 類型,用于記錄時間和事件碼。建立及共享流資料表代碼如下:

out1 =streamTable(10000:0,`doorNum`eventDate`doorEventCode,[INT,DATETIME, INT])
enableTableShareAndPersistence(out1,`outputSt,false,true,100000)           

有關函數及各參數的詳細說明,參考 DolphinDB使用者手冊 https://www.dolphindb.cn/cn/help/index.html

5.2 建立響應式狀态引擎過濾重複資料

響應式狀态引擎會對輸入的每一條消息做出計算響應,産生一條記錄作為結果,計算的結果在預設情況下都會輸出到結果表,也就是說輸入 n 個消息,輸出 n 條記錄。如果希望僅輸出一部分結果,可以啟用過濾條件,隻有滿足條件的結果才會輸出。

下面的例子檢查記錄資料是否有變化,隻有事件類型有變化的記錄才會輸出。設定分組列為 doorNum,輸出表各列的順序為分組列、計算結果列,需要注意保持下一級引擎 dummyTable 的 Schema 與該順序一緻。設定 filter 為 prev(doorEventCode)!=doorEventCode,這裡以元代碼的形式表示過濾條件,隻有符合過濾條件的結果,即事件碼有變化的資料資料才會被輸出到通過 outputTable 設定的輸出表中。兩個計算名額為 eventDate 和 doorEventCode,表示原樣輸出。

DolphinDB 内置的流計算引擎均實作了資料表(table)的接口,是以多個引擎流水線處理變得異常簡單,隻要将後一個引擎作為前一個引擎的輸出即可。引入流水線處理,可以解決更為複雜的計算問題。在本例中,将輸出表通過 getStreamEngine() 方法接入下一級會話視窗引擎。具體建立引擎代碼如下:

reactivEngine1 = createReactiveStateEngine(name=`reactivEngine1,metrics=<[eventDate,doorEventCode]>,
    dummyTable=objByName(`doorRecord),outputTable= getStreamEngine("swEngine"),keyColumn=`doorNum,
    filter=<prev(doorEventCode)!=doorEventCode>)           

有關函數及各參數的詳細說明,參考 DolphinDB使用者手冊。

5.3 通過級聯會話視窗引擎檢測狀态逾時資料

首先建立一張記憶體表,為響應式狀态引擎提供輸入的表結構,該表結構需要與上一級引擎輸出表的結構一緻。在會話視窗引擎中,設定分組列 keyColumn為門号 doorNum,時間列 timeColumn 為時間 eventDate。檢測需求是五分鐘内無資料報警,是以 sessionGap 為300(機關為秒,同 eventDate 列),表示收到某條資料後經過該時間的等待仍無新資料到來,就終止目前視窗。metrics 設為 last(doorEventCode),即傳回視窗内的最後一條記錄資料。設定 useSessionStartTime 為 false,表示輸出表中的時刻為資料視窗結束時刻,即每個視窗中最後一條資料的時刻 + sessionGap。訂閱流資料後,會話視窗引擎的輸入資料為上一級響應式狀态引擎的輸出,輸出作為下一級響應式狀态引擎的輸入。參考 DolphinDB 使用者手冊中 createSessionWindowEngine頁面内容完成對其他參數的設定。代碼如下:

swOut2 = table(1:0,`doorNum`eventDate`doorEventCode,[INT,DATETIME,INT])
swEngine = createSessionWindowEngine(name="swEngine",sessionGap = 300,metrics=<last(doorEventCode)>,
    dummyTable = objByName(`doorRecord), outputTable = getStreamEngine("reactivEngine"), 
    timeColumn = `eventDate, keyColumn =`doorNum, useSessionStartTime = false)           

5.4 響應式狀态引擎過濾關門告警

上級會話視窗引擎擷取到的資料包括開門和關門超過5分鐘的資料,是以需要再通過響應式狀态引擎過濾掉關門狀态逾時資料,隻保留開門告警。與上一級引擎類似,首先同樣建立一張記憶體表,為響應式狀态引擎提供輸入的表結構,在該響應式狀态引擎中,設定分組列 keyColumn 為門号 doorNum ,兩個計算名額為 eventDate 和 doorEventCode,表示原樣輸出。filter參數設定為 doorEventCode in [11,12,56,60,65,67],即隻輸出記錄的事件碼為開門事件的資料。參考 DolphinDB 使用者手冊中 createReactiveStateEngine頁面内容完成對其他參數的設定。代碼如下:

swOut1 =table(1:0,`eventDate`doorNum`doorEventCode,[DATETIME,INT, INT])
reactivEngine = createReactiveStateEngine(name=`reactivEngine, metrics=<[eventDate,doorEventCode]>, 
    dummyTable=swOut1,outputTable= objByName(`outputSt),keyColumn= "doorNum",
    filter=<doorEventCode in [11,12,56,60,65,67]>)           

5.5 訂閱流資料

過濾了關門告警資料後,訂閱流資料表 doorRecord 并将 handler 設定為 “向 reactivEngine1 中添加資料”,把收到的流資料寫入上述會話視窗引擎,msgAsTable 設為 true ,表示訂閱的資料是由列組成的元組。代碼如下:

subscribeTable(tableName="doorRecord", actionName="monitor", offset=0,
               handler=append!{reactivEngine1}, msgAsTable=true           

5.6 從 MQTT 伺服器接收資料

DolphinDB 提供了 MQTT 插件用于訂閱 MQTT 伺服器的資料。DolphinDB server 2.00.8 linux 64 JIT 版本已包含 MQTT 插件在 server/plugins/mqtt 目錄下,不用下載下傳插件即可直接加載使用。使用者可以使用 mqtt::subscribe 從 MQTT 伺服器訂閱資料,在訂閱時需要資料格式解析函數,目前插件提供了 json 和 csv 格式的解析函數,本例使用 mqtt::createJsonParser 解析 json 格式資料。示例代碼如下:

loadPlugin(getHomeDir()+"/plugins/mqtt/PluginMQTTClient.txt")
sp = createJsonParser([INT,INT,DATETIME, BOOL,SYMBOL,INT,SYMBOL], 
    `recordType`doorEventCode`eventDate`readerType`sn`doorNum`card)
mqtt::subscribe(host, port, topic, sp, objByName(`doorRecord))           

6. 模拟寫入與驗證

6.1 模拟門禁裝置寫入資料

下列代碼模拟門禁裝置寫入開門事件與關門事件,每五秒産生一次資料,共産生350條門禁資料記錄,非重複記錄有7次,逾時資料有3條,其中開門逾時記錄有兩條,模拟寫入資料代碼如下:

def duplicateData(mutable st, num, doorCode, time){
    for(i in 0:num){
        eventTime = time
        st.append!(table(rand(0..5,1) as recordType, doorCode as doorEventCode, eventTime as eventDate, rand([true,false],1) as readerType, rand(`a+string(1000..1010),1) as sn, 1 as doorNum, rand(`ic+string(100000..100000),1) as card))
        eventTime = datetimeAdd(eventTime, 5, `s)
    }
}
startEventDate = 2022.12.01T00:00:00
duplicateData(st, 75, 11, startEventDate)
startEventDate=datetimeAdd(startEventDate , 375, `s)
duplicateData(st, 25, 56, startEventDate)
startEventDate=datetimeAdd(startEventDate , 125, `s)
duplicateData(st, 100, 61, startEventDate)
startEventDate=datetimeAdd(startEventDate , 500, `s)
duplicateData(st, 25, 66, startEventDate)
startEventDate=datetimeAdd(startEventDate , 125, `s)
duplicateData(st, 70, 12, startEventDate)
startEventDate=datetimeAdd(startEventDate , 350, `s)
duplicateData(st, 30, 60, startEventDate)
startEventDate=datetimeAdd(startEventDate , 150, `s)
duplicateData(st, 25, 67, startEventDate)
startEventDate=datetimeAdd(startEventDate , 125, `s)           

6.2 驗證監測結果準确性

從模拟的資料中查詢出開門逾時且符合過濾條件的資料,通過 eqObj()方法比較流計算引擎擷取到的異常資料與真實異常資料是否相同,進而驗證監測結果的準确性。

t = select *, deltas(eventDate), prev(doorNum), prev(eventDate), prev(doorEventCode) 
    from doorRecord context by doorNum 
resultTable = select prev_doorNum as doorNum,prev_eventDate+300 as eventDate,
              prev_doorEventCode as doorEventCode from t 
              where deltas_eventDate>= 300 and prev_doorEventCode in [11,12,56,60,65,67] 
              and (prev(eventDate)!=eventDate or prev(doorEventCode)!=doorEventCode)
              order by eventDate
eqObj(resultTable.values(),outputSt1.values())           

7. 總結

在網絡與數字技術飛速發展的今天,門禁系統早已不再是單純的門道及鑰匙管理,而是逐漸發展成為一套完整的出入門禁安全管理系統。如今的門禁管理系統集微機自動識别技術和現代安全管理措施為一體,涉及電子、機械、光學、聲學、計算機技術、通訊技術、生物技術等諸多新技術,為各類機要部門實作出入口安全防範管理提供了有效措施。

本教程基于 DolphinDB 流資料處理架構,提供了一種實時監測門禁裝置異常狀态的低延時解決方案,能夠有效提升對于海量資料的實時計算效率,滿足了門禁系統智能化的計算需求。利用 DolphinDB 流資料處理架構中引擎的流水線處理方式,實作了會話視窗引擎和響應式狀态引擎級聯,将開發難度大大降低。本教程旨在提高開發人員在使用 DolphinDB 内置的流資料架構開發物聯網領域流計算業務場景時的開發效率、降低開發難度,更好地挖掘 DolphinDB 在複雜實時流計算場景中的價值。

參考文獻

  1. 《智能化樓宇技術》
  2. 《安防&智能化》
  3. 《智能安防對于智慧社群的重要性》
  4. 《淺談門禁為何是智能家居安防的基礎》
  5. 《智能安防系統在智慧城市中的應用及展望》
  6. 《淺談智能化門禁系統的當下與未來》

附錄

代碼:https://gitee.com/dolphindb/Tutorials_CN/tree/master/script/streaming_engine_anomaly_alerts

繼續閱讀