天天看點

#yyds幹貨盤點#隊列同步器AQS

隊列同步器AQS

同步器的設計是基于模闆方法模式的,

重寫同步器指定的方法時,需要使用同步器提供的如下3個方法來通路或修改同步狀态。

  • getState():擷取目前同步狀态。
  • setState(int newState):設定目前同步狀态。
  • compareAndSetState(int expect,int update):使用CAS設定目前狀态,該方法能夠保證狀态設定的原子性。

同步器依賴内部的同步隊列(一個FIFO雙向隊列)來完成同步狀态的管理,目前線程擷取同步狀态失敗時,同步器會将目前線程以及等待狀态等資訊構造成為一個節點(Node)并将其加入同步隊列,同時會阻塞目前線程,當同步狀态釋放時,會把首節點中的線程喚醒,使其再次嘗試擷取同步狀态。

獨占式同步狀态擷取與釋放

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
           
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
           
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

獨占式同步鎖擷取流程:

#yyds幹貨盤點#隊列同步器AQS

通過調用同步器的release(int arg)方法可以釋放同步狀态,該方法在釋放了同步狀态之後,會喚醒其後繼節點(進而使後繼節點重新嘗試擷取同步狀态)。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
           
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
           

unparkSuccessor(Node node)方法使用LockSupport來喚醒處于等待狀态的線程。

在擷取同步狀态時,同步器維護一個同步隊列,擷取狀态失敗的線程都會被加入到隊列中并在隊列中進行自旋;移出隊列(或停止自旋)的條件是前驅節點為頭節點且成功擷取了同步狀态。在釋放同步狀态時,同步器調用tryRelease(int arg)方法釋放同步狀态,然後喚醒頭節點的後繼節點。

共享式同步狀态擷取與釋放

共享式擷取與獨占式擷取最主要的差別在于同一時刻能否有多個線程同時擷取到同步狀态。以檔案的讀寫為例,如果一個程式在對檔案進行讀操作,那麼這一時刻對于該檔案的寫操作均被阻塞,而讀操作能夠同時進行。寫操作要求對資源的獨占式通路,而讀操作可以是共享式通路,兩種不同的通路模式在同一時刻對檔案或資源的通路情況。

通過調用同步器的acquireShared(int arg)方法可以共享式地擷取同步狀态

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
           
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

在acquireShared(int arg)方法中,同步器調用tryAcquireShared(int arg)方法嘗試擷取同步狀态,tryAcquireShared(int arg)方法傳回值為int類型,當傳回值大于等于0時,表示能夠擷取到同步狀态。

是以,在共享式擷取的自旋過程中,成功擷取到同步狀态并退出自旋的條件就是tryAcquireShared(int arg)方法傳回值大于等于0。可以看到,在doAcquireShared(int arg)方法的自旋過程中,如果目前節點的前驅為頭節點時,嘗試擷取同步狀态,如果傳回值大于等于0,表示該次擷取同步狀态成功并從自旋過程中退出。

通過調用releaseShared(int arg)方法可以釋放同步狀态,

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
           

該方法在釋放同步狀态之後,将會喚醒後續處于等待狀态的節點。對于能夠支援多個線程同時通路的并發元件(比如Semaphore),它和獨占式主要差別在于tryReleaseShared(int arg)方法必須確定同步狀态(或者資源數)線程安全釋放,一般是通過循環和CAS來保證的,因為釋放同步狀态的操作會同時來自多個線程。

獨占式逾時擷取同步狀态

通過調用同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以逾時擷取同步狀

态,即在指定的時間段内擷取同步狀态,如果擷取到同步狀态則傳回true,否則,傳回false。

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
           

該方法在自旋過程中,當節點的前驅節點為頭節點時嘗試擷取同步狀态,如果擷取成功則從該方法傳回,這個過程和獨占式同步擷取的過程類似,但是在同步狀态擷取失敗的處理上有所不同。如果目前線程擷取同步狀态失敗,則判斷是否逾時(nanosTimeout小于等于0表示已經逾時),如果沒有逾時,重新計算逾時間隔nanosTimeout,然後使目前線程等待nanosTimeout納秒(當已到設定的逾時時間,該線程會從LockSupport.parkNanos(Objectblocker,long nanos)方法傳回)。

acquire(int args)在未擷取到同步狀态時,将會使目前線程一直處于等待狀态

doAcquireNanos(int arg,long nanosTimeout)會使目前線程等待nanosTimeout納秒,如果目前線程在nanosTimeout納秒内沒有擷取到同步狀态,将會從等待邏輯中自動傳回。

獨占式逾時擷取同步狀态的流程:

#yyds幹貨盤點#隊列同步器AQS

自定義同步元件——TwinsLock

該工具在同一時刻,隻允許至多兩個線程同時通路,超過兩個線程的通路将被阻塞,我們将這個同步工具命名為TwinsLock。

共享式通路,重寫tryAcquireShared(int args)方法和tryReleaseShared(int args)方法

package com.example.xppdemo.chapter4;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class TwinsLock implements Lock {
    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero.");
            }
            setState(count);
        }

        public int tryAcquireShared(int reduceCount) {
            for (; ; ) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current,
                        newCount)) {
                    return newCount;
                }
            }
        }

        public boolean tryReleaseShared(int returnCount) {
            for (; ; ) {
                int current = getState();
                int newCount = current + returnCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }
    }

    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
           
package com.example.xppdemo.chapter4;


import java.util.concurrent.locks.Lock;

public class TwinsLockTest {
    public static void main(String[] args) {
        test();
    }
    public static void test() {
        final Lock lock = new TwinsLock();
        class Worker extends Thread {
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        SleepUtils.second(1);
                        System.out.println(Thread.currentThread().getName());
                        SleepUtils.second(1);
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }
// 啟動10個線程
        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.setDaemon(true);
            w.start();
        }
// 每隔1秒換行
        for (int i = 0; i < 10; i++) {
            SleepUtils.second(1);
            System.out.println();
        }
    }
}
           
Thread-3

Thread-2

Thread-2
Thread-3


Thread-3
Thread-2



Thread-3
Thread-2

Thread-3
Thread-2