Author: Mike Frank. Translator: 徐轶韬
In this blog I'll demonstrate how to consolidate and archive audit logs across many MySQL instances. In a follow-up post I'll show how to extend this example by creating a simple hash chain on that archive, so you can prove whether it has been modified or tainted in any way, and if so, where.
In the example code I use the new extensions to the MySQL audit_log_read function and show why the mysqlx API can make some tasks much simpler. This new audit reading capability was released in MySQL 8.0.22 Enterprise Edition. The examples use MySQL Shell in both SQL and Python modes.
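A few other tricks will be shown as well, including:
- Extracting rows from JSON audit data, using the JSON_TABLE function to convert JSON data into tabular form.
- Inserting those rows from an audited database into a MySQL database that archives the audit data. As you'll see, the mysqlx API makes this much simpler.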
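A few facts. As many DBAs can tell you, whether it's prohibited by regulation or for other security reasons, DBAs often don't want (or can't have) access to the underlying OS server where MySQL runs. No SSH for DBAs! In general, from a security perspective, the less there is on the OS where the database service runs, the better.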
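For many reasons (security, analysis and so on), it's a best practice to get audit data off the MySQL server frequently and collect it in a central data store where you can look at activity across all your MySQL servers. Why do this?
- Easier analysis
- Preventing the data from being corrupted
- Regulatory requirements
- Storage management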
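Of course, there are various ways to move audit data, using a variety of products. This is just one possible design pattern; it could easily be tweaked for third-party integrations or changed to write the data to an object store or some other audit data repository.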
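In terms of terminology, I'll call the server that consolidates the audit data the "archiving server". That server will have an account, which I'll call "auditarchiver", that can only insert and select on the audit_data table. (It can't change data.)
Each server that will have audit data extracted will have an account that reads the audit data over a SQL connection, reading the JSON data from the audit files.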
First, let's log in to the archiving MySQL server instance as an administrator; I'll use root. The whole example requires MySQL Shell. It includes the Python that pulls audit data from the target servers for scheduled batch archiving.
Step 1 - Audit archive database setup.
Create the schema and table on the archiving server
On the audit data archiving server
> mysqlsh
\sql
\connect root@<archiving server>;
create schema audit_archive;
use audit_archive;
CREATE TABLE `audit_data` (
`server_uuid` varchar(45) NOT NULL,
`id` int NOT NULL,
`ts` timestamp NOT NULL,
`class` varchar(20) DEFAULT NULL,
`event` varchar(80) DEFAULT NULL,
`the_account` varchar(80) DEFAULT NULL,
`login_ip` varchar(200) DEFAULT NULL,
`login_os` varchar(200) DEFAULT NULL,
`login_user` varchar(200) DEFAULT NULL,
`login_proxy` varchar(200) DEFAULT NULL,
`connection_id` varchar(80) DEFAULT NULL,
`db` varchar(40) DEFAULT NULL,
`status` int DEFAULT NULL,
`connection_type` varchar(40) DEFAULT NULL,
`connect_os` varchar(40) DEFAULT NULL,
`pid` varchar(40) DEFAULT NULL,
`_client_name` varchar(80) DEFAULT NULL,
`_client_version` varchar(80) DEFAULT NULL,
`program_name` varchar(80) DEFAULT NULL,
`_platform` varchar(80) DEFAULT NULL,
`command` varchar(40) DEFAULT NULL,
`sql_command` varchar(40) DEFAULT NULL,
`command_status` varchar(40) DEFAULT NULL,
`query` varchar(40) DEFAULT NULL,
`query_status` int DEFAULT NULL,
`start_server_id` varchar(400) DEFAULT NULL,
`server_os_version` varchar(100) DEFAULT NULL,
`server_mysqlversion` varchar(100) DEFAULT NULL,
`args` varchar(80) DEFAULT NULL,
`account_host` varchar(80) DEFAULT NULL,
`mysql_version` varchar(80) DEFAULT NULL,
`the_os` varchar(80) DEFAULT NULL,
`the_os_ver` varchar(80) DEFAULT NULL,
`server_id` varchar(8) DEFAULT NULL,
PRIMARY KEY (`server_uuid`,`id`,`ts`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Step 2 - On the archiving server
First create the account. Grant the auditarchiver account select and insert privileges. (No other privileges.)
\connect root@<archiving server>;
create user auditarchiver identified by 'Th3Archivista!';
grant select, insert on audit_archive.audit_data to auditarchiver;
Step 3 - On each server whose audit data will be read (pulled), create the auditreader account
> mysqlsh
\sql
\connect root@<servers to pull audit trail>;
select @@version;
/* If you are not running MySQL Enterprise Edition 8.0.22 or later, stop and upgrade */
create user auditreader identified by 'Pu11Archivister!';
GRANT AUDIT_ADMIN
ON *.* TO 'auditreader';
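Step 4 - If you haven't installed Enterprise Audit yet, install it.
Installing MySQL Enterprise Audit
Simply put, run the SQL script found in the MySQL share directory: audit_log_filter_linux_install.sql
Step 5 - Edit /etc/my.cnf. At a minimum, this example needs the first three settings under [mysqld].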
[mysqld]
plugin-load-add=audit_log.so
audit-log=FORCE_PLUS_PERMANENT
audit-log-format=JSON
audit-log-strategy=SYNCHRONOUS
Restart the server
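Step 6 - Add an audit filter and bind it to users
If you haven't added audit filters before, the following will log everything for everyone who connects. That records a lot, which is reasonable if the goal is to see how this works in a test environment. In production you'll likely want to be more selective.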
-- create audit filter - here log everything (this will generate a great deal of data)
SELECT audit_log_filter_set_filter('log_all', '{ "filter": { "log": true } }');
-- attach filter to accounts - here for all users
SELECT audit_log_filter_set_user('%', 'log_all');
Repeat this on every instance to be audited.
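For production you would probably register a narrower filter than log_all. As a minimal sketch, the snippet below builds filter definitions with json.dumps and prints the matching audit_log_filter_set_filter() statements. The helper filter_statement and the filter name log_connections are hypothetical names made up for this illustration. The class-based rule is my reading of the audit log filter JSON syntax, so check it against the documentation for your server version before using it.

```python
import json

def filter_statement(name, definition):
    """Return the SQL that registers an audit filter (hypothetical helper)."""
    body = json.dumps(definition)
    # Double any single quotes so the JSON stays a valid SQL string literal.
    body = body.replace("'", "''")
    return "SELECT audit_log_filter_set_filter('{}', '{}');".format(name, body)

# Same rule as the log_all filter used in this post.
log_all = {"filter": {"log": True}}

# Assumed narrower rule: record only connection-class events.
log_connections = {"filter": {"class": {"name": "connection"}}}

print(filter_statement("log_all", log_all))
print(filter_statement("log_connections", log_connections))
```

The printed statements can be pasted into MySQL Shell in SQL mode, followed by audit_log_filter_set_user() to bind the filter to accounts.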
Step 7 - Generate some audit activity
Run some SQL queries as various users on the servers where MySQL Enterprise Audit is installed.
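Step 8 - Pick a server where mysqlsh can be scheduled in batch mode
Here is how the batch Python script works. (At the end I'll repeat the code in consolidated form so you can copy, edit and run it.)
Please change the passwords used, use your own server names, and so on.
First, I use the mysqlx API to connect to the server being read and to the archiving server, each through its own session.
Change "localhost" to the IP/hostname of your archiving server.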
archive_session = mysqlx.get_session( {
'host': 'localhost', 'port': 33060,
'user': 'auditarchiver', 'password': 'Th3Archivista!'} )
Change "localhost" to the IP/hostname of the audited server.
read_session = mysqlx.get_session( {
'host': 'localhost', 'port': 33060,
'user': 'auditreader', 'password': 'Pu11Archivister!'} )
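OK, now I need to see whether I have prior archived data, so I can work out where in the audit data I need to start reading newer data. If the archive contains no data for this instance, I'll start at the beginning of the log data.
If the archive table contains no data for this instance (identified by its server_uuid), create the JSON search string with "start" in it. "start" tells the function to perform a general date/time search.
However, if earlier data has already been loaded, I get the timestamp and event id of the last row inserted and use them as a bookmark into the audit data; in that case there is no "start" in the JSON search string.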
# Identify the audited instance so that only its own rows in the archive are considered
server_uuid = read_session.run_sql("select @@server_uuid").fetch_one()[0]
archive_empty = archive_session.run_sql("select count(*) from audit_archive.audit_data where server_uuid = '" + server_uuid + "'").fetch_one()

if (archive_empty[0] > 0):
    print("Data in archive getting last event ts and id")
    search_args = archive_session.run_sql("select id, ts from audit_archive.audit_data where server_uuid = '" + server_uuid + "' order by ts desc, id desc limit 1").fetch_one()
    x = "set @nextts ='{ \"timestamp\": \"" + str(search_args[1]) + "\",\"id\":" + str(search_args[0]) + ", \"max_array_length\": 100 }'"
    setnext = read_session.run_sql(x)
else:
    print("The archive is empty - get oldest audit event")
    read_session.run_sql("set @nextts='{ \"start\": { \"timestamp\": \"2020-01-01\"}, \"max_array_length\": 100 }'")

OK, we now have the session variable @nextts set with my search criteria.
If you want to look at the JSON search string
view_nextts = read_session.run_sql("select @nextts")
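The search string above is assembled by string concatenation, which is easy to get wrong when quoting. As a rough sketch of an alternative, the two cases can be built as Python dictionaries and serialized with json.dumps. The helper names next_search and set_nextts_statement are hypothetical, and the sample timestamp and id are made-up values.

```python
import json

def next_search(last_ts=None, last_id=None, batch=100, first_ts="2020-01-01"):
    """Build the JSON argument for AUDIT_LOG_READ() (hypothetical helper)."""
    if last_ts is None:
        # Nothing archived yet: ask for a date/time search from first_ts.
        args = {"start": {"timestamp": first_ts}, "max_array_length": batch}
    else:
        # Resume from the bookmark of the last archived event.
        args = {"timestamp": str(last_ts), "id": int(last_id),
                "max_array_length": batch}
    return json.dumps(args)

def set_nextts_statement(search_json):
    """Wrap the JSON in a SET statement, doubling single quotes for SQL."""
    return "set @nextts='{}'".format(search_json.replace("'", "''"))

# Empty archive: start from the beginning of the log.
print(set_nextts_statement(next_search()))

# Archive has data: continue from a (made-up) last timestamp and id.
print(set_nextts_statement(next_search("2020-10-19 19:47:17", 9)))
```

Either printed statement could be passed to read_session.run_sql() in place of the hand-built string.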
In the next step you'll see the call to the audit_log_read function in SQL
AUDIT_LOG_READ(@nextts)
As you'll see, I want to store the data in the archive as rows, so I use the JSON_TABLE function to convert the JSON into rows.
readaudit = read_session.run_sql(" "
"SELECT @@server_uuid as server_uuid, id, ts, class, event, the_account,login_ip,login_os,login_user,login_proxy,connection_id,db, "
" status,connection_type,connect_os,pid,_client_name,_client_version, "
" program_name,_platform,command,sql_command,command_status,query, "
" query_status,start_server_id,server_os_version,server_mysqlversion,args, "
" account_host,mysql_version,the_os,the_os_ver,server_id "
"FROM "
"JSON_TABLE "
"( "
" AUDIT_LOG_READ(@nextts), "
" '$[*]' "
" COLUMNS "
" ( "
" id INT PATH '$.id', "
" ts TIMESTAMP PATH '$.timestamp', "
" class VARCHAR(20) PATH '$.class', "
" event VARCHAR(80) PATH '$.event', "
" the_account VARCHAR(80) PATH '$.account', "
" login_ip VARCHAR(200) PATH '$.login.ip', "
" login_os VARCHAR(200) PATH '$.login.os', "
" login_user VARCHAR(200) PATH '$.login.user', "
" login_proxy VARCHAR(200) PATH '$.login.proxy', "
" connection_id VARCHAR(80) PATH '$.connection_id', "
" db VARCHAR(40) PATH '$.connection_data.db', "
" status INT PATH '$.connection_data.status', "
" connection_type VARCHAR(40) PATH '$.connection_data.connection_type', "
" connect_os VARCHAR(40) PATH '$.connection_data.connection_attributes._os', "
" pid VARCHAR(40) PATH '$.connection_data.connection_attributes._pid', "
" _client_name VARCHAR(80) PATH '$.connection_data.connection_attributes._client_name', "
" _client_version VARCHAR(80) PATH '$.connection_data.connection_attributes._client_version', "
" program_name VARCHAR(80) PATH '$.connection_data.connection_attributes.program_name', "
" _platform VARCHAR(80) PATH '$.connection_data.connection_attributes._platform', "
" command VARCHAR(40) PATH '$.general_data.command', "
" sql_command VARCHAR(40) PATH '$.general_data.sql_command', "
" command_status VARCHAR(40) PATH '$.general_data.status', "
" query VARCHAR(40) PATH '$.general_data.query', "
" query_status INT PATH '$.general_data.status', "
" start_server_id VARCHAR(400) PATH '$.startup_data.server_id', "
" server_os_version VARCHAR(100) PATH '$.startup_data.os_version', "
" server_mysqlversion VARCHAR(100) PATH '$.startup_data.mysql_version', "
" args VARCHAR(80) PATH '$.startup_data.args', "
" account_host VARCHAR(80) PATH '$.account.host', "
" mysql_version VARCHAR(80) PATH '$.startup_data.mysql_version', "
" the_os VARCHAR(80) PATH '$.startup_data.os', "
" the_os_ver VARCHAR(80) PATH '$.startup_data.os_version', "
" server_id VARCHAR(80) PATH '$.startup_data.server_id' "
" ) "
") AS auditdata; ")
You could instead take all the JSON and store each event in a JSON data type column. That's simple too. But here I'm storing it in a table. It's up to you.
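To make the JSON-to-rows conversion concrete, here is a small pure-Python illustration of what the JSON_TABLE call does conceptually: each column is filled by walking a path into the event, and a missing path yields an empty value, much as JSON_TABLE returns NULL by default. This is only an analogy, not how the server implements it. The sample events are made up and merely shaped like audit log records, and the helpers extract and to_rows are hypothetical.

```python
import json

# Column name -> path, mirroring a few of the JSON_TABLE COLUMNS entries.
COLUMNS = {
    "id": ["id"],
    "ts": ["timestamp"],
    "class": ["class"],
    "event": ["event"],
    "login_user": ["login", "user"],
    "login_ip": ["login", "ip"],
    "query": ["general_data", "query"],
}

def extract(event, path):
    """Walk a nested dict; a missing key yields None (like SQL NULL)."""
    node = event
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node

def to_rows(audit_json):
    """Turn a JSON array of events into a list of flat row dicts."""
    return [{col: extract(evt, path) for col, path in COLUMNS.items()}
            for evt in json.loads(audit_json)]

# Made-up sample shaped like audit log events; not real server output.
sample = json.dumps([
    {"timestamp": "2020-10-19 19:47:17", "id": 0, "class": "connection",
     "event": "connect", "login": {"user": "root", "ip": "127.0.0.1"}},
    {"timestamp": "2020-10-19 19:47:20", "id": 1, "class": "general",
     "event": "status", "login": {"user": "root", "ip": "127.0.0.1"},
     "general_data": {"command": "Query", "query": "select 1"}},
])

for row in to_rows(sample):
    print(row)
```

The connection event has no general_data, so its query column comes back empty, which is why most columns in audit_data are declared DEFAULT NULL.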
OK, now, as auditarchiver, I'll save the data I just extracted.
This is where the mysqlx API is really handy. I can loop over the result and save it to the table with very little code.
aschema=archive_session.get_schema('audit_archive')
atable=aschema.get_table('audit_data')

if (archive_empty[0] > 0):
    # The bookmarked event is returned again - read it and discard it
    evt = readaudit.fetch_one_object()
    print("Archive was not empty - skip first duplicate event")
else:
    print("Archive was empty - load all")

evt = readaudit.fetch_one_object()

while evt:
    atable.insert(evt).execute()
    evt= readaudit.fetch_one_object()

As you may have noticed, I'm not trying to pull too much from the audit log at once; I capped it at 100 events. Reads are also limited by the buffer size set with --audit-log-read-buffer-size.
See https://dev.mysql.com/doc/refman/8.0/en/audit-log-reference.html#sysvar_audit_log_read_buffer_size
So, save this script to a directory.
cd to the directory
Now you just run mysqlsh in batch mode.
First run
frank@Mike %
mysqlsh --py < archiveauditbatch
The archive is empty - get oldest audit event
Archive was empty - load all
Next run
frank@Mike %
mysqlsh --py < archiveauditbatch
Data in archive getting last event ts and id
Archive was not empty - skip first duplicate event
OK, now that you've run some tests, create a scheduled batch job with cron or your favorite scheduler. Have it loop until the backlog is drained.
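One way to picture "loop until drained" is a small driver that repeats a single archiving pass until a pass inserts nothing new. The sketch below assumes a hypothetical callable archive_once that wraps one run of the batch logic and returns the number of rows it inserted. The script in this post does not return such a count, so that part is an assumption. A stand-in function simulates the passes so the loop can be tried without a server.

```python
def drain(archive_once, max_passes=1000):
    """Call archive_once() until it reports no new rows; return the total.

    archive_once is a hypothetical callable wrapping one pass of the batch
    script and returning how many rows that pass inserted.
    """
    total = 0
    for _ in range(max_passes):
        inserted = archive_once()
        if inserted == 0:
            break  # nothing new since the last bookmark
        total += inserted
    return total

# Stand-in for a real pass: pretend the audit log holds 250 unread events
# and each pass can move at most 100 of them.
pending = [250]

def fake_pass(batch=100):
    moved = min(batch, pending[0])
    pending[0] -= moved
    return moved

print(drain(fake_pass))
```

The max_passes guard keeps a misbehaving pass from looping forever; a scheduler such as cron would then simply start the driver again at the next interval.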
Step 9 - Log in to the archiving server and look at the data
select * from audit_archive.audit_data order by server_uuid, id, ts;
Run some stats
select event, count(event) from audit_archive.audit_data group by event;
select login_user, event, count(event) from audit_archive.audit_data group by login_user, event;
select distinct login_user, sql_command, status, query_status from audit_archive.audit_data ;
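Finally, this is far from production-quality code!
I hardcoded passwords, provided no parameters for looping over the audited servers and collecting their data, did no error checking, and so on. The point is to demonstrate a few techniques to help those who want to try this out.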
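As one small step away from hardcoded passwords, the connection options could be read from environment variables. This is only a sketch under stated assumptions: the helper connection_options and the variable names (ARCHIVE_USER, ARCHIVE_PASSWORD and so on) are hypothetical choices for this illustration, not a MySQL Shell convention.

```python
import os

def connection_options(prefix, default_host="localhost", default_port=33060):
    """Build a mysqlx.get_session() options dict from environment variables.

    The variable names (<PREFIX>_HOST, _PORT, _USER, _PASSWORD) are an
    assumption made for this sketch.
    """
    missing = [prefix + "_" + key for key in ("USER", "PASSWORD")
               if prefix + "_" + key not in os.environ]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {
        "host": os.environ.get(prefix + "_HOST", default_host),
        "port": int(os.environ.get(prefix + "_PORT", default_port)),
        "user": os.environ[prefix + "_USER"],
        "password": os.environ[prefix + "_PASSWORD"],
    }

# Inside mysqlsh this could be used as:
#   archive_session = mysqlx.get_session(connection_options("ARCHIVE"))
#   read_session = mysqlx.get_session(connection_options("AUDITED"))
```

Calling the helper with a different prefix per audited server is also one possible way to parameterize the script for several servers.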
In a follow-up blog I'll show you how to do things like hash chaining, so you can prove that your audit data is immutable and untainted.
Thanks for using MySQL. Here is the complete batch script: https://mysqlserverteam.com/mysql-audit-data-consolidation-made-simple/archivebatch/