天天看點

【高并發】java JUC中的Semaphore(信号量)1.概述3.Semaphore主要方法其他一些使用說明擴充閱讀

【高并發】java JUC中的Semaphore(信号量)1.概述3.Semaphore主要方法其他一些使用說明擴充閱讀

1.概述

轉載:2.Semaphore常用場景:限流

舉個例子:

比如有個停車場,有5個空位,門口有個門衛,手中5把鑰匙分别對應5個車位上面的鎖,來一輛車,門衛會給司機一把鑰匙,然後進去找到對應的車位停下來,出去的時候司機将鑰匙歸還給門衛。停車場生意比較好,同時來了100兩車,門衛手中隻有5把鑰匙,同時隻能放5輛車進入,其他車隻能等待,等有人将鑰匙歸還給門衛之後,才能讓其他車輛進入。

上面的例子中門衛就相當于Semaphore,車鑰匙就相當于許可證,車就相當于線程。

3.Semaphore主要方法

方法 解釋
Semaphore(int permits) 構造方法,參數表示許可證數量,用來建立信号量
Semaphore(int permits,boolean fair) 構造方法,當fair等于true時,建立具有給定許可數的計數信号量并設定為公平信号量
void acquire() 從此信号量擷取1個許可前線程将一直阻塞,相當于一輛車占了一個車位,此方法會響應線程中斷,表示調用線程的interrupt方法,會使該方法抛出InterruptedException異常
void acquire(int permits) 和acquire()方法類似,參數表示需要擷取許可的數量;比如一個大卡車要入停車場,由于車比較大,需要申請3個車位才可以停放
void acquireUninterruptibly(int permits) 和acquire(int permits) 方法類似,隻是不會響應線程中斷
boolean tryAcquire() 嘗試擷取1個許可,不管是否能夠擷取成功,都立即傳回,true表示擷取成功,false表示擷取失敗
boolean tryAcquire(int permits) 和tryAcquire(),表示嘗試擷取permits個許可
boolean tryAcquire(long timeout, TimeUnit unit) 嘗試在指定的時間内擷取1個許可,擷取成功傳回true,指定的時間過後還是無法擷取許可,傳回false
boolean tryAcquire(int permits, long timeout, TimeUnit unit) 和tryAcquire(long timeout, TimeUnit unit)類似,多了一個permits參數,表示嘗試擷取permits個許可
void release() 釋放一個許可,将其傳回給信号量,相當于車從停車場出去時将鑰匙歸還給門衛
void release(int n) 釋放n個許可
int availablePermits() 目前可用的許可數

3.1 示例1:Semaphore簡單的使用

static Semaphore semaphore = new Semaphore(2);

    public static class T extends Thread {
        public T(String name) {
            super(name);
        }

        @Override
        public void run() {
            Thread thread = Thread.currentThread();
            try {
                semaphore.acquire();
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",擷取許可!");
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",釋放許可!");
            }
        }
    }

    @Test
    public void mainTest() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new T("t-" + i).start();
        }

        TimeUnit.SECONDS.sleep(200);
    }
           

輸出

1606485049777,t-0,擷取許可!
1606485049780,t-1,擷取許可!
1606485052781,t-0,釋放許可!
1606485052781,t-2,擷取許可!
1606485052781,t-1,釋放許可!
1606485052781,t-3,擷取許可!
1606485055783,t-2,釋放許可!
1606485055783,t-5,擷取許可!
1606485055783,t-4,擷取許可!
1606485055783,t-3,釋放許可!
1606485058787,t-5,釋放許可!
1606485058787,t-4,釋放許可!
1606485058788,t-6,擷取許可!
1606485058788,t-7,擷取許可!
1606485061788,t-7,釋放許可!
1606485061788,t-9,擷取許可!
1606485061788,t-6,釋放許可!
1606485061788,t-8,擷取許可!
1606485064790,t-8,釋放許可!
1606485064790,t-9,釋放許可!
           

代碼中 newSemaphore(2)建立了許可數量為2的信号量,每個線程擷取1個許可,同時允許兩個線程擷取許可,從輸出中也可以看出,同時有兩個線程可以擷取許可,其他線程需要等待已擷取許可的線程釋放許可之後才能運作。為擷取到許可的線程會阻塞在 acquire()方法上,直到擷取到許可才能繼續。

3.2 示例2:擷取許可之後不釋放

門衛(Semaphore)有點呆,司機進去的時候給了鑰匙,出來的時候不歸還,門衛也不會說什麼。最終結果就是其他車輛都無法進入了。

public class Semaphore2 {
    static Semaphore semaphore = new Semaphore(2);

    public static class T extends Thread {
        public T(String name) {
            super(name);
        }

        @Override
        public void run() {
            Thread thread = Thread.currentThread();
            try {
                semaphore.acquire();
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",擷取許可!");
                TimeUnit.SECONDS.sleep(3);
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",運作結束!");
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",目前可用許可數量:" + semaphore.availablePermits());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new T("t-" + i).start();
        }
    }
}
           

輸出

1606485224353,t-0,擷取許可!
1606485224353,t-1,擷取許可!
1606485227357,t-0,運作結束!
1606485227357,t-1,運作結束!
1606485227358,t-0,目前可用許可數量:0
1606485227358,t-1,目前可用許可數量:0
           

上面程式運作後一直無法結束,觀察一下代碼,代碼中擷取許可後,沒有釋放許可的代碼,最終導緻,可用許可數量為0,其他線程無法擷取許可,會在 semaphore.acquire();處等待,導緻程式無法結束。

3.3. 示例3:釋放許可正确的姿勢

示例1中,在finally裡面釋放鎖,會有問題麼?

如果擷取鎖的過程中發生異常,導緻擷取鎖失敗,最後finally裡面也釋放了許可,最終會怎麼樣,導緻許可數量憑空增長了。

示例代碼:

public class Semaphore3 {
    static Semaphore semaphore = new Semaphore(1);

    public static class T extends Thread {
        public T(String name) {
            super(name);
        }

        @Override
        public void run() {
            Thread thread = Thread.currentThread();
            try {
                semaphore.acquire();
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",擷取許可,目前可用許可數量:" + semaphore.availablePermits());   //休眠100秒
                TimeUnit.SECONDS.sleep(100);
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",運作結束!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
            System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",目前可用許可數量:" + semaphore.availablePermits());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        T t1 = new T("t1");
        t1.start();   //休眠1秒
        TimeUnit.SECONDS.sleep(1);
        T t2 = new T("t2");
        t2.start();   //休眠1秒
        TimeUnit.SECONDS.sleep(1);
        T t3 = new T("t3");
        t3.start();     //給t2和t3發送中斷信号
        t2.interrupt();
        t3.interrupt();
    }
}
           

輸出

1606485357890,t1,擷取許可,目前可用許可數量:0
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
	at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
	at com.java.thread.demo.semaphore.Semaphore3$T.run(Semaphore3.java:21)
1606485359905,t3,目前可用許可數量:1
1606485359906,t2,目前可用許可數量:2
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
	at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
	at com.java.thread.demo.semaphore.Semaphore3$T.run(Semaphore3.java:21)
1606485457904,t1,運作結束!
1606485457904,t1,目前可用許可數量:3

           

程式中信号量許可數量為1,建立了3個線程擷取許可,線程t1擷取成功了,然後休眠100秒。其他兩個線程阻塞在 semaphore.acquire();方法處,代碼中對線程t2、t3發送中斷信号,我們看一下Semaphore中acquire的源碼:

public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
           

這個方法會響應線程中斷,主線程中對t2、t3發送中斷信号之後, acquire()方法會觸發 InterruptedException異常,t2、t3最終沒有擷取到許可,但是他們都執行了finally中的釋放許可的操作,最後導緻許可數量變為了2,導緻許可數量增加了。是以程式中釋放許可的方式有問題。需要改進一下,擷取許可成功才去釋放鎖。

正确的釋放鎖的方式,如下:

public class Semaphore4 {
    static Semaphore semaphore = new Semaphore(1);

    public static class T extends Thread {
        public T(String name) {
            super(name);
        }

        @Override
        public void run() {
            Thread thread = Thread.currentThread();   //擷取許可是否成功
            boolean acquireSuccess = false;
            try {
                semaphore.acquire();
                acquireSuccess = true;
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",擷取許可,目前可用許可數量:" + semaphore.availablePermits());   //休眠100秒
                TimeUnit.SECONDS.sleep(5);
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",運作結束!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (acquireSuccess) {
                    semaphore.release();
                }
            }
            System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",目前可用許可數量:" + semaphore.availablePermits());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        T t1 = new T("t1");
        t1.start();   //休眠1秒
        TimeUnit.SECONDS.sleep(1);
        T t2 = new T("t2");
        t2.start();   //休眠1秒
        TimeUnit.SECONDS.sleep(1);
        T t3 = new T("t3");
        t3.start();     //給t2和t3發送中斷信号
        t2.interrupt();
        t3.interrupt();
    }
}
           

輸出

1606485609632,t1,擷取許可,目前可用許可數量:0
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
	at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
	at com.java.thread.demo.semaphore.Semaphore4$T.run(Semaphore4.java:22)
java.lang.InterruptedException1606485611643,t3,目前可用許可數量:0

	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
	at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
	at com.java.thread.demo.semaphore.Semaphore4$T.run(Semaphore4.java:22)
1606485611644,t2,目前可用許可數量:0
1606485614635,t1,運作結束!
1606485614636,t1,目前可用許可數量:1

           

程式中增加了一個變量 acquireSuccess用來标記擷取許可是否成功,在finally中根據這個變量是否為true,來确定是否釋放許可。

3.4 示例4:在規定的時間内希望擷取許可

司機來到停車場,發現停車場已經滿了,隻能在外等待内部的車出來之後才能進去,但是要等多久,他自己也不知道,他希望等10分鐘,如果還是無法進去,就不到這裡停車了。

Semaphore内部2個方法可以提供逾時擷取許可的功能:

public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

  public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
           

在指定的時間内去嘗試擷取許可,如果能夠擷取到,傳回true,擷取不到傳回false。

示例代碼:

public class Semaphore5 {
    static Semaphore semaphore = new Semaphore(1);

    public static class T extends Thread {
        public T(String name) {
            super(name);
        }

        @Override
        public void run() {
            Thread thread = Thread.currentThread();   //擷取許可是否成功  
            boolean acquireSuccess = false;
            try {   //嘗試在1秒内擷取許可,擷取成功傳回true,否則傳回false   
                System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",嘗試擷取許可,目前可用許可數量:" + semaphore.availablePermits());
                acquireSuccess = semaphore.tryAcquire(1, TimeUnit.SECONDS);   //擷取成功執行業務代碼     
                if (acquireSuccess) {
                    System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",擷取許可成功,目前可用許可數量:" + semaphore.availablePermits());   //休眠5秒   
                    TimeUnit.SECONDS.sleep(5);
                } else {
                    System.out.println(System.currentTimeMillis() + "," + thread.getName() + ",擷取許可失敗,目前可用許可數量:" + semaphore.availablePermits());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (acquireSuccess) {
                    semaphore.release();
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        T t1 = new T("t1");
        t1.start();   //休眠1秒     

        TimeUnit.SECONDS.sleep(1);
        T t2 = new T("t2");
        t2.start();   //休眠1秒     

        TimeUnit.SECONDS.sleep(1);
        T t3 = new T("t3");
        t3.start();
    }
} 
           

輸出

1606485822477,t1,嘗試擷取許可,目前可用許可數量:1
1606485822480,t1,擷取許可成功,目前可用許可數量:0
1606485823481,t2,嘗試擷取許可,目前可用許可數量:0
1606485824482,t2,擷取許可失敗,目前可用許可數量:0
1606485824482,t3,嘗試擷取許可,目前可用許可數量:0
1606485825484,t3,擷取許可失敗,目前可用許可數量:0
           

代碼中許可數量為1, semaphore.tryAcquire(1,TimeUnit.SECONDS);:表示嘗試在1秒内擷取許可,擷取成功立即傳回true,超過1秒還是擷取不到,傳回false。線程t1擷取許可成功,之後休眠了5秒,從輸出中可以看出t2和t3都嘗試了1秒,擷取失敗。

其他一些使用說明

  1. Semaphore預設建立的是非公平的信号量,什麼意思呢?這個涉及到公平與非公平。舉個例子:5個車位,允許5個車輛進去,來了100輛車,隻能進去5輛,其他95在外面排隊等着。裡面剛好出來了1輛,此時剛好又來了10輛車,這10輛車是直接插隊到其他95輛前面去,還是到95輛後面去排隊呢?讓新來的去排隊就表示公平,直接去插隊争搶第一個,就表示不公平。對于停車場,排隊肯定更好一些。不過對于信号量來說不公平的效率更高一些,是以預設是不公平的。
  2. 建議閱讀以下Semaphore的源碼,對常用的方法有個了解,不需要都記住,用的時候也友善查詢就好。
  3. 方法中帶有 throwsInterruptedException聲明的,表示這個方法會響應線程中斷信号,什麼意思?表示調用線程的 interrupt()方法後,會讓這些方法觸發 InterruptedException異常,即使這些方法處于阻塞狀态,也會立即傳回,并抛出 InterruptedException異常,線程中斷信号也會被清除。

擴充閱讀

頭條面試居然跟我扯了半小時的Semaphore