天天看点

并发编程-AQS应用类实现原理CountDownLatchCyclicBarrierSemaphore

并发编程-AQS应用类实现原理

  • CountDownLatch
    • 重要函数
  • CyclicBarrier
    • 应用场景
    • CyclicBarrier 和 CountDownLatch的区别
  • Semaphore

CountDownLatch

CountDownLatch 允许一个或者多个线程等待其他线程完成操作。

重要函数

countDown()
public void countDown() {
//会调用 tryReleaseShared()
       sync.releaseShared(1);
   }
   
public final boolean releaseShared(int arg) {
//如果没有锁了 去唤醒
    if (tryReleaseShared(arg)) {
    //唤醒线程 如果state=0 这里面会重复去唤醒下一个节点。这样会全部被唤醒
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
   for (;;) {
   //获取State
       int c = getState();
      // 如果原本等于0 直接退出,因为其他线程已经开始唤醒开始唤醒。
       if (c == 0)
           return false;
        
       int nextc = c-1;
       //利用自旋将state-1;如果 为零,可以开始唤醒
       if (compareAndSetState(c, nextc))
           return nextc == 0;
   }
}
           
await()
//将这个线程放入同步等待队列 由于state= count 所以会阻塞。
public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
           

CyclicBarrier

应用场景

CyclicBarrier 可以用于多线程计算数据,最后合并结果的场景。

代码应用
public class CyclicBarrierRunner implements Runnable {
    private CyclicBarrier cyclicBarrier;
    private int index ;

    public CyclicBarrierRunner(CyclicBarrier cyclicBarrier, int index) {
        this.cyclicBarrier = cyclicBarrier;
        this.index = index;
    }

    public void run() {
        try {
            System.out.println("index: " + index);
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
            public void run() {
                System.out.println("所有特工到达屏障,准备开始执行秘密任务");
            }
        });
        for (int i = 0; i < 10; i++) {
            new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
        }
        //这里阻塞,直到其他线程都跑完,也就是跑完10 个await
        cyclicBarrier.await();
        System.out.println("全部到达屏障....1");

    }

}
           

CyclicBarrier 和 CountDownLatch的区别

  • CountDownLatch 计数器只能使用一次,而CyclicBarrier可以用reset()重新设置。
  • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting()可以获取阻塞线程数量。isBroken()方法用来了解阻塞的线程是否被打断。

Semaphore

Semaphore 是用来控制同时访问特定资源的线程数量。通过调节各个线程,以保证合理的使用公共资源。

public class SemaphoreRunner {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        for (int i=0;i<5;i++){
            new Thread(new Task(semaphore,"yangguo+"+i)).start();
            //CountDownLatch
        }
    }

    static class Task extends Thread{
        Semaphore semaphore;



        public Task(Semaphore semaphore,String tname){
            this.semaphore = semaphore;
            this.setName(tname);
        }

        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
                Thread.sleep(1000);
                semaphore.release();
//                System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}