天天看點

etcd的使用

  • etcd的使用
    • 什麼是etcd
    • etcd的特點
    • etcd的應用場景
      • 服務注冊與發現
      • 消息釋出和訂閱
      • 負載均衡
      • 分布式通知與協調
      • 分布式鎖
      • 分布式隊列
      • 叢集監控與Leader競選
    • 參考

etcd的使用

什麼是etcd

ETCD是一個分布式、可靠的

key-value

存儲的分布式系統,用于存儲分布式系統中的關鍵資料;當然,它不僅僅用于存儲,還提供配置共享及服務發現;基于Go語言實作 。

etcd的特點

  • 完全複制:叢集中的每個節點都可以使用完整的存檔
  • 高可用性:Etcd可用于避免硬體的單點故障或網絡問題
  • 一緻性:每次讀取都會傳回跨多主機的最新寫入
  • 簡單:包括一個定義良好、面向使用者的API(gRPC)
  • 安全:實作了帶有可選的用戶端證書身份驗證的自動化TLS
  • 可靠:使用Raft算法實作了強一緻、高可用的服務存儲目錄

etcd的應用場景

服務注冊與發現

服務發現還能注冊

服務注冊發現解決的是分布式系統中最常見的問題之一,即在同一個分布式系統中,找到我們需要的目标服務,建立連接配接,然後完成整個鍊路的排程。

本質上來說,服務發現就是想要了解叢集中是否有程序在監聽 udp 或 tcp 端口,并且通過名字就可以查找和連接配接。要解決服務發現的問題,需要有下面三大支柱,缺一不可。

1、一個強一緻性、高可用的服務存儲目錄。基于Raft算法的etcd天生就是這樣一個強一緻性高可用的服務存儲目錄。

2、一種注冊服務和監控服務健康狀态的機制。使用者可以在etcd中注冊服務,并且對注冊的服務設定

key TTL

,定時保持服務的心跳以達到監控健康狀态的效果。

3、一種查找和連接配接服務的機制。通過在 etcd 指定的主題下注冊的服務也能在對應的主題下查找到。為了確定連接配接,我們可以在每個服務機器上都部署一個Proxy模式的etcd,這樣就可以確定能通路etcd叢集的服務都能互相連接配接。

etcd的使用

一個使用者的api請求,可能調用多個微服務資源,這些服務我們可以使用etcd進行服務注冊和服務發現,當每個服務啟動的時候就注冊到etcd中,當我們需要使用的時候,直接在etcd中尋找,調用即可。

當然,每個服務的執行個體不止一個,比如我們的使用者服務,我們可能啟動了多個執行個體,這些執行個體在服務啟動過程中全部注冊到了etcd中,但是某個執行個體可能出現故障重新開機,這時候就etcd在進行轉發的時候,就會屏蔽到故障的執行個體節點,隻向正常運作的執行個體,進行請求轉發。

etcd的使用

來看個服務注冊發現的demo

這裡放一段比較核心的代碼,這裡摘錄了我們線上正在使用的etcd實作grpc服務注冊和發現的實作,具體的實作可參考,etcd實作grpc的服務注冊和服務發現

對于etcd中的連接配接,我們每個都維護一個租約,通過KeepAlive自動續保。如果租約過期則所有附加在租約上的key将過期并被删除,即所對應的服務被拿掉。

package discovery

import (
	"context"
	"encoding/json"
	"errors"
	"net/http"
	"strconv"
	"strings"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
)

// Register for grpc server
type Register struct {
	EtcdAddrs   []string
	DialTimeout int

	closeCh     chan struct{}
	leasesID    clientv3.LeaseID
	keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse

	srvInfo Server
	srvTTL  int64
	cli     *clientv3.Client
	logger  *zap.Logger
}

// NewRegister create a register base on etcd
func NewRegister(etcdAddrs []string, logger *zap.Logger) *Register {
	return &Register{
		EtcdAddrs:   etcdAddrs,
		DialTimeout: 3,
		logger:      logger,
	}
}

// Register a service
func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) {
	var err error

	if strings.Split(srvInfo.Addr, ":")[0] == "" {
		return nil, errors.New("invalid ip")
	}

	if r.cli, err = clientv3.New(clientv3.Config{
		Endpoints:   r.EtcdAddrs,
		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
	}); err != nil {
		return nil, err
	}

	r.srvInfo = srvInfo
	r.srvTTL = ttl

	if err = r.register(); err != nil {
		return nil, err
	}

	r.closeCh = make(chan struct{})

	go r.keepAlive()

	return r.closeCh, nil
}

// Stop stop register
func (r *Register) Stop() {
	r.closeCh <- struct{}{}
}

// register 注冊節點
func (r *Register) register() error {
	leaseCtx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second)
	defer cancel()

	leaseResp, err := r.cli.Grant(leaseCtx, r.srvTTL)
	if err != nil {
		return err
	}
	r.leasesID = leaseResp.ID
	if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), leaseResp.ID); err != nil {
		return err
	}

	data, err := json.Marshal(r.srvInfo)
	if err != nil {
		return err
	}
	_, err = r.cli.Put(context.Background(), BuildRegPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))
	return err
}

// unregister 删除節點
func (r *Register) unregister() error {
	_, err := r.cli.Delete(context.Background(), BuildRegPath(r.srvInfo))
	return err
}

// keepAlive
func (r *Register) keepAlive() {
	ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second)
	for {
		select {
		case <-r.closeCh:
			if err := r.unregister(); err != nil {
				r.logger.Error("unregister failed", zap.Error(err))
			}
			if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil {
				r.logger.Error("revoke failed", zap.Error(err))
			}
			return
		case res := <-r.keepAliveCh:
			if res == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed", zap.Error(err))
				}
			}
		case <-ticker.C:
			if r.keepAliveCh == nil {
				if err := r.register(); err != nil {
					r.logger.Error("register failed", zap.Error(err))
				}
			}
		}
	}
}
           

消息釋出和訂閱

在分布式系統中,最适用的一種元件間通信方式就是消息釋出與訂閱。即建構一個配置共享中心,資料提供者在這個配置中心釋出消息,而消息使用者則訂閱他們關心的主題,一旦主題有消息釋出,就會實時通知訂閱者。通過這種方式可以做到分布式系統配置的集中式管理與動态更新

  • 應用中用到的一些配置資訊放到 etcd 上進行集中管理。這類場景的使用方式通常是這樣:應用在啟動的時候主動從etcd擷取一次配置資訊,同時,在etcd節點上注冊一個Watcher并等待,以後每次配置有更新的時候,etcd都會實時通知訂閱者,以此達到擷取最新配置資訊的目的。
  • 分布式搜尋服務中,索引的元資訊和伺服器叢集機器的節點狀态存放在etcd中,供各個用戶端訂閱使用。使用etcd的

    key TTL

    功能可以確定機器狀态是實時更新的。
  • 分布式日志收集系統。這個系統的核心工作是收集分布在不同機器的日志。收集器通常是按照應用(或主題)來配置設定收集任務單元,是以可以在 etcd 上建立一個以應用(主題)命名的目錄 P,并将這個應用(主題相關)的所有機器 ip,以子目錄的形式存儲到目錄 P 上,然後設定一個etcd遞歸的Watcher,遞歸式的監控應用(主題)目錄下所有資訊的變動。這樣就實作了機器 IP(消息)變動的時候,能夠實時通知到收集器調整任務配置設定。
  • 系統中資訊需要動态自動擷取與人工幹預修改資訊請求内容的情況。通常是暴露出接口,例如 JMX 接口,來擷取一些運作時的資訊。引入 etcd 之後,就不用自己實作一套方案了,隻要将這些資訊存放到指定的 etcd 目錄中即可,etcd 的這些目錄就可以通過 HTTP 的接口在外部通路。
etcd的使用

消息釋出被訂閱的實際應用

我們一個性能要求比較高的項目,所需要的配置資訊,存放到本地的localCache中,通過etcd的消息釋出和訂閱實作,實作配置資訊在不同節點同步更新。

來看下如何實作

func init() {
	handleMap = make(map[string]func([]byte) error)
}

var handleMap map[string]func([]byte) error

func RegisterUpdateHandle(key string, f func([]byte) error) {
	handleMap[key] = f
}

type PubClient interface {
	Pub(ctx context.Context, key string, val string) error
}

var Pub PubClient

type PubClientImpl struct {
	client *clientv3.Client
	logger *zap.Logger
	prefix string
}

// 監聽變化,實時更新到本地的map中
func (c *PubClientImpl) Watcher() {
	ctx, cancel := context.WithCancel(context.Background())
	rch := c.client.Watch(ctx, c.prefix, clientv3.WithPrefix())
	defer cancel()

	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT:
				c.logger.Warn("Cache Update", zap.Any("value", ev.Kv))
				err := handleCacheUpdate(ev.Kv)
				if err != nil {
					c.logger.Error("Cache Update", zap.Error(err))
				}
			case mvccpb.DELETE:
				c.logger.Error("Cache Delete NOT SUPPORT")
			}
		}
	}
}

func handleCacheUpdate(val *mvccpb.KeyValue) error {
	if val == nil {
		return nil
	}
	f := handleMap[string(val.Key)]
	if f != nil {
		return f(val.Value)
	}
	return nil
}

func (c *PubClientImpl) Pub(ctx context.Context, key string, val string) error {
	ctx, _ = context.WithTimeout(ctx, time.Second*10)
	_, err := c.client.Put(ctx, key, val)
	if err != nil {
		return err
	}
	return nil
}
           

負載均衡

關于負載均衡,通常意義上有兩種

  • 軟負載,顧名思義就是靠軟體手段來實作的負載均衡。軟負載也通常被稱為 4層或 7 層負載!
  • 硬負載,就是靠硬體實作的負載均衡,資料包轉發功能。常見的就是 F5。

通過etcd實作的負載均衡就是軟負載,在分布式系統中,高并發的場景下,我們通常會建構服務的叢集,當某一個機器當機了,别的機器可以馬上頂替上來。

etcd中實作負載均衡,例如我們上文的例子服務注冊和發現,對于一個使用者服務來講,後面的使用者服務的執行個體可能是多個,每個都有自己的ip和port,這些服務會在項目啟動的時候全部注冊到etcd中,是以當使用的時候,每次etcd會輪詢出一個健康的服務執行個體,來處理使用者的請求。

etcd的使用

分布式通知與協調

這裡說到的分布式通知與協調,與消息釋出和訂閱有些相似。都用到了etcd中的Watcher機制,通過注冊與異步通知機制,實作分布式環境下不同系統之間的通知與協調,進而對資料變更做到實時處理。實作方式通常是這樣:不同系統都在etcd上對同一個目錄進行注冊,同時設定Watcher觀測該目錄的變化(如果對子目錄的變化也有需要,可以設定遞歸模式),當某個系統更新了etcd的目錄,那麼設定了Watcher的系統就會收到通知,并作出相應處理。

  • 通過etcd進行低耦合的心跳檢測。檢測系統和被檢測系統通過 etcd 上某個目錄關聯而非直接關聯起來,這樣可以大大減少系統的耦合性。
  • 通過etcd完成系統排程。某系統有控制台和推送系統兩部分組成,控制台的職責是控制推送系統進行相應的推送工作。管理人員在控制台作的一些操作,實際上是修改了etcd上某些目錄節點的狀态,而etcd就把這些變化通知給注冊了Watcher的推送系統用戶端,推送系統再作出相應的推送任務。
  • 通過etcd完成工作彙報。大部分類似的任務分發系統,子任務啟動後,到etcd來注冊一個臨時工作目錄,并且定時将自己的進度進行彙報(将進度寫入到這個臨時目錄),這樣任務管理者就能夠實時知道任務進度。
etcd的使用

分布式鎖

因為etcd使用Raft算法保持了資料的強一緻性,某次操作存儲到叢集中的值必然是全局一緻的,是以很容易實作分布式鎖。鎖服務有兩種使用方式,一是保持獨占,二是控制時序。

首先,來看一下分布式鎖應該具備哪些條件。

  • 互斥性:在任意時刻,對于同一個鎖,隻有一個用戶端能持有,進而保證一個共享資源同一時間隻能被一個用戶端操作;
  • 安全性:即不會形成死鎖,當一個用戶端在持有鎖的期間崩潰而沒有主動解鎖的情況下,其持有的鎖也能夠被正确釋放,并保證後續其它用戶端能加鎖;
  • 可用性:當提供鎖服務的節點發生當機等不可恢複性故障時,“熱備” 節點能夠接替故障的節點繼續提供服務,并保證自身持有的資料與故障節點一緻。
  • 對稱性:對于任意一個鎖,其加鎖和解鎖必須是同一個用戶端,即用戶端 A 不能把用戶端 B 加的鎖給解了。

etcd的 Watch 機制、Lease 機制、Revision 機制和 Prefix 機制,這些機制賦予了 Etcd 實作分布式鎖的能力。

  • Lease 機制

即租約機制(TTL,Time To Live),Etcd 可以為存儲的 Key-Value 對設定租約,當租約到期,Key-Value 将失效删除;同時也支援續約,通過用戶端可以在租約到期之前續約,以避免 Key-Value 對過期失效。Lease 機制可以保證分布式鎖的安全性,為鎖對應的 Key 配置租約,即使鎖的持有者因故障而不能主動釋放鎖,鎖也會因租約到期而自動釋放。

  • Revision 機制

每個 Key 帶有一個 Revision 号,每進行一次事務便加一,是以它是全局唯一的,如初始值為 0,進行一次 put(key, value),Key 的 Revision 變為 1,同樣的操作,再進行一次,Revision 變為 2;換成 key1 進行

put(key1, value)

操作,Revision将變為 3;這種機制有一個作用:通過 Revision 的大小就可以知道寫操作的順序。在實作分布式鎖時,多個用戶端同時搶鎖,根據 Revision 号大小依次獲得鎖,可以避免 “羊群效應” (也稱“驚群效應”),實作公平鎖。

  • Prefix 機制

即字首機制,也稱目錄機制,例如,一個名為

/mylock

的鎖,兩個争搶它的用戶端進行寫操作,實際寫入的Key分别為:

key1="/mylock/UUID1"

,

key2="/mylock/UUID2"

,其中,UUID表示全局唯一的ID,確定兩個Key的唯一性。很顯然,寫操作都會成功,但傳回的Revision不一樣,那麼,如何判斷誰獲得了鎖呢?通過字首

“/mylock”

查詢,傳回包含兩個Key-Value對的Key-Value清單,同時也包含它們的Revision,通過Revision大小,用戶端可以判斷自己是否獲得鎖,如果搶鎖失敗,則等待鎖釋放(對應的 Key 被删除或者租約過期),然後再判斷自己是否可以獲得鎖。

  • Watch 機制

即監聽機制,Watch機制支援監聽某個固定的Key,也支援監聽一個範圍(字首機制),當被監聽的Key或範圍發生變化,用戶端将收到通知;在實作分布式鎖時,如果搶鎖失敗,可通過Prefix機制傳回的Key-Value清單獲得Revision比自己小且相差最小的 Key(稱為 Pre-Key),對Pre-Key進行監聽,因為隻有它釋放鎖,自己才能獲得鎖,如果監聽到Pre-Key的DELETE事件,則說明Pre-Key已經釋放,自己已經持有鎖。

來看下etcd中鎖是如何實作的

client/v3/concurrency/mutex.go

// Mutex implements the sync Locker interface with etcd
type Mutex struct {
	s *Session

	pfx   string // 字首
	myKey string // key
	myRev int64 // 自增的Revision
	hdr   *pb.ResponseHeader
}

// Lock 使用可取消的context鎖定互斥鎖。如果context被取消
// 在嘗試擷取鎖時,互斥鎖會嘗試清除其過時的鎖條目。
func (m *Mutex) Lock(ctx context.Context) error {
	resp, err := m.tryAcquire(ctx)
	if err != nil {
		return err
	}
	// if no key on prefix / the minimum rev is key, already hold the lock
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}
	client := m.s.Client()

	// waitDeletes 有效地等待,直到所有鍵比對字首且不大于
	// 建立的version。
	_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
	// release lock key if wait failed
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}

	// make sure the session is not expired, and the owner key still exists.
	gresp, werr := client.Get(ctx, m.myKey)
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}

	if len(gresp.Kvs) == 0 { // is the session key lost?
		return ErrSessionExpired
	}
	m.hdr = gresp.Header

	return nil
}

func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
	s := m.s
	client := m.s.Client()
	// s.Lease()租約
	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
	// 比較Revision, 這裡建構了一個比較表達式
	// 具體的比較邏輯在下面的client.Txn用到
	// 如果等于0,寫入目前的key,并設定租約,
	// 否則擷取這個key,重用租約中的鎖(這裡主要目的是在于重入)
	// 通過第二次擷取鎖,判斷鎖是否存在來支援重入
	// 是以隻要租約一緻,那麼是可以重入的.
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	// 通過 myKey 将自己鎖在waiters;最早的waiters将獲得鎖
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	// 擷取已經拿到鎖的key的資訊
	get := v3.OpGet(m.myKey)
	// 僅使用一個 RPC 擷取目前持有者以完成無競争路徑
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	// 這裡是比較的邏輯,如果等于0,寫入目前的key,否則則讀取這個key
	// 大佬的代碼寫的就是奇妙
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return nil, err
	}

	// 根據比較操作的結果寫入Revision到m.myRev中
	m.myRev = resp.Header.Revision
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	return resp, nil
}

// 抽象出了一個session對象來持續保持租約不過期
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
	...
	ctx, cancel := context.WithCancel(ops.ctx)
	// 保證鎖,線上程的活動期間,實作鎖的的續租
	keepAlive, err := client.KeepAlive(ctx, id)
	if err != nil || keepAlive == nil {
		cancel()
		return nil, err
	}

	...
	return s, nil
}
           

設計思路:

1、多個請求來前搶占鎖,通過Revision來判斷鎖的先後順序;

2、如果有比目前key的Revision小的Revision存在,說明有key已經獲得了鎖;

3、等待直到前面的key被删除,然後自己就獲得了鎖。

通過etcd實作的鎖,直接包含了鎖的續租,如果使用Redis還要自己去實作,相比較使用更簡單。

etcd的使用

來實作一個etcd的鎖

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// m1來搶鎖
	go func() {
		s1, err := concurrency.NewSession(cli)
		if err != nil {
			log.Fatal(err)
		}
		defer s1.Close()
		m1 := concurrency.NewMutex(s1, "/my-lock/")

		// acquire lock for s1
		if err := m1.Lock(context.TODO()); err != nil {
			log.Fatal(err)
		}
		fmt.Println("m1---獲得了鎖")

		time.Sleep(time.Second * 3)

		// 釋放鎖
		if err := m1.Unlock(context.TODO()); err != nil {
			log.Fatal(err)
		}
		fmt.Println("m1++釋放了鎖")
	}()

	// m2來搶鎖
	go func() {
		s2, err := concurrency.NewSession(cli)
		if err != nil {
			log.Fatal(err)
		}
		defer s2.Close()
		m2 := concurrency.NewMutex(s2, "/my-lock/")
		if err := m2.Lock(context.TODO()); err != nil {
			log.Fatal(err)
		}
		fmt.Println("m2---獲得了鎖")

		// mock業務執行的時間
		time.Sleep(time.Second * 3)

		// 釋放鎖
		if err := m2.Unlock(context.TODO()); err != nil {
			log.Fatal(err)
		}

		fmt.Println("m2++釋放了鎖")
	}()

	time.Sleep(time.Second * 10)
}
           

列印下輸出

m2---獲得了鎖
m2++釋放了鎖
m1---獲得了鎖
m1++釋放了鎖
           

分布式隊列

即建立一個先進先出的隊列保持順序。另一種比較有意思的實作是在保證隊列達到某個條件時再統一按順序執行。這種方法的實作可以在

/queue

這個目錄中另外建立一個

/queue/condition

節點。

  • condition 可以表示隊列大小。比如一個大的任務需要很多小任務就緒的情況下才能執行,每次有一個小任務就緒,就給這個 condition 數字加 1,直到達到大任務規定的數字,再開始執行隊列裡的一系列小任務,最終執行大任務。
  • condition 可以表示某個任務在不在隊列。這個任務可以是所有排序任務的首個執行程式,也可以是拓撲結構中沒有依賴的點。通常,必須執行這些任務後才能執行隊列中的其他任務。
  • condition 還可以表示其它的一類開始執行任務的通知。可以由控制程式指定,當 condition 出現變化時,開始執行隊列任務。

來看下實作

入隊

func newUniqueKV(kv v3.KV, prefix string, val string) (*RemoteKV, error) {
	for {
		newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
		// 建立對應的key
		rev, err := putNewKV(kv, newKey, val, v3.NoLease)
		if err == nil {
			return &RemoteKV{kv, newKey, rev, val}, nil
		}
		// 如果之前已經建立了,就傳回
		if err != ErrKeyExists {
			return nil, err
		}
	}
}

// 隻有在沒有建立的時候才能建立成功
func putNewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (int64, error) {
	cmp := v3.Compare(v3.Version(key), "=", 0)
	req := v3.OpPut(key, val, v3.WithLease(leaseID))
	// 這裡還用到了這種比較的邏輯
	txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
	if err != nil {
		return 0, err
	}
	// 已經存在則傳回錯誤
	if !txnresp.Succeeded {
		return 0, ErrKeyExists
	}
	return txnresp.Header.Revision, nil
}
           

出隊

// Dequeue處理的是一個先進新出的隊列
// 如果隊列為空,Dequeue将會阻塞直到裡面有值塞入
func (q *Queue) Dequeue() (string, error) {
	resp, err := q.client.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...)
	if err != nil {
		return "", err
	}

	kv, err := claimFirstKey(q.client, resp.Kvs)
	if err != nil {
		return "", err
	} else if kv != nil {
		return string(kv.Value), nil
		// more 表示在請求的範圍内是否還有更多的鍵要傳回。
		// 則進行Dequeue重試
	} else if resp.More {
		// missed some items, retry to read in more
		return q.Dequeue()
	}

	// nothing yet; wait on elements
	ev, err := WaitPrefixEvents(
		q.client,
		q.keyPrefix,
		resp.Header.Revision,
		[]mvccpb.Event_EventType{mvccpb.PUT})
	if err != nil {
		return "", err
	}

	ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
	if err != nil {
		return "", err
	} else if !ok {
		// 如果删除失敗,重試
		return q.Dequeue()
	}
	return string(ev.Kv.Value), err
}
           

總結

1、這裡的入隊是一個先進新出的隊列;

2、出隊的實作也很簡單,如果隊列為空,Dequeue将會阻塞直到裡面有值塞入;

來個demo

package main

import (
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	recipe "go.etcd.io/etcd/client/v3/experimental/recipes"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"localhost:2379"},
	})
	if err != nil {
		log.Fatalf("error New (%v)", err)
	}

	go func() {
		q := recipe.NewQueue(cli, "testq")
		for i := 0; i < 5; i++ {
			if err := q.Enqueue(fmt.Sprintf("%d", i)); err != nil {
				log.Fatalf("error enqueuing (%v)", err)
			}
		}
	}()

	go func() {
		q := recipe.NewQueue(cli, "testq")
		for i := 10; i < 100; i++ {
			if err := q.Enqueue(fmt.Sprintf("%d", i)); err != nil {
				log.Fatalf("error enqueuing (%v)", err)
			}
		}
	}()

	q := recipe.NewQueue(cli, "testq")
	for i := 0; i < 100; i++ {
		s, err := q.Dequeue()
		if err != nil {
			log.Fatalf("error dequeueing (%v)", err)
		}
		fmt.Println(s)
	}

	time.Sleep(time.Second * 3)
}
           

叢集監控與Leader競選

通過etcd來進行監控實作起來非常簡單并且實時性強

1、前面幾個場景已經提到Watcher機制,當某個節點消失或有變動時,Watcher會第一時間發現并告知使用者。

2、節點可以設定

TTL key

,比如每隔 30s 發送一次心跳使代表該機器存活的節點繼續存在,否則節點消失。

這樣就可以第一時間檢測到各節點的健康狀态,以完成叢集的監控要求

另外,使用分布式鎖,可以完成Leader競選。這種場景通常是一些長時間CPU計算或者使用IO操作的機器,隻需要競選出的Leader計算或處理一次,就可以把結果複制給其他的Follower。進而避免重複勞動,節省計算資源。

這個的經典場景是

搜尋系統中建立全量索引

。如果每個機器都進行一遍索引的建立,不但耗時而且建立索引的一緻性不能保證。通過在etcd的CAS機制同時建立一個節點,建立成功的機器作為Leader,進行索引計算,然後把計算結果分發到其它節點。

etcd的使用

參考

【一文入門ETCD】https://juejin.cn/post/6844904031186321416

【etcd:從應用場景到實作原理的全方位解讀】https://www.infoq.cn/article/etcd-interpretation-application-scenario-implement-principle

【Etcd 架構與實作解析】http://jolestar.com/etcd-architecture/

【linux單節點和叢集的etcd】https://www.jianshu.com/p/07ca88b6ff67

【軟負載均衡與硬負載均衡、4層與7層負載均衡】https://cloud.tencent.com/developer/article/1446391

【Etcd Lock詳解】https://tangxusc.github.io/blog/2019/05/etcd-lock詳解/

【etcd基礎與使用】https://zhuyasen.com/post/etcd.html

【ETCD核心機制解析】https://www.cnblogs.com/FG123/p/13632095.html

【etcd watch機制】http://liangjf.top/2019/12/31/110.etcd-watch機制分析/

【ETCD 源碼學習--Watch(server)】https://www.codeleading.com/article/15455457381/

【etcdV3—watcher服務端源碼解析】https://blog.csdn.net/stayfoolish_yj/article/details/104497233