并发编程-AQS应用类实现原理
- CountDownLatch
-
- 重要函数
- CyclicBarrier
-
- 应用场景
- CyclicBarrier 和 CountDownLatch的区别
- Semaphore
CountDownLatch
CountDownLatch 允许一个或者多个线程等待其他线程完成操作。
重要函数
countDown()
public void countDown() {
//会调用 tryReleaseShared()
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//如果没有锁了 去唤醒
if (tryReleaseShared(arg)) {
//唤醒线程 如果state=0 这里面会重复去唤醒下一个节点。这样会全部被唤醒
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
//获取State
int c = getState();
// 如果原本等于0 直接退出,因为其他线程已经开始唤醒开始唤醒。
if (c == 0)
return false;
int nextc = c-1;
//利用自旋将state-1;如果 为零,可以开始唤醒
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
await()
//将这个线程放入同步等待队列 由于state= count 所以会阻塞。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
CyclicBarrier
应用场景
CyclicBarrier 可以用于多线程计算数据,最后合并结果的场景。
代码应用
public class CyclicBarrierRunner implements Runnable {
private CyclicBarrier cyclicBarrier;
private int index ;
public CyclicBarrierRunner(CyclicBarrier cyclicBarrier, int index) {
this.cyclicBarrier = cyclicBarrier;
this.index = index;
}
public void run() {
try {
System.out.println("index: " + index);
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
public void run() {
System.out.println("所有特工到达屏障,准备开始执行秘密任务");
}
});
for (int i = 0; i < 10; i++) {
new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
}
//这里阻塞,直到其他线程都跑完,也就是跑完10 个await
cyclicBarrier.await();
System.out.println("全部到达屏障....1");
}
}
CyclicBarrier 和 CountDownLatch的区别
- CountDownLatch 计数器只能使用一次,而CyclicBarrier可以用reset()重新设置。
- CyclicBarrier还提供其他有用的方法,比如getNumberWaiting()可以获取阻塞线程数量。isBroken()方法用来了解阻塞的线程是否被打断。
Semaphore
Semaphore 是用来控制同时访问特定资源的线程数量。通过调节各个线程,以保证合理的使用公共资源。
public class SemaphoreRunner {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i=0;i<5;i++){
new Thread(new Task(semaphore,"yangguo+"+i)).start();
//CountDownLatch
}
}
static class Task extends Thread{
Semaphore semaphore;
public Task(Semaphore semaphore,String tname){
this.semaphore = semaphore;
this.setName(tname);
}
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
Thread.sleep(1000);
semaphore.release();
// System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}