天天看點

redis用戶端、分布式鎖及資料一緻性

  Redis Java用戶端有很多的開源産品比如Redission、Jedis、lettuce等。

  Jedis是Redis的Java實作的用戶端,其API提供了比較全面的Redis指令的支援;Redisson實作了分布式和可擴充的Java資料結構,和Jedis相比,功能較為簡單,不支援字元串操作,不支援排序、事務、管道、分區等Redis特性。Redisson主要是促進使用者對Redis的關注分離,進而讓使用者能夠将精力更集中地放在處理業務邏輯上。由于常用的是jedis,是以這邊使用jedis作為示範。

jedis-sentinel原理:

  用戶端通過連接配接到哨兵叢集,通過發送Protocol.SENTINEL_GET_MASTER_ADDR_BY_NAME 指令,從哨兵機器中詢問master節點的資訊,拿到master節點的ip和端口号以後,再到用戶端發起連接配接。連接配接以後,需要在用戶端建立監聽機制,當master重新選舉之後,用戶端需要重新連接配接到新的master節點。

  構造器代碼如下:

public JedisSentinelPool(String masterName, Set<String> sentinels, final GenericObjectPoolConfig poolConfig, int timeout, final String password, final int database) {

        this.poolConfig = poolConfig;
        this.timeout = timeout;
        this.password = password;
        this.database = database;

        HostAndPort master = initSentinels(sentinels, masterName);
        initPool(master);
}
      

  其中 masterName 為配置 sentinels的時候再sentinel.conf 所配置的master的名稱。 

initSentinels方法:

private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {

    HostAndPort master = null;
    boolean sentinelAvailable = false;

    log.info("Trying to find master from available Sentinels...");
    // 有多個sentinels,周遊這些個sentinels
    for (String sentinel : sentinels) {
     // host:port表示的sentinel位址轉化為一個HostAndPort對象。
      final HostAndPort hap = HostAndPort.parseString(sentinel);

      log.debug("Connecting to Sentinel {}", hap);

      Jedis jedis = null;
      try {
        // 連接配接到sentinel
        jedis = new Jedis(hap);
        // 根據masterName得到master的位址,傳回一個list,host= list[0], port =// list[1]
        List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

        // connected to sentinel...
        sentinelAvailable = true;

        if (masterAddr == null || masterAddr.size() != 2) {
          log.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, hap);
          continue;
        }
        // 如果在任何一個sentinel中找到了master,不再周遊sentinels
        master = toHostAndPort(masterAddr);
        log.debug("Found Redis master at {}", master);
        break;
      } catch (JedisException e) {
        // resolves #1036, it should handle JedisException there's another chance
        // of raising JedisDataException
        log.warn(
          "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.", hap,
          e.toString());
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }
// 到這裡,如果master為null,則說明有兩種情況,一種是所有的sentinels節點都down掉了,一種是master節點沒有被存活的sentinels監控到
    if (master == null) {
      if (sentinelAvailable) {
        // can connect to sentinel, but master name seems to not
        // monitored
        throw new JedisException("Can connect to sentinel, but " + masterName
            + " seems to be not monitored...");
      } else {
        throw new JedisConnectionException("All sentinels down, cannot determine where is "
            + masterName + " master is running...");
      }
    }
    //如果走到這裡,說明找到了master的位址
    log.info("Redis master running at " + master + ", starting Sentinel listeners...");
    //啟動對每個sentinels的監聽為每個sentinel都啟動了一個監聽者MasterListener。MasterListener本身是一個線程,它會去訂閱sentinel上關于master節點位址改變的消息。
    for (String sentinel : sentinels) {
      final HostAndPort hap = HostAndPort.parseString(sentinel);
      MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
      // whether MasterListener threads are alive or not, process can be stopped
      masterListener.setDaemon(true);
      masterListeners.add(masterListener);
      masterListener.start();
    }

    return master;
  }
      

  可以看到

initSentinels

方法的參數有一個masterName,就是我們所需要查找的master的名字。一開始,周遊多個sentinels,一個一個連接配接到sentinel,去詢問關于masterName的消息,可以看到是通過

jedis.sentinelGetMasterAddrByName()

方法去連接配接sentinel,并詢問目前的master的位址。點進這個方法去看看,源代碼是這樣寫的:從哨兵節點擷取master資訊的方法:調用的是與Jedis綁定的client去發送一個"get-master-addr-by-name"指令

public List<String> sentinelGetMasterAddrByName(final String masterName) {
    client.sentinel(Protocol.SENTINEL_GET_MASTER_ADDR_BY_NAME, masterName);
    final List<Object> reply = client.getObjectMultiBulkReply();
    return BuilderFactory.STRING_LIST.build(reply);
  }
      

  調用

initPool

方法(構造函數中調用),那麼會初始化Jedis執行個體建立工廠,如果不是第一次調用(

MasterListener

中調用),那麼隻對已經初始化的工廠進行重新設定。Jedis的JedisSentinelPool的實作僅僅适用于單個master-slave。

Jedis-cluster原理:

   先來看一下他的連接配接方式:

Set<HostAndPort> hostAndPorts=new HashSet<>();
HostAndPort hostAndPort=new HostAndPort("192.168.11.153",7000);
HostAndPort hostAndPort1=new HostAndPort("192.168.11.153",7001);
HostAndPort hostAndPort2=new HostAndPort("192.168.11.154",7003);
HostAndPort hostAndPort3=new HostAndPort("192.168.11.157",7006);
hostAndPorts.add(hostAndPort);
hostAndPorts.add(hostAndPort1);
hostAndPorts.add(hostAndPort2);
hostAndPorts.add(hostAndPort3);
JedisCluster jedisCluster=new JedisCluster(hostAndPorts,6000);
jedisCluster.set("wuzz","hello");
      

程式啟動初始化叢集環境:

  1)、讀取配置檔案中的節點配置,無論是主從,無論多少個,隻拿第一個,擷取redis連接配接執行個體

  2)、用擷取的redis連接配接執行個體執行clusterNodes()方法,實際執行redis服務端cluster nodes指令,擷取主從配置資訊

  3)、解析主從配置資訊,先把所有節點存放到nodes的map集合中,key為節點的ip:port,value為目前節點的jedisPool

  4)、解析主節點配置設定的slots區間段,把slot對應的索引值作為key,第三步中拿到的jedisPool作為value,存儲在slots的map集合中就實作了slot槽索引值與jedisPool的映射,這個jedisPool包含了master的節點資訊,是以槽和幾點是對應的,與redis服務端一緻

從叢集環境存取值:

1)、把key作為參數,執行CRC16算法,擷取key對應的slot值

2)、通過該slot值,去slots的map集合中擷取jedisPool執行個體

3)、通過jedisPool執行個體擷取jedis執行個體,最終完成redis資料存取工作

分布式鎖的實作:

  分布式鎖一般有三種實作方式:1. 資料庫樂觀鎖;2. 基于Redis的分布式鎖;3. 基于ZooKeeper的分布式鎖。本篇部落格将介紹第二種方式,基于Redis實作分布式鎖。

  關于鎖,其實我們或多或少都有接觸過一些,比如synchronized、 Lock這些,這類鎖的目的很簡單,在多線程環境下,對共享資源的通路造成的線程安全問題,通過鎖的機制來實作資源通路互斥。那麼什麼是分布式鎖呢?或者為什麼我們需要通過Redis來建構分布式鎖,其實最根本原因就是Score(範圍),因為在分布式架構中,所有的應用都是程序隔離的,在多程序通路共享資源的時候我們需要滿足互斥性,就需要設定一個所有程序都能看得到的範圍,而這個範圍就是Redis本身。是以我們才需要把鎖建構到Redis中。Redis裡面提供了一些比較具有能夠實作鎖特性的指令,比如SETEX(在鍵不存在的情況下為鍵設定值),那麼我們可以基于這個指令來去實作一些簡單的鎖的操作.

  首先,為了確定分布式鎖可用,我們至少要確定鎖的實作同時滿足以下四個條件:

  1. 互斥性。在任意時刻,隻有一個用戶端能持有鎖。
  2. 不會發生死鎖。即使有一個用戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證後續其他用戶端能加鎖。
  3. 具有容錯性。隻要大部分的Redis節點正常運作,用戶端就可以加鎖和解鎖。
  4. 解鈴還須系鈴人。加鎖和解鎖必須是同一個用戶端,用戶端自己不能把别人加的鎖給解了。

1.引入依賴:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.0.0</version>
</dependency>
      

2.來一個連接配接 redis 的工具類:

public class JedisConnectionUtils {
	private static JedisPool pool=null;
    static {
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(100);
        pool=new JedisPool(jedisPoolConfig,"192.168.254.136",6399,5000,"wuzhenzhao");
    }
    public static Jedis getJedis(){
        return pool.getResource();
    }
}      

3.加鎖:jedis.set(String key, String value, String nxxx, String expx, int time),這個set()方法一共有五個形參:

  • 第一個為key,我們使用key來當鎖,因為key是唯一的。
  • 第二個為value,我們傳的是requestId,很多童鞋可能不明白,有key作為鎖不就夠了嗎,為什麼還要用到value?原因就是我們在上面講到可靠性時,分布式鎖要滿足第四個條件解鈴還須系鈴人,通過給value指派為requestId,我們就知道這把鎖是哪個請求加的了,在解鎖的時候就可以有依據。requestId可以使用UUID.randomUUID().toString()方法生成。
  • 第三個為nxxx,這個參數我們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,我們進行set操作;若key已經存在,則不做任何操作;
  • 第四個為expx,這個參數我們傳的是PX,意思是我們要給這個key加一個過期的設定,具體時間由第五個參數決定。
  • 第五個為time,與第四個參數相呼應,代表key的過期時間。
private final String LOCK_NAME = "DISTRIBUTEDLOCK";

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * @param acquireTimeout 獲得鎖的逾時時間
     * @param lockTimeout    鎖本身的過期時間
     * @return
     */
    public String acquireLock(long acquireTimeout, long lockTimeout) {
        String identifier = UUID.randomUUID().toString();//保證釋放鎖的時候是同一個持有鎖的人
        String lockKey = "lock:" + LOCK_NAME;
        int lockExpire = (int) (lockTimeout / 1000);
        Jedis jedis = null;
        try {//擷取連接配接
            jedis = JedisConnectionUtils.getJedis();
            long end = System.currentTimeMillis() + acquireTimeout;
            //擷取鎖的限定時間
            while (System.currentTimeMillis() < end) {
                String result = jedis.set(lockKey, identifier, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, lockTimeout);
                if (LOCK_SUCCESS.equals(result)) {
                    return identifier;
                }
                //表示沒有逾時時間
                if (jedis.ttl(lockKey) == -1) {
                    jedis.expire(lockKey, lockExpire); //設定逾時時間
                }
                try {
                    //等待片刻後進行擷取鎖的重試
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            jedis.close(); //回收
        }
        return null;
    }      

3.釋放鎖

public boolean releaseLock(String lockName,String identifier){
        System.out.println(lockName+"開始釋放鎖:"+identifier);
        String lockKey="lock:"+lockName;
        Jedis jedis=null;
        boolean isRelease=false;
        try{
            jedis=JedisConnectionUtils.getJedis();
            while(true){
            	//Watch 指令用于監視一個(或多個) key ,如果在事務執行之前這個(或這些) key 被其他指令所改動,那麼事務将被打斷
                jedis.watch(lockKey);
                //判斷是否為同一把鎖
                if(identifier.equals(jedis.get(lockKey))){
                	//标記事務開始
                    Transaction transaction=jedis.multi();
                    transaction.del(lockKey);
                    if(transaction.exec().isEmpty()){
                        continue;
                    }
                    isRelease=true;
                }else {
                	//TODO 異常
                }
                jedis.unwatch();
                break;
            }
        }finally {
            jedis.close();
        }
        return  isRelease;
    }
      

5.測試:

public class UnitTest  extends Thread{

    @Override
    public void run() {
        while(true){
        	RedisDemo distributedLock=new RedisDemo();
            String rs=distributedLock.acquireLock("updateOrder",
                    2000,5000);
            if(rs!=null){
                System.out.println(Thread.currentThread().getName()+"-> 成功獲得鎖:"+rs);
                try {
                    Thread.sleep(1000);
                    distributedLock.releaseLock("updateOrder",rs);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                break;
            }
        }
    }

    public static void main(String[] args) {
        UnitTest unitTest=new UnitTest();
        for(int i=0;i<10;i++){
            new Thread(unitTest,"tName:"+i).start();
        }
    }
}
      

  如果你的項目中Redis是多機部署的,那麼可以嘗試使用

Redisson

實作分布式鎖,這是Redis官方提供的Java元件

管道模式:

  Redis服務是一種C/S模型,提供請求-響應式協定的TCP服務,是以當用戶端發起請求,服務端處理并傳回結果到用戶端,一般是以阻塞形式等待服務端的響應,但這在批量處理連接配接時延遲問題比較嚴重,是以Redis為了提升或彌補這個問題,引入了管道技術:可以做到服務端未及時響應的時候,用戶端也可以繼續發送指令請求,做到用戶端和服務端互不影響,服務端并最終傳回所有服務端的響應,大大提高了C/S模型互動的響應速度上有了質的提高。

  使用方法:

Jedis jedis=new Jedis("192.168.254.136",6399);
Pipeline pipeline=jedis.pipelined();
    for(int i=0;i<1000;i++){
    pipeline.incr("test");
}
pipeline.sync();
      

Redis緩存與資料一緻性問題:

  對于讀多寫少的高并發場景,我們會經常使用緩存來進行優化。比如說支付寶的餘額展示功能,實際上99%的時候都是查詢,1%的請求是變更(除非是土豪,每秒鐘都有收入在不斷更改餘額),是以,我們在這樣的場景下,可以加入緩存,使用者->餘額

  那麼基于上面的這個出發點,問題就來了,當使用者的餘額發生變化的時候,如何更新緩存中的資料,也就是說。我是先更新緩存中的資料再更新資料庫的資料;還是修改資料庫中的資料再更新緩存中的資料

  資料庫的資料和緩存中的資料如何達到一緻性?首先,可以肯定的是,redis中的資料和資料庫中的資料不可能保證事務性達到統一的,這個是毫無疑問的,是以在實際應用中,我們都是基于目前的場景進行權衡降低出現不一緻問題的出現機率。

更新緩存還是讓緩存失效?

  更新緩存表示資料不但會寫入到資料庫,還會同步更新緩存; 而讓緩存失效是表示隻更新資料庫中的資料,然後删除緩存中對應的key。那麼這兩種方式怎麼去選擇?這塊有一個衡量的名額。

1. 如果更新緩存的代價很小,那麼可以先更新緩存,這個代價很小的意思是我不需要很複雜的計算去獲得最新的餘額數字。

2. 如果是更新緩存的代價很大,意味着需要通過多個接口調用和資料查詢才能獲得最新的結果,那麼可以先淘汰緩存。淘汰緩存以後後續的請求如果在緩存中找不到,自然去資料庫中檢索。

先操作資料庫還是先操作緩存?

  當用戶端發起事務類型請求時,假設我們以讓緩存失效作為緩存的的處理方式,那麼又會存在兩個情況,

1. 先更新資料庫再讓緩存失效

2. 先讓緩存失效,再更新資料庫

  前面我們講過,更新資料庫和更新緩存這兩個操作,是無法保證原子性的,是以我們需要根據目前業務的場景的容忍性來選擇。也就是如果出現不一緻的情況下,哪一種更新方式對業務的影響最小,就先執行影響最小的方案。

最終一緻性的解決方案:

  對于分布式系統的資料最終一緻性問題,我們可以引入消息中間件,對于失敗的緩存更新存入對應的 broker,并對其進行訂閱,當有消息來了,我們可以對由于網絡等非程式錯誤的異常緩存更新進行重試更新:

 關于緩存雪崩的解決方案:

  當緩存大規模滲透在整個架構中以後,那麼緩存本身的可用性講決定整個架構的穩定性。那麼接下來我們來讨論下緩存在應用過程中可能會導緻的問題。

緩存雪崩:

  緩存雪崩是指設定緩存時采用了相同的過期時間,導緻緩存在某一個時刻同時失效,或者緩存伺服器當機當機導緻緩存全面失效,請求全部轉發到了DB層面,DB由于瞬間壓力增大而導緻崩潰。緩存失效導緻的雪崩效應對底層系統的沖擊是很大的。

解決方式:

1. 對緩存的通路,如果發現從緩存中取不到值,那麼通過加鎖或者隊列的方式保證緩存的單程序操作,進而避免失效時并并發請求全部落到底層的存儲系統上;但是這種方式會帶來性能上的損耗

2. 将緩存失效的時間分散,降低每一個緩存過期時間的重複率

3. 如果是因為緩存伺服器故障導緻的問題,一方面需要保證緩存伺服器的高可用、另一方面,應用程式中可以采用多級緩存

緩存穿透:

  緩存穿透是指查詢一個根本不存在的資料,緩存和資料源都不會命中。出于容錯的考慮,如果從資料層查不到資料則不寫入緩存,即資料源傳回值為 null 時,不緩存 null。緩存穿透問題可能會使後端資料源負載加大,由于很多後端資料源不具備高并發性,甚至可能造成後端資料源宕掉。

解決方式

1. 如果查詢資料庫也為空,直接設定一個預設值存放到緩存,這樣第二次到緩沖中擷取就有值了,而不會繼續通路資料庫,這種辦法最簡單粗暴。比如,”key” , “&&”。在傳回這個&&值的時候,我們的應用就可以認為這是不存在的key,那我們的應用就可以決定是否繼續等待繼續通路,還是放棄掉這次操作。如果繼續等待通路,過一個時間輪詢點後,再次請求這個key,如果取到的值不再是&&,則可以認為這時候key有值了,進而避免了透傳到資料庫,進而把大量的類似請求擋在了緩存之中。

2. 根據緩存資料Key的設計規則,将不符合規則的key進行過濾采用布隆過濾器,将所有可能存在的資料哈希到一個足夠大的BitSet中,不存在的資料将會被攔截掉,進而避免了對底層存儲系統的查詢壓力。

布隆過濾器:

  布隆過濾器是Burton Howard Bloom在1970年提出來的,一種空間效率極高的機率型算法和資料結構,主要用來判斷一個元素是否在集合中存在。因為他是一個機率型的算法,是以會存在一定的誤差,如果傳入一個值去布隆過濾器中檢索,可能會出現檢測存在的結果但是實際上可能是不存在的,但是肯定不會出現實際上不存在然後回報存在的結果。是以,Bloom Filter不适合那些“零錯誤”的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter通過極少的錯誤換取了存儲空間的極大節省。

  bitmap:

  所所謂的BitMap就是用一個bit位來标記某個元素所對應的value,而key即是該元素,由于BitMap使用了bit位來存儲資料,是以可以大大節省存儲空間.

  基本思想:

  這此我用一個簡單的例子來詳細介紹BitMap算法的原理。假設我們要對0-7内的5個元素(4,7,2,5,3)進行排序(這裡假設元素沒有重複)。我們可以使用BitMap算法達到排序目的。要表示8個數,我們需要8個byte。

  1.首先我們開辟一個位元組(8byte)的空間,将這些空間的所有的byte位都設定為0

  2.然後便利這5個元素,第一個元素是4,因為下邊從0開始,是以我們把第五個位元組的值設定為1

  3.然後再處理剩下的四個元素,最終8個位元組的狀态如下圖

  

  4.現在我們周遊一次bytes區域,把值為1的byte的位置輸出(2,3,4,5,7),這樣便達到了排序的目的

  從上面的例子我們可以看出,BitMap算法的思想還是比較簡單的,關鍵的問題是如何确定10進制的數到2進制的映射圖

  假設需要排序或則查找的數的總數N=100000000,BitMap中1bit代表一個數字,1個int = 4Bytes = 4*8bit = 32 bit,那麼N個數需要N/32 int空間。是以我們需要申請記憶體空間的大小為int a[1 + N/32],其中:a[0]在記憶體中占32為可以對應十進制數0-31,依次類推:

  a[0]-----------------------------> 0-31

  a[1]------------------------------> 32-63

  a[2]-------------------------------> 64-95

  a[3]--------------------------------> 96-127

  ......................................................

  那麼十進制數如何轉換為對應的bit位,下面介紹用位移将十進制數轉換為對應的bit位:

  1.求十進制數在對應數組a中的下标

  十進制數0-31,對應在數組a[0]中,32-63對應在數組a[1]中,64-95對應在數組a[2]中………,使用數學歸納分析得出結論:對于一個十進制數n,其在數組a中的下标為:a[n/32]

  2.求出十進制數在對應數a[i]中的下标

  例如十進制數1在a[0]的下标為1,十進制數31在a[0]中下标為31,十進制數32在a[1]中下标為0。 在十進制0-31就對應0-31,而32-63則對應也是0-31,即給定一個數n可以通過模32求得在對應數組a[i]中的下标。

  3.位移

  對于一個十進制數n,對應在數組a[n/32][n%32]中,但數組a畢竟不是一個二維數組,我們通過移位操作實作置1

  a[n/32] |= 1 << n % 32 

  移位操作: 

  a[n>>5] |= 1 << (n & 0x1F)

  n & 0x1F 保留n的後五位 相當于 n % 32 求十進制數在數組a[i]中的下标

  布隆過濾器就是基于這麼一個原理來實作的。假設集合裡面有3個元素{x, y, z},哈希函數的個數為3。首先将位數組進行初始化,将裡面每個位都設定位0。對于集合裡面的每一個元素,将元素依次通過3個哈希函數進行映射,每次映射都會産生一個哈希值,這個值對應位數組上面的一個點,然後将位數組對應的位置标記為1。查詢W元素是否存在集合中的時候,同樣的方法将W通過哈希映射到位數組上的3個點。如果3個點的其中有一個點不為1,則可以判斷該元素一定不存在集合中。反之,如果3個點都為1,則該元素可能存在集合中

   接下來按照該方法處理所有的輸入對象,每個對象都可能把bitMap中一些位置設定為1,也可能會遇到已經是1的位置,遇到已經為1的讓他繼續為1即可。處理完所有的輸入對象之後,在bitMap中可能已經有相當多的位置已經被為1。至此,一個布隆過濾器生成完成,這個布隆過濾器代表之前所有輸入對象組成的集合。

  如何去判斷一個元素是否存在bit array中呢? 原理是一樣,根據k個哈希函數去得到的結果,如果所有的結果都是1,表示這個元素可能(假設某個元素通過映射對應下标為4,5,6這3個點。雖然這3個點都為1,但是很明顯這3個點是不同元素經過哈希得到的位置,是以這種情況說明元素雖然不在集合中,也可能對應的都是1)存在。 如果一旦發現其中一個比特位的元素是0,表示這個元素一定不存在.