大家好,我是老李。在日常資料存儲和查詢時,很多小夥伴都喜歡用ES做索引,很多還把ES當成資料庫來用。誠然ES的讀寫性能非常優秀,但是大家有沒有遇到過ES丢資料的問題?也就是說資料庫和ES的資料不一緻。今天老李正好看在公衆号銘毅天下Elasticsearch上看到一篇介紹這個問題的文章,裡面的内容寫的非常的清楚,把對資料的方法和思路全都理了出來。下面把文章分享給大家,希望能夠使大家在日常工作中少踩一點坑。當然了,能用來填坑就更好了。
1、實戰線上問題
- Q1:Logstash 同步 postgreSQL 到 Elasticsearch 資料不一緻。
在使用 Logstash 從 pg 庫中将一張表導入到 ES 中時,發現 ES 中的資料量和 PG 庫中的這張表的資料量存在較大差距。如何快速比對哪些資料沒有插入?導入過程中,Logstash 日志沒有異常。PG 中這張表有 7600W。
- Q2:mq 異步雙寫資料庫、es 的方案中,如何保證資料庫資料和 es 資料的一緻性?
2、推薦解決方案之一——ID 比較法
如下示例,僅拿問題1舉例驗證,問題2原理一緻。
2.1 方案探讨
要找出哪些資料沒有插入到 Elasticsearch 中,可以采用以下方法:
- 確定 Logstash 配置檔案中的 input 插件的 JDBC 驅動程式正确配置,以便從 PostgreSQL 資料庫中提取所有資料。注意 statement 參數,確定它選擇了所有需要的資料。
- 檢查 Logstash 配置檔案的 output 插件,確定正确配置了 Elasticsearch 的連接配接參數。同時,檢查是否有過濾器在導入過程中過濾掉了部分資料。
- 在 Logstash 配置檔案中添加一個 stdout 插件,将從 PostgreSQL 資料庫中讀取的資料記錄到檔案中。
例如,可以添加以下内容:
output {
elasticsearch {
...Elasticsearch 配置...
}
stdout {
codec => json_lines
path => "/path/to/logstash_output.log"
}
}
将 Logstash 輸出檔案與 PostgreSQL 資料庫中的原始資料進行比較,以找出未導入的資料。可以使用 Python、Shell 腳本或其他程式設計語言編寫一個簡單的腳本來執行此操作。
如果 Logstash 輸出檔案中的記錄數與 PostgreSQL 資料庫中的記錄數一緻,但 Elasticsearch 中的記錄數不一緻,請檢查 Elasticsearch 叢集的健康狀況和日志。确認叢集是否在接收和索引資料時遇到問題。
如果問題仍然存在,嘗試将批量操作的大小減小,以減輕 Elasticsearch 和 Logstash 的負擔。可以通過在 Logstash 配置檔案的 output 插件中設定 flush_size 和 idle_flush_time 參數來實作。
處理大量資料時,可能需要調整 Logstash 和 Elasticsearch 的性能和資源配置。根據硬體和網絡條件,可能需要優化批量操作、JVM 設定、線程池大小等方面的設定。
2.2 比較腳本的實作
以下是一個簡單的 Shell 腳本示例,用于比較 Logstash 輸出檔案(JSON 格式)和 PostgreSQL 資料庫中的資料。該腳本将比較特定字段(如 id)以确定哪些資料可能未導入到 Elasticsearch。
首先,從 PostgreSQL 資料庫中導出資料,将其儲存為 CSV 檔案:
COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH
接下來,建立一個名為 compare.sh 的 Shell 腳本:
#!/bin/bash
# 将 JSON 檔案中的 ID 提取到一個檔案中
jq '.id' /path/to/logstash_output.log > logstash_ids.txt
# 删除 JSON 中的雙引号
sed -i 's/"//g' logstash_ids.txt
# 對 Logstash 和 PostgreSQL 的 ID 檔案進行排序
sort -n logstash_ids.txt > logstash_ids_sorted.txt
sort -n /path/to/postgres_data.csv > postgres_ids_sorted.txt
# 使用 comm 比較兩個已排序的 ID 檔案
comm -23 postgres_ids_sorted.txt logstash_ids_sorted.txt > missing_ids.txt
# 輸出結果
echo "以下 ID 在 Logstash 輸出檔案中未找到:"
cat missing_ids.txt
為腳本添加可執行權限并運作:
chmod +x compare.sh
./compare.sh
此腳本會比較 logstash_output.log 和 postgres_data.csv 檔案中的 ID。如果發現缺失的 ID,它們将被儲存在 missing_ids.txt 檔案中,并輸出到控制台。請注意,該腳本假設已經安裝了 jq(一個指令行 JSON 處理器)。如果沒有,請先安裝 jq。
3、推薦方案二——Redis 加速對比
在這種情況下,可以使用 Redis 的集合資料類型來存儲 PostgreSQL 資料庫和 Logstash 輸出檔案中的 ID。接下來,可以使用 Redis 提供的集合操作來找到缺失的 ID。
以下是一個使用 Redis 實作加速比對的示例:
首先,從 PostgreSQL 資料庫中導出資料,将其儲存為 CSV 檔案:
COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH CSV HEADER;
安裝并啟動 Redis。
使用 Python 腳本将 ID 資料加載到 Redis:
import redis
import csv
# 連接配接到 Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 從 PostgreSQL 導出的 CSV 檔案中加載資料
with open('/path/to/postgres_data.csv', newline='') as csvfile:
csv_reader = csv.reader(csvfile)
next(csv_reader) # 跳過表頭
for row in csv_reader:
r.sadd('postgres_ids', row[0])
# 從 Logstash 輸出檔案中加載資料
with open('/path/to/logstash_output.log', newline='') as logstash_file:
for line in logstash_file:
id = line.split('"id":')[1].split(',')[0].strip()
r.sadd('logstash_ids', id)
# 計算差集
missing_ids = r.sdiff('postgres_ids', 'logstash_ids')
# 輸出缺失的 ID
print("以下 ID 在 Logstash 輸出檔案中未找到:")
for missing_id in missing_ids:
print(missing_id)
這個 Python 腳本使用 Redis 集合資料類型存儲 ID,然後計算它們之間的差集以找到缺失的 ID。需要先安裝 Python 的 Redis 庫。可以使用以下指令安裝:
pip install redis
這個腳本是一個基本示例,可以根據需要修改和擴充它。使用 Redis 的優點是它能在記憶體中快速處理大量資料,而不需要在磁盤上讀取和寫入臨時檔案。
4、小結
方案一:使用 Shell 腳本和 grep 指令
- 優點:
(1)簡單,易于實作。
(2)不需要額外的庫或工具。
- 缺點:
(1)速度較慢,因為它需要在磁盤上讀寫臨時檔案。
(2)對于大資料量的情況,可能會導緻較高的磁盤 I/O 和記憶體消耗。
方案二:使用 Redis 實作加速比對
- 優點:
(1)速度更快,因為 Redis 是基于記憶體的資料結構存儲。
(2)可擴充性較好,可以處理大量資料。
- 缺點:
(1)實作相對複雜,需要編寫額外的腳本。
(2)需要安裝和運作 Redis 伺服器。
根據需求和資料量,可以選擇合适的方案。如果處理的資料量較小,且對速度要求不高,可以選擇方案一,使用 Shell 腳本和 grep 指令。這種方法簡單易用,但可能在大資料量下表現不佳。
如果需要處理大量資料,建議選擇方案二,使用 Redis 實作加速比對。這種方法速度更快,能夠有效地處理大資料量。然而,這種方法需要額外的設定和配置,例如安裝 Redis 伺服器和編寫 Python 腳本。
在實際應用中,可能需要根據具體需求進行權衡,以選擇最适合的解決方案。