天天看點

Java源碼解讀-資料容器都是如何實作同步的

用Java的同學可能在自己使用或者面試的時候經常遇到這麼一個問題, 哪些資料結構或者容器是同步的, 是怎麼實作的同步?

其實很多的資料同步原理都比較簡單, 我把目前知道的資料容器的同步方式稍微梳理了一下

1. 線程安全容器

StringBuffer(太明顯,synchronized關鍵字)

@Override
    public synchronized StringBuffer append(String str) {
        toStringCache = null;
        super.append(str);
        return this;
    }
           

HashTable(額,一樣,synchronized關鍵字)

public synchronized V put(K key, V value) {
        ...
}
           

Vector(額, 還是一樣,synchronized關鍵字)

public synchronized boolean add(E e) {
        ...
          
}
           
ConcurrentHashMap (相對高效,針對某個節點加鎖,高端了點點)
           
public V put(K key, V value) {
        return putVal(key, value, false);
 }

 final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //看這裡,這個叫分片鎖
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
           

 CopyOnWriteArrayList (使用ReentrantLock, 進行加鎖)

public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
           
BlockingQueue 阻塞隊列
           
//資料插入
public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            //超過容量,插入失敗
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        //資料插入鎖,保證資料插入的同步
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
           
//資料擷取,類pop操作
public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        //資料擷取鎖,防止資料擷取異常
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                //隊列元素減少,AtomicInteger原子操作
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
           

2. 容器同步化

Collections.synchronizedXXX(<資料容器>)

...
final Object mutex;     //使用者控制同步的對象鎖
//構造方法
SynchronizedCollection(Collection<E> c) {
      this.c = Objects.requireNonNull(c);
      mutex = this;
}
           
//接上面的操作,看出來沒有沒?就是以目前容器作為對象鎖, 在操作的時候通過鎖定對象實作線程安全
...
public boolean add(E e) {
     synchronized (mutex) {return c.add(e);}
}
public boolean remove(Object o) {
     synchronized (mutex) {return c.remove(o);}
}
...
           

關于線程部分的代碼目前還沒看到,後續補上

你們還知道哪些資料容器的加鎖方式, 可以給我評論

未完,有機會再續

繼續閱讀