天天看點

LoadingCache學習案例适用性主要的類和接口使用加載緩存回收其他特性待處理

2021-07-23日  晚9點
           

本文主要意思是:學習LoadingCache本地緩存特性和源碼。

文中會引用大量的官網觀點.

一定要看get源碼,注意:回收/删除的動作,可能并不是自動删除,因為涉及到工作線程和使用者線程競争鎖。

LoadingCache學習

  • 案例
  • 适用性
    • 官方說明
    • 功能
  • 主要的類和接口
  • 使用
    • 常用參數
    • segment資料結構
  • 加載
    • 擷取
    • 顯示插入
    • 源碼分析 不外乎 get/put/remove 因為它類似于concurrentMap
      • LocalCache建構
      • LocalCache初始化
      • Segment初始化
      • **get**
      • **put**
        • preWriteCleanup
        • evictEntries
      • remove
      • refresh 其實底層就是同步執行了load方法
    • 資料結構
      • recencyQueue和accessQueue
      • writeQueue
  • 緩存回收
    • 基于容量回收
    • 定時回收
    • 基于引用回收
    • 顯示清除
    • 移除監聽器
    • 清理什麼時候發生!! 重要!!
    • 重新整理
  • 其他特性
    • 統計
    • adMap視圖
    • 中斷
  • 待處理
    • refreshAfterWrite 和expiredAfterWrite 一般怎樣設定/關系, 業務
    • 四種引用
    • 異步線程 (看他源碼)
    • concurrentHashMap1.7 -> 1.8

案例

private static final LoadingCache<String, Long> cache = CacheBuilder.newBuilder()
            .maximumSize(100)
            .expireAfterWrite(10, TimeUnit.SECONDS)
            .removalListener((RemovalListener<String, Long>) notification -> {
                System.out.println("删除監聽器: key, value" + notification.getKey() + "," + notification.getValue());
            })
            .build(new CacheLoader<String, Long>() {
                @Override
                public Long load(String key) throws Exception {
                    long curTime = System.currentTimeMillis();
                    System.out.println("curTime=" + curTime);
                    return curTime;
                }
            });
           

适用性

官方說明

Guava Cache 與ConcurrentMap 很相似,但是也不完全相同。最基本的差別是ConcurrentMap會一直儲存所有添加的元素,直到顯示的删除。

相對地,Guava Cache 為了限制記憶體占用,通常都會設定自定回收元素。在某些場景下,盡管LoadingCache 不回收元素,它也是很有用的,因為它會自動加載緩存。

後面這句話其實就是說LoadingCache自動加載緩存是優點: get(key, load); || 重寫 load 自動觸發   --> 這裡的自動其實都不是自動的, 必須要有讀/寫請求來的時候, 才會觸發
           
通常來說,Guava Cache适用于: 
           
  • 你願意消耗一些機器記憶體空間來提升速度。
  • 你預料到某些鍵會被查詢一次以上。
  • 緩存中存放的資料總量不會超出記憶體總量。(因為Guava Cache是單個應用運作時的本地緩存,它不把資料存儲到檔案或外部伺服器。如果此時不符合你的業務需求,請嘗試Memcached這類工具。)

如果以上場景符合你的業務需要, Guava Cache就适合你。

注意: 如果你不需要Cache 中的特性, 使用ConcurrentHashMap 有更好的記憶體效率–但 Cache的大多數特性都很難基于舊的ConcurrentMap 複制, 甚至根本不可能做到.

也就是說,官方建議我們使用Guava Cache (在符合場景的前提下)

功能

LoadingCache 是一個支援多線程并發讀寫、高性能、通用的in-heap(堆)本地緩存,有以下功能:

  • 核心功能: 高性能線程安全的in-heap Map 資料結構(類似于ConcurrentHashMap)
  • 支援key不存在時按照給定的CacheLoader 的loader方法進行loading。如果有多個線程同時get一個不存在的key,那麼會有一個線程負責load,其他線程阻塞wait等待。
get方法中: 

    // at this point e is either null or expired; 
    // 翻譯: 此時get擷取到的元素為 null / 過期
    return lockedGetOrLoad(key, hash, loader);  
    // 啥意思? 加鎖去get或者執行load方法。 

lockedGetOrLoad 方法

    try {
      // Synchronizes on the entry to allow failing fast when a recursive load is
      // detected. This may be circumvented when an entry is copied, but will fail fast most
      // of the time.
      synchronized (e) {
        return loadSync(key, hash, loadingValueReference, loader);
      }
    } finally {
      statsCounter.recordMisses(1);
    }
           
  • 支援entry的evitBySize, 這是一個LRU cache的基本功能. --> 啥意思
  • 支援對entry設定過期時間, 按照最後通路/寫入的時間來淘汰entry.
  • 支援傳入entry删除事件監聽器, 當entry被删除或者淘汰時執行監聽器邏輯.
  • 支援對entry進行定時reload, 預設使用loader邏輯進行同步reload (建議重寫 CacheLoader的reload方法實作異步reload)
原生CacheLoader 此時執行這個方法是'同步'的, 不過傳回值确是ListenableFuture
    @Override
    public ListenableFuture<Long> reload(String key, Long oldValue) throws Exception{
    //    return Futures.immediateFuture(load(key));  
        return super.reload(key, oldValue);  
        // 這裡直接調用了load方法去擷取傳回值,然後将傳回值設定到一個ImmediateFuture類型的future中。本質上沒有異步執行。
    }


如果需要refresh是異步的, 不影響使用者線程.
此時需要: 自己使用寫一個類AsyncReload, 實作reload方法. 實際上就是啟動一個'異步線程',不阻塞使用者線程.
    相對于原生的CacheLoader, 也隻是隻少了'一個'線程阻塞.
AsyncReload
    final static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    @Override
    public ListenableFuture<V> reload(K key, V oldValue) throws Exception {
        ListenableFutureTask<V> task = create(() -> load(key));
        service.execute(task);
        return task;
    }

    .build(new AsyncReload<String, Long>() {
        load..

        @Override
        public ListenableFuture<Long> reload(String key, Long oldValue) throws Exception {
            return super.reload(key, oldValue);
        }
    });
           
  • 支援WeakReference封裝key, 當key在應用程式裡沒有别的引用是, jvm會gc回收key對象, LoadingCache也會得到通知進而進行清理.
  • 支援用WeakReference 或者 SoftReference 封裝value對象
  • 支援緩存相關運作資料的統計.
關于load方法和reload方法說明:
load方法: 第一次加載, 或key對應的value不存在(已經過期)會觸發load方法. 
    load方法是同步的, 對于同一個key, 多次請求隻能觸發一次加載. 比如thread1通路key1,
    發現cache中不存在key1, 觸發key1的load, 同時, 在加載過程完成之前, 其他線程都通路key1, 
    此時這些通路都不會觸發key1的load加載, 因為加載load方法被synchronized修飾.
    synchronized (e) {
        return loadSync(key, hash, loadingValueReference, loader);
    }

reload方法: 
    1. 當cache中有值, 但是需要重新整理的該值的時候就會觸發reload方法. ('不是自動的')
    2. loadingCache的所有更新操作都是'依賴讀寫操作'觸發的, 因為内部沒有定時任務或者時鐘信号. 
        例如, 上一次寫入之後超過了refresh設定的更新時間, 但是之後沒有cache的通路了, 
        此時之後下一次get的時候才會進行觸發refresh.....   
        比如: 20s 後過期, 5s重新整理一次, main開始, 寫入一次. 此時初始的5s内(包括第5s)
        沒有get請求, 此時不會自動觸發refresh, 而是後面15s内來了一個get請求,
        此時才會觸發refresh.
    3. 對于同一個key, '多次請求隻有一個線程觸發reload, 其他請求直接傳回舊值'.
        比如: 7s的時候有兩個線程通路cache, 隻會有一個線程refresh.
        如果是原生CacheLoader, 此時這兩個線程是同步執行的. 其中有一個必然會觸發refresh, 就看作業系統把鎖給哪個線程了. 'A先refresh, B傳回舊值'.  
        如果是AsyncLoader重寫reload, create一個線程異步執行, '這兩個線程傳回舊值'. '異步線程執行觸發refresh'.

隻配置expireAfterWrite: load和refresh都阻塞.
在配置refreshAfterWrite之後, 執行reload時: 
    原生: 一個使用者線程去refresh, 其他線程傳回舊值.
    異步: 所有使用者線程傳回舊值, 新建立一個異步線程去refresh, '是以比原生隻少了一個線程的阻塞'
           

主要的類和接口

  • CacheBuilder
    • Builder設計模式, 更友好的支援多參數的類的建構. CacheBuilder在build方法中, 會前面設定的參數, 全部傳遞個LoaclCache, 它自己實際不參與任何計算. 不錯
  • CacheLoader
    • 抽象類, 使用者從資料源加載資料, 業務也可以繼承它, 重寫load, reload.
  • Cache
    • 接口, 定義get, put, invalidate等方法, 這裡隻有cache的增删改操作, 沒有資料加載的操作.
  • AbstractCache
    • 抽象類, 繼承自Cache接口. 其中批量操作都是循環執行單次行為, 而單次行為都沒有具體定義.
  • LoadingCache
    • 接口, 繼承自Cache接口. 定義get, getUnchecked, getAll等操作, 這些操作都會從資料源load資料.
  • AbstractLoadingCache
    • 抽象類, 繼承自AbstractCache, 實作LoadingCache接口.
  • LoadCache(核心)
    • guava cache核心類, 包含了guava cache的資料結構以及基本的緩存的操作辦法.
  • LocalManualCache
    • LocalCache内部靜态類, 實作Cache接口. 其内部的增删改緩存操作全部調用成員變量 loaclCache的相關方法. (緩存的手動加載: 手動get)
  • LocalLoadingCache
    • LocalCache内部靜态類, 繼承自LocalManualCache類, 實作LoadingCache接口. 其所有操作也是調用成員變量localCache的相關方法. (緩存的自動加載: 自動get)

接口和類關系圖: LocalLoadingCache, LocalManualCache, 是loaclCache内部靜态類

LoadingCache學習案例适用性主要的類和接口使用加載緩存回收其他特性待處理

使用

常用參數

  • expireAfterWrite: 當建立或寫之後的固定有效期到達時, 資料會過期, 但是不會删除, 因為下一次請求來的時候, 會擷取有效值, 無效則删除, 後同步指定load方法.
V value = getLiveValue(e, now); // 擷取存活的值 (如果expired, 則expired 包含: recencyQueue -> 入AccessQueue,删除(writeQueue删除, AccessQueue隊列删除))

    V getLiveValue(ReferenceEntry<K, V> entry, long now) {
      if (entry.getKey() == null) {
        tryDrainReferenceQueues();
        return null;
      }
      V value = entry.getValueReference().get();
      if (value == null) {
        tryDrainReferenceQueues();
        return null;
      }

      if (map.isExpired(entry, now)) {   // 如果需要過期
        tryExpireEntries(now);
        return null;
      }
      return value;
    }

    void tryExpireEntries(long now) {
      if (tryLock()) {
        try {
          expireEntries(now);
        } finally {
          unlock();
      }
    }

    void expireEntries(long now) {
      drainRecencyQueue();   // 最近讀隊列recencyQueue -> 入AccessQueue(最近通路隊列)

      ReferenceEntry<K, V> e;
      while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
        if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {  
          throw new AssertionError();
        }
      }
      while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
        if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
          throw new AssertionError();
        }
      }
    }
           
  • expireAfterAccess: 當建立或者或讀之後的固定有效期到達時, 同上. 讀寫操作都會重置通路時間, 但asMap方法不會.
  • refreshAfterWrite: 當建立或者寫之後的固定有效期到達時, 且新請求過來時, 資料會被自動重新整理.(注意不是删除而是重新整理(原生同步, 改造異步), 其他線程傳回舊值)
  • concurrencyLevel: 更新操作之間允許的并發, 也就是segment的數量.
  • maximumSize: 指定緩存可以包含的最大entries數.
  • maximumWeight: 指定緩存可能包含的entries數的最大權重, 使用此方法需要先調用weigher方法.
  • weigher: 指定确定entries的weight的wegher, 通過重寫weigh(K key, V value)方法, 來确定每一個Entries的weight.
  • weakKeys: 指定将緩存中存儲的每個鍵都包裝在WeakReference(預設情況下, 使用強引用)
  • weakValues: 指定将緩存中存儲的每個值都包裝在WeakReference(預設情況下, 使用強引用)
  • softValues: 指定将緩存中存儲的每個值都包裝在SoftReference(預設情況下, 使用強引用)

segment資料結構

WriteQueue:按照寫入時間進行排序的元素隊列,寫入一個元素時會把它加入到隊列尾部。
AccessQueue:按照通路時間進行排序的元素隊列,通路(包括寫入)一個元素時會把它加入到隊列尾部。
LoadingCache學習案例适用性主要的類和接口使用加載緩存回收其他特性待處理
LoadingCache學習案例适用性主要的類和接口使用加載緩存回收其他特性待處理
LoadingCache學習案例适用性主要的類和接口使用加載緩存回收其他特性待處理

加載

擷取

  • get 如果沒有, cacheLoader 向緩存原子地加載新值.
  • getAll<Iterable<? extend K>>: 批量查詢. 預設情況下, 對每個不在緩存中的鍵, getAll方法會單獨調用CacheLoader.load來加載緩存項. 如果批量的加載比多個單獨加載高效, 可以重載CacheLoader.loadAll 來利用這一點. getAll(iterable)的性能也會提升.
注: CacheLoader.loadAll 的實作可以為沒有明确請求的鍵加載緩存值. 例如, 為某組中的任意鍵計算值時, 能夠擷取該組中的所有鍵值, loadAll方法就可以實作為在同一時間擷取該組的其他鍵值. getAll(Iterable<? extends K>)方法會調用loadAll, 但會篩選結果, 隻會傳回請求的鍵值對.

顯示插入

使用cache.put(key, value)方法可以直接向緩存中插入值, 這會直接覆寫掉給定鍵之前映射的值. 使用Cache.asMap()視圖提供的任何方法也能修改緩存. 但一定要注意, asMap視圖的任何方法都不能保證緩存項被原子地加載到緩存.

進一步說. asMap視圖的原子運算在Guava Cache的原子加載範疇之外, 是以相比于Cache.asMap().putIfAbsent(K, V), Cache.get(K, Callable)應該總是優先使用.

源碼分析 不外乎 get/put/remove 因為它類似于concurrentMap

LocalCache建構

private static final LoadingCache<String, Long> notifyNoticeAndEmailCache = CacheBuilder.newBuilder() //
            .expireAfterWrite(
                    20, TimeUnit.SECONDS) // 緩存項在給定時間内沒有被寫通路(建立或覆寫),則回收: 10s失效
            .maximumSize(10)
            .removalListener((RemovalListener<String, Long>) notification -> {
                System.out.println("删除監聽器: key, value" + notification.getKey() + "," + notification.getValue());
            })
            .refreshAfterWrite(5, TimeUnit.SECONDS)
            .build(new AsyncReLoad<String, Long>() {
                @Override
                public Long load(String key) {
                    long curTime = System.currentTimeMillis();
                    System.out.println("curTime=" + curTime);
                    return curTime;
                }

                @Override
                public ListenableFuture<Long> reload(String key, Long oldValue) throws Exception {
                    System.out.println("異步線程, 執行");
                    return super.reload(key, oldValue);
                }
            });

           

以上是一個建構LoadingCache的小李子. 流程如下:

  1. 采用建造者模式, 首先創造出CacheBuilder.
  2. 使用流式風格, 依次設定參數.
  3. build方法建立完成.

其中以上參數都是CacheBuilder的屬性, 在使用Build方法時, CacheBuilder被當做參數傳入LoaclLoadingCache或LoaclManualCache的構造器中進行構造.

public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
      CacheLoader<? super K1, V1> loader) {
    checkWeightWithWeigher();
    return new LocalCache.LocalLoadingCache<>(this, loader);   // 多個了一個loader
    }
    public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
    checkWeightWithWeigher();
    checkNonLoadingCache();
    return new LocalCache.LocalManualCache<>(this);
  }
           

從上面代碼可以看出來, 差別是在建立時是否傳入CacheLoader, 分為’自動加載’與手動加載. 同時建立緩存時, 也将CacheBuilder傳入(this).

static class LocalLoadingCache<K, V> extends LocalManualCache<K, V>
      implements LoadingCache<K, V> {

    LocalLoadingCache(
        CacheBuilder<? super K, ? super V> builder, CacheLoader<? super K, V> loader) {
      super(new LocalCache<K, V>(builder, checkNotNull(loader)));
    }

static class LocalManualCache<K, V> implements Cache<K, V>, Serializable {
    final LocalCache<K, V> localCache;

    LocalManualCache(CacheBuilder<? super K, ? super V> builder) {
      this(new LocalCache<K, V>(builder, null));
    }

    private LocalManualCache(LocalCache<K, V> localCache) {
      this.localCache = localCache;
    }

           

從上面代碼看出, LocalLoadingCache繼承自LoadManualCache, 差別是: CacherLoader是否自主傳入進來. 有/沒有.

兩個類都有一個LocalCache的引用, 其内部的增删改緩存操作都是調用成員變量localCache. 但是它兩都是LocalCache的靜态内部類.

LocalCache初始化

LocalCache首先把builder中的所有屬性都取出來, 然後指派給自己的所有成員變量.

class LocalCache<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>

  static final int MAXIMUM_CAPACITY = 1 << 30; // 緩存最大容量, 該數值必須為2的幂, 同時小于這個最大值 2^30 

  static final int MAX_SEGMENTS = 1 << 16; // segment數組最大容量

  static final int CONTAINS_VALUE_RETRIES = 3; // containsValue方法的重試次數

  static final int DRAIN_THRESHOLD = 0x3F;     // 在更新緩存的最近排序資訊之前, 每個段可以緩沖的緩存通路操作的數量. 這是為了避免鎖争用, 通過記錄讀的記憶和延遲擷取鎖,直到超過門檻值或發生突變.

  static final int DRAIN_MAX = 16;            // 一次清理操作中, 最大移除的entry數量

  final int segmentMask;                      // 定位segment

  final int segmentShift; // 定位segment, 同時讓entry分布均勻,盡量平均分布在每個segment[i]中
  final Segment<K, V>[] segments;

  final int concurrencyLevel;  // 更新操作之間允許的并發, 也就是**segment**的數量.

  下來我們來看一下LocalCache的構造
     LocalCache(
          CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) {
        // 并發程度, 根據傳入參數和預設最大值中取小值
        // 如果沒有指定參數的情況下, concurrencyLevel == UNSET_INT == -1 
        // 然後getConcurrencyLevel 方法判斷上面這個方式成立, 傳回預設值4
        // 否則傳回使用者傳遞進來的參數
        concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);

        // 鍵值的引用類型, 沒有指定的話, 預設為強引用類型.
        keyStrength = builder.getKeyStrength();
        valueStrength = builder.getValueStrength();

        // 判斷相同的方法, 強引用類型就是Equivalence.equals()  各個引用類型的判斷下相同的方法都不同, 在  enum Strength 中可以看到
        keyEquivalence = builder.getKeyEquivalence();
        valueEquivalence = builder.getValueEquivalence();

        // 最大權重
        maxWeight = builder.getMaximumWeight();
        weigher = builder.getWeigher();
        expireAfterAccessNanos = builder.getExpireAfterAccessNanos();
        expireAfterWriteNanos = builder.getExpireAfterWriteNanos();
        refreshNanos = builder.getRefreshNanos();
        // 移除消息監聽器
        removalListener = builder.getRemovalListener();
        // 如果指定了移除消息監聽器的話, 會建立一個隊列, 臨時儲存移除的内容
        removalNotificationQueue =
            (removalListener == NullListener.INSTANCE)
                ? LocalCache.<RemovalNotification<K, V>>discardingQueue()
                : new ConcurrentLinkedQueue<RemovalNotification<K, V>>();

        ticker = builder.getTicker(recordsTime());
        // 建立新的緩存内容(entry) 的工廠, 會根據引用類型選擇對應的工廠
        entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());
        globalStatsCounter = builder.getStatsCounterSupplier().get();
        defaultLoader = loader;

        // 初始化緩存容量, 預設為16
        int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY);
        if (evictsBySize() && !customWeigher()) {
          initialCapacity = (int) Math.min(initialCapacity, maxWeight);
        }

        // Find the lowest power-of-two segmentCount that exceeds concurrencyLevel, unless
        // maximumSize/Weight is specified in which case ensure that each segment gets at least 10
        // entries. The special casing for size-based eviction is only necessary because that eviction
        // happens per segment instead of globally, so too many segments compared to the maximum size
        // will result in random eviction behavior.
        int segmentShift = 0;
        int segmentCount = 1;
        // 根據并發程度來計算segement的個數, (大于等于concurrencyLevel的最小的2次幂, 這裡即為 4)
        while (segmentCount < concurrencyLevel && (!evictsBySize() || segmentCount * 20 <= maxWeight)) {
          ++segmentShift;
          segmentCount <<= 1;
        }
        // 這裡的segmentShift和segmentMask用來打散entry, 讓緩存内容盡量均勻分布在每個segment下
        this.segmentShift = 32 - segmentShift;
        segmentMask = segmentCount - 1;

        // 初始化segment數組, 大小為4
        this.segments = newSegmentArray(segmentCount);

        // 每個segment的容量, 總容量 / 數組個數, 向上取整, 16 / 4 = 4
        int segmentCapacity = initialCapacity / segmentCount;
        if (segmentCapacity * segmentCount < initialCapacity) {
          ++segmentCapacity;
        }
        // segmentSize 為小于segmentCapacity的最大的2的次幂, 這裡為4
        int segmentSize = 1;
        while (segmentSize < segmentCapacity) {
          segmentSize <<= 1;
        }
        // 初始化每個segment[i]
        // 注意: 判斷是否有權重, 根據權重來初始化
        if (evictsBySize()) {
          // Ensure sum of segment max weights = overall max weights
          long maxSegmentWeight = maxWeight / segmentCount + 1;
          long remainder = maxWeight % segmentCount;
          for (int i = 0; i < this.segments.length; ++i) {
            if (i == remainder) {
              maxSegmentWeight--;
            }
            this.segments[i] =
                createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());
          }
        } else {
          // 權重為-1, 也就是基本上沒有權重
          for (int i = 0; i < this.segments.length; ++i) {
            this.segments[i] =
                createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());
          }
        }
      }
           
2021年7月25日早11點:

Segment初始化

static class Segment<K, V> extends ReentrantLock

        @Weak final LocalCache<K, V> map;
        volatile @MonotonicNonNull AtomicReferenceArray<ReferenceEntry<K, V>> table;
            final @Nullable ReferenceQueue<K> keyReferenceQueue;
        final @Nullable ReferenceQueue<V> valueReferenceQueue;
        final Queue<ReferenceEntry<K, V>> recencyQueue;
        final AtomicInteger readCount = new AtomicInteger();
        @GuardedBy("this")
        final Queue<ReferenceEntry<K, V>> writeQueue;
        @GuardedBy("this")
        final Queue<ReferenceEntry<K, V>> accessQueue;

初始化:
    Segment(
        LocalCache<K, V> map,
        int initialCapacity,
        long maxSegmentWeight,
        StatsCounter statsCounter) {
      this.map = map;
      this.maxSegmentWeight = maxSegmentWeight;
      this.statsCounter = checkNotNull(statsCounter);
      initTable(newEntryArray(initialCapacity));

      keyReferenceQueue = map.usesKeyReferences() ? new ReferenceQueue<K>() : null;

      valueReferenceQueue = map.usesValueReferences() ? new ReferenceQueue<V>() : null;

      recencyQueue =
          map.usesAccessQueue()
              ? new ConcurrentLinkedQueue<ReferenceEntry<K, V>>()
              : LocalCache.<ReferenceEntry<K, V>>discardingQueue();

      writeQueue =
          map.usesWriteQueue()
              ? new WriteQueue<K, V>()
              : LocalCache.<ReferenceEntry<K, V>>discardingQueue();

      accessQueue =
          map.usesAccessQueue()
              ? new AccessQueue<K, V>()
              : LocalCache.<ReferenceEntry<K, V>>discardingQueue();
    }
           
從segment的資料結構來看, 它有以下幾個隊列和table, map組成:
  1. keyReferenceQueue
  2. valueReferenceQueue

keyReference和valueReferenceQueue 在build時設定了soft或weak時會被初始化,

用于接收gc回收key/value對象的通知, 隊列中的元素是引用key/value的reference.

顯然referenceQueue是線程安全的, 隊列的生成者是jvm的gc線程, 消費者是LoadingCache自身.

當key對象除了reference之外沒有别的地方引用時, 下次gc時對象會被回收,

同時referenceQueue會受到通知, 然後對應的entry會被清理掉.

其實就是一句話, 在弱引用/軟引用的情況下gc回收的内容會放入這兩個隊列. 加速gc回收.
  1. recencyQueue
recencyQueue 啟用條件和accessQueue一樣。
  • 每次’通路操作’(讀)都會将該entry加入到隊列尾部,并更新accessTime。
  • 如果遇到寫入操作,則将該隊列内容排幹,如果accessQueue隊列中持有該這些 entry,然後将這些entry add到accessQueue隊列。
  • 注意,因為accessQueue是非線程安全的,是以如果每次通路entry時就将該entry加入到accessQueue隊列中,就會導緻并發問題。
  • 是以這裡每次通路先将entry臨時加入到并發安全的ConcurrentLinkedQueue隊列中,也就是recencyQueue中。
  • 在寫入的時候通過加鎖的方式,将recencyQueue中的資料添加到accessQueue隊列中。
  • 如此看來,recencyQueue是為accessQueue服務的.
  1. accessQueue

按照通路時間進行排序的元素隊列, 通路(包括寫入)一個元素時會把它加入到隊列尾部.

  1. writeQueue

按照寫入時間進行排序的元素隊列, 寫入一個元素時會把它加入到隊列尾部.

get

下面我們來介紹一下 get源碼, !!! —重要

@Override
    public V get(K key) throws ExecutionException {
      return localCache.getOrLoad(key);
    }
一路進來: 其實就定位到了那個segment中entry中的值
    V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
        int hash = hash(checkNotNull(key));
        return segmentFor(hash).get(key, hash, loader);
    }

Segment#get()
    V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      checkNotNull(key);
      checkNotNull(loader);
      try {
        // count儲存的是該segment中entry的數量, 如果為0, 就直接去加載
        if (count != 0) { // read-volatile
          // don't call getLiveEntry, which would ignore loading values
          // 擷取entry, 并做一些清理工作, 在操作開始/結束. 預設清理DRAIN_MAX = 16次.
          ReferenceEntry<K, V> e = getEntry(key, hash);
          if (e != null) {
            long now = map.ticker.read();
            // 在entry無效, 過期, 正在載入都會傳回null, 如果傳回不為null, 即為正常命中
            V value = getLiveValue(e, now);
            if (value != null) {
              // 如果有expiredAfterAccess, 更新entry的accessTime. finaly,  丢入recencyQueue 最近通路隊列, 為啥不進入accessQueue. TODO 後面會解釋... 
              recordRead(e, now);
              // 性能統計
              statsCounter.recordHits(1);
              // 根據使用者是否設定重新整理時間&距離上次通路/寫入時間-短時間過期, 進行重新整理或者直接傳回.
              return scheduleRefresh(e, key, hash, value, now, loader);
            }
            ValueReference<K, V> valueReference = e.getValueReference();
            if (valueReference.isLoading()) {
              // 如果正在被加載, 等待加載完成.
              return waitForLoadingValue(e, key, valueReference);
            }
          }
        }

        // 如果不存在或者過期, 就用loader方式加載, loader從上面來: 1. default, 2. get(key, loader)
        return lockedGetOrLoad(key, hash, loader);
      } catch (ExecutionException ee) {
        Throwable cause = ee.getCause();
        if (cause instanceof Error) {
          throw new ExecutionError((Error) cause);
        } else if (cause instanceof RuntimeException) {
          throw new UncheckedExecutionException(cause);
        }
        throw ee;
      } finally {
        // 清理, 清除操作總是伴随着寫入進行的, 如果是過長時間沒有寫入, 那我們就根據讀線程來完成清理.
        // 很長時間是多長?還記得前面有一個參數DRAIN_THRESHOLD = 0x3F = 3F = 63
        // (readCount.incrementAndGet() & DRAIN_THRESHOLD) == 0 
        // 也就是說 讀64次就會清理一次, 具體是怎樣清理的, 給你們透露一下, <font color='red'>**上面5個隊列都會清理**</font>.......
        postReadCleanup();
      }
    }

Segment#lockedGetOrLoad(): 

  V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      ReferenceEntry<K, V> e;
      ValueReference<K, V> valueReference = null;
      LoadingValueReference<K, V> loadingValueReference = null;
      boolean createNewEntry = true;

      // 因為Segment繼承自ReentrantLock, 是以有這個方法. 為啥會加鎖, 因為有多線程.
      lock();
      try {
        // re-read ticker once inside the lock
        long now = map.ticker.read();
        preWriteCleanup(now);
        // 該segment下面的HashTable
        int newCount = this.count - 1;
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        //這裡也是為什麼table的大小要為2的幂(最後index範圍剛好在0-table.length()-1),  資料需要分散. 如果是奇數, 那麼 & 之後永遠在 奇數槽
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);

        //周遊連結清單
        for (e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) { 
            // keyEquivalence 是指比對規則, equivalent 類似equals
            valueReference = e.getValueReference();
            // 如果正在載入, 此時不需要建立, 等待即可
            if (valueReference.isLoading()) {
              createNewEntry = false;
            } else {
              V value = valueReference.get();
              // 被gc回收 (配置了weakReference / SoftReference 弱引用/軟引用)
              if (value == null) {
                enqueueNotification(
                    entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
              } else if (map.isExpired(e, now)) {
                // 過期了呀
                // This is a duplicate check, as preWriteCleanup already purged expired
                // entries, but let's accommodate an incorrect expiration queue.
                enqueueNotification(
                    entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
              } else {
                // 記錄被讀,== 更新accessTime, 和recency隊列, 正常傳回value值
                recordLockedRead(e, now);
                statsCounter.recordHits(1);
                // we were concurrent with loading; don't consider refresh
                return value;
              }

              // 針對被gc回收和過期的情況, 從寫隊列和最近通路隊列中移除,
              // 因為後面重新載入後, 會再次添加到隊列中
              // immediately reuse invalid entries
              writeQueue.remove(e);
              accessQueue.remove(e);
              this.count = newCount; // write-volatile
            }
            break;
          }
        }


        // 注意: 下面這段代碼隻是說正在加載中, 實際上沒有把具體的值加載進去, 隻是為了阻塞其他線程.
        if (createNewEntry) {
          // 先建立一個LoadingValueReference, 表示正在載入中
          loadingValueReference = new LoadingValueReference<>();

          if (e == null) {
            // 如果目前連結清單為null, 先建立一個頭結點
            e = newEntry(key, hash, first);
            e.setValueReference(loadingValueReference);
            table.set(index, e);
          } else {
            e.setValueReference(loadingValueReference);
          }
        }
      } finally {
        unlock();
        // 執行清理, 确實是這樣, 因為官網規定, 每次操作都會清空資料.
        postWriteCleanup();
      }

      if (createNewEntry) {
        try {
          // Synchronizes on the entry to allow failing fast when a recursive load is
          // detected. This may be circumvented when an entry is copied, but will fail fast most
          // of the time.
          synchronized (e) {
            // 同步加載
            return loadSync(key, hash, loadingValueReference, loader);
          }
        } finally {
          // 記錄沒有被命中
          statsCounter.recordMisses(1);
        }
      } else {
        // The entry already exists. Wait for loading.
        return waitForLoadingValue(e, key, valueReference);
      }
    }

Segment#loadSync(): 
    V loadSync(
        K key,
        int hash,
        LoadingValueReference<K, V> loadingValueReference,
        CacheLoader<? super K, V> loader)
        throws ExecutionException {
      // 調用loader, 看着是異步, 要注意不是異步!!!!!
      ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
      return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
    }

Segment#getAndRecordStats():
    // 等待載入, 并且記錄是否成功
    V getAndRecordStats(
        K key,
        int hash,
        LoadingValueReference<K, V> loadingValueReference,
        ListenableFuture<V> newValue)
        throws ExecutionException {
      V value = null;
      try {
        value = getUninterruptibly(newValue);
        if (value == null) {
          throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
        }
        // 性能統計是否加載成功!!
        statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos());
        `真正去把資料,載入到緩存中 (目前還是loadingValueReference, 表示isLoading 不過這個方法内部會把LoadingValueReference變成對應引用類型的ValueReference )`
        storeLoadedValue(key, hash, loadingValueReference, value);
        return value;
      } finally {
        loader方法資料為null, 或者store資料異常, 删除LoadingValue
        if (value == null) {
          statsCounter.recordLoadException(loadingValueReference.elapsedNanos());
          removeLoadingValue(key, hash, loadingValueReference);
        }
      }
    }

Segment#storeLoadedValue():
    boolean storeLoadedValue(
        K key, int hash, LoadingValueReference<K, V> oldValueReference, V newValue) {
      lock();
      try {
        long now = map.ticker.read();
        preWriteCleanup(now);

        int newCount = this.count + 1;
        if (newCount > this.threshold) { // ensure capacity
          expand();
          newCount = this.count + 1;
        }

        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);
        // 找到entry
        for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            ValueReference<K, V> valueReference = e.getValueReference();
            V entryValue = valueReference.get();
            // replace the old LoadingValueReference if it's live, otherwise
            // perform a putIfAbsent
            if (oldValueReference == valueReference
                || (entryValue == null && valueReference != UNSET)) {
              ++modCount;
              if (oldValueReference.isActive()) {
                RemovalCause cause =
                    (entryValue == null) ? RemovalCause.COLLECTED : RemovalCause.REPLACED;
                enqueueNotification(key, hash, entryValue, oldValueReference.getWeight(), cause);
                newCount--;
              }
              // LoadingValueReference變成對應引用類型的ValueReference,并進行指派操作
              setValue(e, key, newValue, now);
              this.count = newCount; // write-volatile
              evictEntries(e);
              return true;
            }

            // the loaded value was already clobbered
            enqueueNotification(key, hash, newValue, 0, RemovalCause.REPLACED);
            return false;
          }
        }

        ++modCount;
        ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
        setValue(newEntry, key, newValue, now);
        table.set(index, newEntry);
        this.count = newCount; // write-volatile
        evictEntries(newEntry);
        return true;
      } finally {
        unlock();
        // 清理
        postWriteCleanup();
      }
    }


           
總結一下get方法的過程:
  1. 指定segment中table中找到沒有被回收、沒有過期的entry, 如果找到了, 并且在CacheBuilder配置了refreshAfterWrite, 并且目前時間已經超過了這個時間, 則重新加載. 否則, 傳回這個值.
  2. 找到的ValueReference是loadingValueReference(正在加載). 此時waitForLoadingValue阻塞等待.
  3. 如果沒有找到entry, 或者找到為null/expired. lockedGetOrLoad, 加鎖. 在table裡面找對應key的entry,

    如果找到valueReference.isLoading() == true為正在加載中, 等待加載完, 拿到value值即可.

    如果找到值為非null&非expired, 傳回. 否則, 建立一個LoadingValueReference, 調用loadSync真正加載相關的值入table.

  4. loadSync會執行loader, 然後給出ListenableFuture, 根據future去存儲getAndRecordStats并且擷取值. 存儲storeLoadedValue.

    然後, 大部分情況下替換LoadingValueReference, 少部分情況是删除LoadingValueReference(此時 是因為load方法執行得到的資料是null, 抛出了異常.)

下面是get的大體流程圖:

LoadingCache學習案例适用性主要的類和接口使用加載緩存回收其他特性待處理
2021年8月1日14點:

put

下面我們來介紹一下 put源碼, !!! —重要

@Nullable
    V put(K key, int hash, V value, boolean onlyIfAbsent) {
      lock();
      try {
        long now = map.ticker.read();
        // 1. 先去清理key/value 引用隊列, 再調用expireEntries 删除writeQueue/accessQueue中過期的entry
        preWriteCleanup(now);

        int newCount = this.count + 1;
        if (newCount > this.threshold) { // ensure capacity  擴容
          expand();
          newCount = this.count + 1;
        }

        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);

        // Look for an existing entry.
        for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            // We found an existing entry.

            ValueReference<K, V> valueReference = e.getValueReference();
            V entryValue = valueReference.get();

            if (entryValue == null) {
              ++modCount;
              // 2. value被自動删除, 因為值被回收了
              if (valueReference.isActive()) {
                enqueueNotification(
                    key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
                setValue(e, key, value, now);
                newCount = this.count; // count remains unchanged
              } else {
                setValue(e, key, value, now);
                newCount = this.count + 1;
              }
              this.count = newCount; // write-volatile
              // 3. 判斷權重是否超了, 超了就要被删除
              evictEntries(e);
              return null;
              // 4. 如果是存在&onlyIfAbsent==true
            } else if (onlyIfAbsent) {
              // Mimic
              // "if (!map.containsKey(key)) ...
              // else return map.get(key);
              // 5. 記錄被讀取, 然後傳回值
              recordLockedRead(e, now);
              return entryValue;
            } else {
              // clobber existing entry, count remains unchanged
              ++modCount;
              enqueueNotification(
                  key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
              setValue(e, key, value, now);
              evictEntries(e);
              return entryValue;
            }
          }
        }

        // Create a new entry.
        ++modCount;
        ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
        setValue(newEntry, key, value, now);
        table.set(index, newEntry);
        newCount = this.count + 1;
        this.count = newCount; // write-volatile
        // 每次寫入之後都會判斷權重, 是否删除
        evictEntries(newEntry);
        return null;
      } finally {
        unlock();
        postWriteCleanup();
      }
    }

public V put(K key, V value); // onlyIfAbsent為fale
public V putIfAbsent(K key, V value); // onlyIfAbsent為true 這個參數的意思是如果緩存中沒有才會寫入資料哦
           

下面是put的大概流程:

LoadingCache學習案例适用性主要的類和接口使用加載緩存回收其他特性待處理

preWriteCleanup

void preWriteCleanUp(long now)

傳入目前時間

作用: 清理keyQueue和valueQueue中被GC, 等待清除的entry資訊.

過期掉 writeQueue和accessQueue中entry.

evictEntries

void evictEntries(ReferenceEntry<K, V> newest);

傳入參數為最新的entry, 可能是最新插入的, 也可能是剛更新過的.

這個方法隻有設定了maximumSize才會進行.

作用/步驟: 1. 清除recencyQueue, 判斷該元素自身的權重是否超過上限, 如果超過則移除元素.

2. 判斷總的權重是否大于上限, 如果超過則去accessQueue找到隊首(即最不常通路元素)進行移除, 直到小于上限.

remove

public V remove(@Nullable Object key);

調用LocalManualCache的invalidate(Object key)方法即可調用remove.

refresh 其實底層就是同步執行了load方法

資料結構

recencyQueue和accessQueue

如果在build緩存時設定了expiredAfterAccess || maxWeight, recencyQueue于accessQueue就會被初始化.

recencyQueue是最近讀隊列, accessQueue是最近通路隊列(讀寫). 其中最近讀隊列是采用的是jdk中線程安全、支援高并發讀寫的ConcurrentLinkedQueue, 隊列中存儲的元素是ReferenceEntry. 最近通路隊列采用的是LoadingCache自己實作的AccessQueue, 節點是ReferenceEntry、非線程安全. (帶頭結點的雙向連結清單)

用兩個隊列實作LRU的原因為:

我們首先來看一下緩存下為啥要用LRU:

  1. 緩存的場景基本是讀多寫少, LoadingCache的讀操作要做到高性能、lock-free的讀, 這樣就會有多個線程同時讀緩存, 意味着LRU隊列支援多線程高并發寫入(調整元素在LRU隊列中的通路順序)
  2. LoadingCache中元素可能會過期、容量限制、被gc回收等原因被淘汰出緩存, 意味着需要從LRU隊列中高效删除元素. 是以我們需要一個支援多線程并發通路的、常數時間删除元素的隊列實作.

    顯然一個ConcurrentLinkedQueue不能同時滿足這兩個需求, Guava給的解就是再增加一個簡單的AccessQueue做到常數時間删除元素. 具體來說:

  • 每次無鎖的讀操作都會去寫而且隻會寫最近讀隊列(将entry直接入隊, 方法: recordRead)
  • 每次鎖保護下寫操作都會涉及到最近通路隊列的讀寫,
    1. 比如每次想緩存新增元素都會做幾次清理工作. 清理就需要讀accessQueue(淘汰掉隊頭的元素, 見方法寫前expiredEntries, 寫後evictEntries);
    2. 每次向緩存新增元素成功後記錄元素寫操作, 記錄會寫accessQueue(加到隊尾, 見方法recordWrite).
    3. 每次通路accessQueue前都需要先排幹recencyQueue至accessQueue中(key相同的, 按先進先出順序, ==批量調整accessQueue中元素順序), 然後再去進行accessQueue的讀或者寫操作, 以盡量保證accessQueue中元素順序和真實的最近通路順序一緻. (見方法: drainRecencyQueue)

關于這個做法其實是有以下幾個問題:

  • 如果在寫少讀非常多的場景下, 讀寫accessQueue的機會很少, 大量讀操作會在recencyQueue中累積很多元素占用記憶體而得不到排幹的機會. 是以Guava為了解決這個難問題, 為讀操作設定了一個DRAIN_THRESHOLD == 63(方法: postReadCleanup), 當累積讀次數達到排幹門檻值時64也會觸發一次清理操作, 進而排幹recenceyQueue到accessQueue
  • 在高并發場景下, 即使是在每次讀寫accessQueue前做排幹recencyQueue操作, 也不能保證accessQueue中元素順序和實際元素通路順序一緻. 這個問題也沒有那麼嚴重,導緻結果無非就是LRU淘汰過程沒那麼deterministic, 對于一個緩存來說也可以接受.

writeQueue

最近寫隊列: 如果build緩存時設定expireAfterWrite, 建立segment時就會初始化writeQueue。

實作: 十分簡單, 就是一個帶頭節點的雙向循環連結清單, 并且沒有任何并發. 節點對象就是ReferenceEntry本身.

注意到segment中hash表裡的散列連結清單節點也是ReferenceEntry, 這就有一個技巧了: 即一個節點對象可能會同時屬于多個連結清單中, 不同連結清單使用不同的前後節點指針, 這個技巧的好處在于給定一個節點entry對象, 所有連結清單都可以做到常數時間的查找和删除(jdk裡面的linkedHashMap實作也采用了這個技巧).

由于寫操作都會在鎖保護進行, 是以這個writeQueue無需是線程安全的.

緩存回收

一個殘酷的現實是, 我們好像沒有足夠的記憶體緩存所有資料. 你必須決定: 什麼時候某個緩存項不值得保留了? Guava Cache 提供了三種基本的緩存回收方式: 基于容量回收、定時回收和基于引用回收.

基于容量回收

如果要規定緩存項的數目不超過固定值, 隻需使用CacheBuilder.maximumSize(long). 緩存将嘗試回收最近沒有使用或者總體上沒有使用的緩存項. ----- 注意: 在緩存項的數目達到限定值之前, 緩存就可能進行回收操作— 通常來說, 這種情況發生在緩存項的數目逼近限定值時.

另外, 不同緩存項有不同的權重(weights) --例如, 如果你的緩存值, 占據完全不同的記憶體空間, 你可以使用CacheBuilder.weight(Weighter)指定一個權重函數, 并可以用CacheBuilder.maximumWeight(long)指定最大總重.

定時回收

CacheBuilder提供了兩種定時回收的方法:

  • expiredAfterAccess(long, TimeUnit): 沒有在給定時間内通路(讀/寫), 回收
  • expiredAfterWrite(long, TimeUnit): 沒有在給定時間内寫, 回收

如下文: 定時回收大多是情況是在寫進行, 偶爾在讀進行

基于引用回收

通過使用弱引用的鍵/值, 軟值. Cuava Cache可以把緩存設定為允許垃圾回收:

  • CacheBuilder.weakKeys():
  • CacheBuilder.weakValues():
  • CacheBuilder.softKeys():

顯示清除

任何時候, 我們都可以顯式地清除緩存項, 而不是等着觸發回收/被回收:

  • 單個清除: Cache.invalidate(key)
  • 批量清除: Cache.invalidateAll(key)
  • 清除所有緩存項: Cache.invalidateAll()

移除監聽器

通過CacheBuilder.removeListener(RemovalListener), 你可以定義一個監聽器, 友善在緩存項被删除的時候做一些額外操作, removalListener,會獲得移除通知removalNotification, 其中包含移除原因removalCause, 鍵/值.

.removalListener((RemovalListener<String, Long>) notification -> {
        System.out.println("删除監聽器: key, value" + notification.getKey() + "," + notification.getValue());
    })
           

注意: 預設情況下, 監聽器方法是在移除緩存時同步調用的. 因為緩存的維護和請求響應通常是同時進行的, 代價高昂的監聽器方法在同步模式下會拖慢正常的緩存請求. 此時, 可以使用removalListeners.asynchronous(removalListener, executor)把監聽器裝飾為異步操作. --------------- 有待觀察, 因為沒看過底層

清理什麼時候發生!! 重要!!

使用CacheBuilder建構的緩存是不會自動執行清理和回收工作, 也不會在某個緩存項過期後馬上清理, 也沒有諸如此類的清理機制. 相反, 它會在寫操作時順帶做少量的維護工作, perWriteCleanUp->expiredEntries(write, accessQueue), evictEntries, postCleanUp, 或者偶爾在讀操作時做—如果寫操作實在是太少了.

這樣做的原因是: 如果要自動地持續清理緩存, 就必須有一個線程, 這個線程會和使用者線程競争共享鎖. 此外, 某些環境下線程建立可能受限制, 這樣CacheBuilder其實就不太好用了.

其實這樣做挺好的, 清理選擇權交給自己手裡, 可以手動Cache.cleanUp. ScheduledExecutorService可以幫助你更好的實作定時排程.

重新整理

重新整理和回收不太一樣. 正如loadingCache.refresh(K)所說, 其實就是為鍵加載值, 這個過程其實可以是異步的. 在重新整理操作進行時, 緩存仍然可以向其他線程傳回舊值, 而不像回收操作, 讀緩存的的線程必須等待新值加載完成.

如果重新整理過程抛出異常, 緩存将保留舊值, 而異常會記錄到日志後被丢棄[swallowed].

重載CacheLoader.reload(K, V)可以擴充重新整理時的行為, 這個方法允許開發者在計算新值時使用舊的值. 廢話. 底層其實就是同步執行了load方法, 不過其他線程refresh通路時, 有舊值傳回.

下面是異步線程:

final static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    private static final LoadingCache<String, Long> graphs = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .refreshAfterWrite(8, TimeUnit.SECONDS)
            .build(new CacheLoader<String, Long>() {
                @Override
                public Long load(String key) {
                    long timeMillis = System.currentTimeMillis();
                    System.out.println("load, time=" + timeMillis);
                    return timeMillis;
                }

                @Override
                public ListenableFuture<Long> reload(String key, Long oldValue) throws Exception {
                    // asynchronous!
                    ListenableFutureTask<Long> task =
                            ListenableFutureTask.create(new Callable<Long>() {
                                @Override
                                public Long call() throws Exception {
                                    long timeMillis = System.currentTimeMillis();
                                    System.out.println(Thread.currentThread().getName() + " reload, time= " + timeMillis);
                                    // 其實就是異步線程太快了, 以至于10s時有部分線程請求到了新值, 我們可以人為讓異步線程放慢, 這時, 全部線程都會拿到舊值.
                                    Thread.sleep(TimeUnit.SECONDS.toMillis(2));
                                    return timeMillis;
                                }
                            });
                    service.execute(task);
                    return task;
                }
            });
    main開始請求:
       new Thread(() -> {
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(0));
                graphs.get(key1);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }

        }).start();

        new Thread(() -> {
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(10));
                Long a1 = graphs.get(key1);
                System.out.println(Thread.currentThread().getName() + " a1 " + a1);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }

        }).start();
        new Thread(() -> {
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(12));
                Long a2 = graphs.get(key1);
                System.out.println(Thread.currentThread().getName() + " a2 " + a2);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }

        }).start();
    // a1, a2 都拿到的是舊值

           

注意: CacheBuilder.refreshAfterWrite(long, TimeUnit) 可以為緩存定時自動的重新整理緩存. 緩存項隻有被檢索/通路時才會真正重新整理, 這一點就不在贅述了. 上面有原因.

其他特性

統計

CacheBuilder.recordStats() 用來開啟Guava Cache統計功能. 統計打開後, Cache.stats() 方法會傳回CacheStats 對象以提供如下統計資訊:

  • hitRate(): 緩存命中率;
  • averageLoadPenalty(): 加載新值的平均時間, 機關為ns.
  • evictionCount(): 緩存項被回收的總數, 不包括顯示清楚.

    此外,還有其他很多統計資訊。這些統計資訊對于調整緩存設定是至關重要的,在性能要求高的應用中我們建議

    密切關注這些資料。

adMap視圖

asMap 視圖提供了緩存的ConcurrentMap 形式, 但asMap視圖和緩存互動需要注意:

  • cache.asMap() 包含目前所有加載到緩存的項. 是以相應地, cache.asMap().keySet() 包含目前所有已加載鍵;
  • asMap().get(key) 實質上等同于cache.getIfPresent(key), 而且不會引起緩存項的加載. 這和map的約定是一緻的.
  • 所有讀寫操作都會重置相關緩存項的通路時間, cache.asMap().get, put, 但是不包括.containsKey(), 還有一些對Cache.asMap()的集合視圖上的操作, 比如asMap().entrySet()不會重置緩存項的讀取時間.

中斷

待處理

refreshAfterWrite 和expiredAfterWrite 一般怎樣設定/關系, 業務

四種引用

  • 強引用: 建立一個對象并給它指派一個引用, 引用是存在于jvm的棧中.
  • 軟引用: 如果一個對象具有軟引用, 記憶體空間足夠, 不回收, 否則, 回收. 看記憶體空間自己的度量.
  • 弱引用: 垃圾回收器才不管你記憶體空間夠不夠, 直接回收. 看垃圾回收器心情.
  • 虛引用: 虛引用其實和上面的軟引用和弱引不同, 它并不影響對象的生命周期. 如果一個對象與虛引用關聯, 則跟沒有引用之前一樣, 在任何時候都可能被垃圾回收器回收.
注: 虛引用(PhantomReference)的必須和引用隊列(ReferenceQueue)一起使用, 當垃圾回收器準備回收一個對象時, 如果發現它還有虛引用, 就會把這個虛引用加入引用隊列. 程式可以通過判斷引用隊列中是否加入了虛引用, 來了解被引用的對象是否将要被垃圾回收. 如果程式發現某個虛引用已經被加入了引用隊列, 那麼就可以在所引用的對象的記憶體被回收之前采取必要的行動.

異步線程 (看他源碼)

  1. 怎樣建立一個異步線程
final static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFutureTask<Long> task =
        ListenableFutureTask.create(new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                return timeMillis;
            }
        });
service.execute(task);
return task;
           
  1. feture->listenFeture

concurrentHashMap1.7 -> 1.8

本文參考:

https://blog.csdn.net/zjccsg/article/details/51932252

https://www.jianshu.com/p/38bd5f1cf2f2

http://www.qishunwang.net/knowledge_show_177365.aspx