天天看點

synchronized和ReentrantLock實作原理和差別

在JDK1.5之前共享對象的協調機制隻有synchronized和volatile,在JDK1.5中增加了新的機制ReentrantLock,該機制的誕生并不是為了替代synchronized,而是在 synchronized 不适用的情況下,提供一種可以選擇的進階功能。

synchronized 和 ReentrantLock 是如何實作的?它們有什麼差別?

synchronized 屬于獨占式悲觀鎖,是通過 JVM 隐式實作的,synchronized 隻允許同一時刻隻有一個線程操作資源。

在 Java 中每個對象都隐式包含一個 monitor(螢幕)對象,加鎖的過程其實就是競争 monitor 的過程,當線程進入位元組碼 monitorenter 指令之後,線程将持有 monitor 對象,執行 monitorexit 時釋放 monitor 對象,當其他線程沒有拿到 monitor 對象時,則需要阻塞等待擷取該對象。

ReentrantLock 是 Lock 的預設實作方式之一,它是基于 AQS(Abstract Queued Synchronizer,隊列同步器)實作的,它預設是通過非公平鎖實作的,在它的内部有一個 state 的狀态字段用于表示鎖是否被占用,如果是 0 則表示鎖未被占用,此時線程就可以把 state 改為 1,并成功獲得鎖,而其他未獲得鎖的線程隻能去排隊等待擷取鎖資源。

synchronized 和 ReentrantLock 都提供了鎖的功能,具備互斥性和不可見性。在 JDK 1.5 中 synchronized 的性能遠遠低于 ReentrantLock,但在 JDK 1.6 之後 synchronized 的性能略低于 ReentrantLock,它的差別如下:

synchronized是JVM隐式實作的,而ReentrantLock是Java語言提供的API;
ReentrantLock可設定為公平鎖,而synchronized卻不行;
ReentrantLock 隻能修飾代碼塊,而 synchronized 可以用于修飾方法、修飾代碼塊等;
ReentrantLock 需要手動加鎖和釋放鎖,如果忘記釋放鎖,則會造成資源被永久占用,而 synchronized 無需手動釋放鎖;
ReentrantLock 可以知道是否成功獲得了鎖,而 synchronized  卻不行。
           

ReentrantLock 源碼分析

構造函數:

/**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();// 無參構造方法建立的鎖為非公平鎖
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    無參的構造函數建立了一個非公平鎖,使用者也可以根據第二個構造函數,設定一個 boolean 類型的值,來決定是否使用公平鎖來實作線程的排程。
           

公平鎖和非公平鎖的差別:

公平鎖的含義是線程需要按照請求的順序來獲得鎖;而非公平鎖則允許“插隊”的情況存在,所謂的“插隊”指的是,線程在發送請求的同時該鎖的狀态恰好變成了可用,那麼此線程就可以跳過隊列中所有排隊的線程直接擁有該鎖。

而公平鎖由于有挂起和恢複是以存在一定的開銷,是以性能不如非公平鎖,是以 ReentrantLock 和 synchronized 預設都是非公平鎖的實作方式。

ReentrantLock 是通過 lock() 來擷取鎖,并通過 unlock() 釋放鎖,使用代碼如下:

Lock lock = new ReentrantLock();
        try {
            //  加鎖
            lock.lock();
            // 業務處理
        } finally {
            // 釋放鎖
            lock.unlock();
        }
    }
           

ReentrantLock 中的 lock() 是通過 sync.lock() 實作的,但 Sync 類中的 lock() 是一個抽象方法,需要子類 NonfairSync 或 FairSync 去實作

sync的源碼如下:

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();
        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }
           

NonfairSync 中的源碼如下:

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
           

FairSync 中的源碼如下:

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
            	// 公平鎖比非公平鎖多了一行代碼 !hasQueuedPredecessors()
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) { // 嘗試擷取鎖
                    setExclusiveOwnerThread(current); // 擷取成功,标記被搶占
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc); // set state = state + 1
                return true;
            }
            return false;
        }
    }
           

從lock()方法中可以看出非公平鎖比公平鎖隻是多了一行compareAndSetState方法,該方法是嘗試将state值由0置換為1,如果設定成功的話,則說明目前沒有其他線程持有該鎖,不用再去排隊了,可直接占用該鎖,否則,則需要通過 acquire 方法去排隊。

tryacquire()方法嘗試擷取鎖,如果擷取鎖失敗,則把它加入到阻塞隊列中。

對于tryacquire()方法,公平鎖比非公平鎖隻多一行代碼 !hasQueuedPredecessors(),它用來檢視隊列中是否有比它等待時間更久的線程,如果沒有,就嘗試一下是否能擷取到鎖,如果擷取成功,則标記為已經被占用。

如果擷取鎖失敗,則調用 addWaiter 方法把線程包裝成 Node 對象,同時放入到隊列中,但 addWaiter 方法并不會嘗試擷取鎖,acquireQueued 方法才會嘗試擷取鎖,如果擷取失敗,則此節點會被挂起。

AbstractQueuedSynchronizer類源碼如下:

addWaiter() 方法

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
	}
           

acquireQueued()方法源碼:

/**
 * 隊列中的線程嘗試擷取鎖,失敗則會被挂起
 */
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; // 擷取鎖是否成功的辨別
        try {
            boolean interrupted = false; // 線程是否被中斷
            for (;;) {
            	// 擷取前一個節點(前驅節點)
                final Node p = node.predecessor();
                // 目前節點為頭節點的下一個節點時,有權嘗試擷取鎖
                if (p == head && tryAcquire(arg)) {
                    setHead(node); // 擷取成功,将目前節點設定為 head 節點
                    p.next = null; // help GC 原 head 節點出隊,等待被 GC
                    failed = false; // 擷取成功
                    return interrupted;
                }
                // 判斷擷取鎖失敗後是否可以挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 線程若被中斷,傳回 true
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
           

該方法會使用 for(;;)無限循環的方式來嘗試擷取鎖,若擷取失敗,則調用 shouldParkAfterFailedAcquire 方法,嘗試挂起目前線程

shouldParkAfterFailedAcquire ()方法源碼:

/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     *  判斷線程是否可以被挂起
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    	// 獲得前驅節點的狀态
        int ws = pred.waitStatus;
        // 前驅節點的狀态為SIGNAL,目前線程可以被挂起(阻塞)
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
            // 若前驅節點狀态為 CANCELLED,那就一直往前找,直到找到一個正常等待的狀态為止
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 并将目前節點排在它後邊
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             * 把前驅節點的狀态修改為 SIGNA
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
           

線程入列被挂起的前提條件是,前驅節點的狀态為 SIGNAL,SIGNAL 狀态的含義是後繼節點處于等待狀态,目前節點釋放鎖後将會喚醒後繼節點。是以在上面這段代碼中,會先判斷前驅節點的狀态,如果為 SIGNAL,則目前線程可以被挂起并傳回 true;如果前驅節點的狀态 >0,則表示前驅節點取消了,這時候需要一直往前找,直到找到最近一個正常等待的前驅節點,然後把它作為自己的前驅節點;如果前驅節點正常(未取消),則修改前驅節點狀态為 SIGNAL。

到這裡整個加鎖的流程就已經走完了,最後的情況是,沒有拿到鎖的線程會在隊列中被挂起,直到擁有鎖的線程釋放鎖之後,才會去喚醒其他的線程去擷取鎖資源,整個運作流程如下圖所示:

synchronized和ReentrantLock實作原理和差別

unlock 相比于 lock 來說就簡單很多了,源碼如下:

public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
    	// 嘗試釋放鎖
        if (tryRelease(arg)) {
            // 釋放成功
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
           

鎖的釋放流程為,先調用 tryRelease 方法嘗試釋放鎖,如果釋放成功,則檢視頭結點的狀态是否為SIGNAL,如果是,則喚醒頭結點的下個節點關聯的線程;如果釋放鎖失敗,則傳回 false。

tryRelease 源碼如下:

/**
 * 嘗試釋放目前線程占有的鎖
 */
protected final boolean tryRelease(int releases) {
            int c = getState() - releases; // 釋放鎖後的狀态,0 表示釋放鎖成功
			// 如果擁有鎖的線程不是目前線程的話抛出異常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) { // 鎖被成功釋放
                free = true;
                setExclusiveOwnerThread(null); // 清空獨占線程
            }
            setState(c); // 更新 state 值,0 表示為釋放鎖成功
            return free;
        }
           

在 tryRelease 方法中,會先判斷目前的線程是不是占用鎖的線程,如果不是的話,則會抛出異常;如果是的話,則先計算鎖的狀态值 getState() - releases 是否為 0,如果為 0,則表示可以正常的釋放鎖,然後清空獨占的線程,最後會更新鎖的狀态并傳回執行結果。

JDK 1.6 鎖優化

自适應自旋鎖

JDK 1.5 在更新為 JDK 1.6 時,HotSpot 虛拟機團隊在鎖的優化上下了很大功夫,比如實作了自适應式自旋鎖、鎖更新等。

JDK 1.6 引入了自适應式自旋鎖意味着自旋的時間不再是固定的時間了,比如在同一個鎖對象上,如果通過自旋等待成功擷取了鎖,那麼虛拟機就會認為,它下一次很有可能也會成功 (通過自旋擷取到鎖),是以允許自旋等待的時間會相對的比較長,而當某個鎖通過自旋很少成功獲得過鎖,那麼以後在擷取該鎖時,可能會直接忽略掉自旋的過程,以避免浪費 CPU 的資源,這就是自适應自旋鎖的功能。

鎖更新

鎖更新其實就是從偏向鎖到輕量級鎖再到重量級鎖更新的過程,這是 JDK 1.6 提供的優化功能,也稱之為鎖膨脹。

偏向鎖是指在無競争的情況下設定的一種鎖狀态。偏向鎖的意思是它會偏向于第一個擷取它的線程,當鎖對象第一次被擷取到之後,會在此對象頭中設定标示為“01”,表示偏向鎖的模式,并且在對象頭中記錄此線程的 ID,這種情況下,如果是持有偏向鎖的線程每次在進入的話,不再進行任何同步操作,如 Locking、Unlocking 等,直到另一個線程嘗試擷取此鎖的時候,偏向鎖模式才會結束,偏向鎖可以提高帶有同步但無競争的程式性能。但如果在多數鎖總會被不同的線程通路時,偏向鎖模式就比較多餘了,此時可以通過 -XX:-UseBiasedLocking 來禁用偏向鎖以提高性能。

輕量鎖是相對于重量鎖而言的,在 JDK 1.6 之前,synchronized 是通過作業系統的互斥量(mutex lock)來實作的,這種實作方式需要在使用者态和核心态之間做轉換,有很大的性能消耗,這種傳統實作鎖的方式被稱之為重量鎖。

而輕量鎖是通過比較并交換(CAS,Compare and Swap)來實作的,它對比的是線程和對象的 Mark Word(對象頭中的一個區域),如果更新成功則表示目前線程成功擁有此鎖;如果失敗,虛拟機會先檢查對象的 Mark Word 是否指向目前線程的棧幀,如果是,則說明目前線程已經擁有此鎖,否則,則說明此鎖已經被其他線程占用了。當兩個以上的線程争搶此鎖時,輕量級鎖就膨脹為重量級鎖,這就是鎖更新的過程,也是 JDK 1.6 鎖優化的内容。