天天看點

資料庫同步 Elasticsearch 後資料不一緻,怎麼辦

大家好,我是老李。在日常資料存儲和查詢時,很多小夥伴都喜歡用ES做索引,很多還把ES當成資料庫來用。誠然ES的讀寫性能非常優秀,但是大家有沒有遇到過ES丢資料的問題?也就是說資料庫和ES的資料不一緻。今天老李正好看在公衆号銘毅天下Elasticsearch上看到一篇介紹這個問題的文章,裡面的内容寫的非常的清楚,把對資料的方法和思路全都理了出來。下面把文章分享給大家,希望能夠使大家在日常工作中少踩一點坑。當然了,能用來填坑就更好了。

1、實戰線上問題

  • Q1:Logstash 同步 postgreSQL 到 Elasticsearch 資料不一緻。

在使用 Logstash 從 pg 庫中将一張表導入到 ES 中時,發現 ES 中的資料量和 PG 庫中的這張表的資料量存在較大差距。如何快速比對哪些資料沒有插入?導入過程中,Logstash 日志沒有異常。PG 中這張表有 7600W。

  • Q2:mq 異步雙寫資料庫、es 的方案中,如何保證資料庫資料和 es 資料的一緻性?

2、推薦解決方案之一——ID 比較法

如下示例,僅拿問題1舉例驗證,問題2原理一緻。

2.1 方案探讨

要找出哪些資料沒有插入到 Elasticsearch 中,可以采用以下方法:

  • 確定 Logstash 配置檔案中的 input 插件的 JDBC 驅動程式正确配置,以便從 PostgreSQL 資料庫中提取所有資料。注意 statement 參數,確定它選擇了所有需要的資料。
  • 檢查 Logstash 配置檔案的 output 插件,確定正确配置了 Elasticsearch 的連接配接參數。同時,檢查是否有過濾器在導入過程中過濾掉了部分資料。
  • 在 Logstash 配置檔案中添加一個 stdout 插件,将從 PostgreSQL 資料庫中讀取的資料記錄到檔案中。

例如,可以添加以下内容:

output {
  elasticsearch {
    ...Elasticsearch 配置...
  }
  stdout {
    codec => json_lines
    path => "/path/to/logstash_output.log"
  }
}
           

将 Logstash 輸出檔案與 PostgreSQL 資料庫中的原始資料進行比較,以找出未導入的資料。可以使用 Python、Shell 腳本或其他程式設計語言編寫一個簡單的腳本來執行此操作。

如果 Logstash 輸出檔案中的記錄數與 PostgreSQL 資料庫中的記錄數一緻,但 Elasticsearch 中的記錄數不一緻,請檢查 Elasticsearch 叢集的健康狀況和日志。确認叢集是否在接收和索引資料時遇到問題。

如果問題仍然存在,嘗試将批量操作的大小減小,以減輕 Elasticsearch 和 Logstash 的負擔。可以通過在 Logstash 配置檔案的 output 插件中設定 flush_size 和 idle_flush_time 參數來實作。

處理大量資料時,可能需要調整 Logstash 和 Elasticsearch 的性能和資源配置。根據硬體和網絡條件,可能需要優化批量操作、JVM 設定、線程池大小等方面的設定。

2.2 比較腳本的實作

以下是一個簡單的 Shell 腳本示例,用于比較 Logstash 輸出檔案(JSON 格式)和 PostgreSQL 資料庫中的資料。該腳本将比較特定字段(如 id)以确定哪些資料可能未導入到 Elasticsearch。

資料庫同步 Elasticsearch 後資料不一緻,怎麼辦

首先,從 PostgreSQL 資料庫中導出資料,将其儲存為 CSV 檔案:

COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH
           

接下來,建立一個名為 compare.sh 的 Shell 腳本:

#!/bin/bash
# 将 JSON 檔案中的 ID 提取到一個檔案中
jq '.id' /path/to/logstash_output.log > logstash_ids.txt

# 删除 JSON 中的雙引号
sed -i 's/"//g' logstash_ids.txt

# 對 Logstash 和 PostgreSQL 的 ID 檔案進行排序
sort -n logstash_ids.txt > logstash_ids_sorted.txt
sort -n /path/to/postgres_data.csv > postgres_ids_sorted.txt

# 使用 comm 比較兩個已排序的 ID 檔案
comm -23 postgres_ids_sorted.txt logstash_ids_sorted.txt > missing_ids.txt

# 輸出結果
echo "以下 ID 在 Logstash 輸出檔案中未找到:"
cat missing_ids.txt
           

為腳本添加可執行權限并運作:

chmod +x compare.sh

./compare.sh
           

此腳本會比較 logstash_output.log 和 postgres_data.csv 檔案中的 ID。如果發現缺失的 ID,它們将被儲存在 missing_ids.txt 檔案中,并輸出到控制台。請注意,該腳本假設已經安裝了 jq(一個指令行 JSON 處理器)。如果沒有,請先安裝 jq。

3、推薦方案二——Redis 加速對比

在這種情況下,可以使用 Redis 的集合資料類型來存儲 PostgreSQL 資料庫和 Logstash 輸出檔案中的 ID。接下來,可以使用 Redis 提供的集合操作來找到缺失的 ID。

資料庫同步 Elasticsearch 後資料不一緻,怎麼辦

以下是一個使用 Redis 實作加速比對的示例:

首先,從 PostgreSQL 資料庫中導出資料,将其儲存為 CSV 檔案:

COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH CSV HEADER;
           

安裝并啟動 Redis。

使用 Python 腳本将 ID 資料加載到 Redis:

import redis
import csv

# 連接配接到 Redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

# 從 PostgreSQL 導出的 CSV 檔案中加載資料
with open('/path/to/postgres_data.csv', newline='') as csvfile:
    csv_reader = csv.reader(csvfile)
    next(csv_reader)  # 跳過表頭
    for row in csv_reader:
        r.sadd('postgres_ids', row[0])

# 從 Logstash 輸出檔案中加載資料
with open('/path/to/logstash_output.log', newline='') as logstash_file:
    for line in logstash_file:
        id = line.split('"id":')[1].split(',')[0].strip()
        r.sadd('logstash_ids', id)

# 計算差集
missing_ids = r.sdiff('postgres_ids', 'logstash_ids')

# 輸出缺失的 ID
print("以下 ID 在 Logstash 輸出檔案中未找到:")
for missing_id in missing_ids:
    print(missing_id)

           

這個 Python 腳本使用 Redis 集合資料類型存儲 ID,然後計算它們之間的差集以找到缺失的 ID。需要先安裝 Python 的 Redis 庫。可以使用以下指令安裝:

pip install redis
           

這個腳本是一個基本示例,可以根據需要修改和擴充它。使用 Redis 的優點是它能在記憶體中快速處理大量資料,而不需要在磁盤上讀取和寫入臨時檔案。

4、小結

方案一:使用 Shell 腳本和 grep 指令

  • 優點:

(1)簡單,易于實作。

(2)不需要額外的庫或工具。

  • 缺點:

(1)速度較慢,因為它需要在磁盤上讀寫臨時檔案。

(2)對于大資料量的情況,可能會導緻較高的磁盤 I/O 和記憶體消耗。

方案二:使用 Redis 實作加速比對

  • 優點:

(1)速度更快,因為 Redis 是基于記憶體的資料結構存儲。

(2)可擴充性較好,可以處理大量資料。

  • 缺點:

(1)實作相對複雜,需要編寫額外的腳本。

(2)需要安裝和運作 Redis 伺服器。

根據需求和資料量,可以選擇合适的方案。如果處理的資料量較小,且對速度要求不高,可以選擇方案一,使用 Shell 腳本和 grep 指令。這種方法簡單易用,但可能在大資料量下表現不佳。

如果需要處理大量資料,建議選擇方案二,使用 Redis 實作加速比對。這種方法速度更快,能夠有效地處理大資料量。然而,這種方法需要額外的設定和配置,例如安裝 Redis 伺服器和編寫 Python 腳本。

在實際應用中,可能需要根據具體需求進行權衡,以選擇最适合的解決方案。

繼續閱讀