CountDownLatch一次性栅栏
1 概念与用法
CountDownLatch是一个用来同步多个线程的并发工具,n个线程启动后,分别调用CountDownLatch的await方法来等待其m个条件满足(m在初始化时指定);
每当有条件满足时,当前线程调用CountDownLatch的countDown方法,使得其m值减1;
直至m值为0时,所有等待的线程被唤醒,继续执行。
注意,CountDownLatch是一次性的,当条件满足后,它不能再回到初始状态,也不能阻止后续线程了。
若要循环的阻塞多个线程,则考虑使用CyclicBarrier。
例如5匹马参加赛马比赛,需等待3个裁判到位后才能启动,代码如下:
public class CountDownLatchExam {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
service.submit(new Horse("horse" + i, latch));
}
for (int i = 0; i < 3; i++) {
service.submit(new Judge("judge" + i, latch));
}
service.shutdown();
}
private static class Horse implements Runnable {
private final String name;
private final CountDownLatch latch;
Horse(String name, CountDownLatch latch) {
this.name = name;
this.latch = latch;
}
@Override
public void run() {
try {
System.out.println(name + " is ready,wait for all judges.");
latch.await();
System.out.println(name + " is running.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class Judge implements Runnable {
private final String name;
private final CountDownLatch latch;
private static Random random = new Random(System.currentTimeMillis());
Judge(String name, CountDownLatch latch) {
this.name = name;
this.latch = latch;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println(name + " is ready.");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
horse0 is ready,wait for all judges.
horse1 is ready,wait for all judges.
horse2 is ready,wait for all judges.
horse3 is ready,wait for all judges.
horse4 is ready,wait for all judges.
judge2 is ready.
judge1 is ready.
judge0 is ready.
horse0 is running.
horse1 is running.
horse2 is running.
horse3 is running.
horse4 is running.
CountDownLatch的原理在上一篇的4.7节“一次唤醒所有阻塞线程的共享锁”中已经详细阐述了。简要复述一下,CountDownLatch使用AQS的子类Sync作为内部的同步器,并由Sync复写了AQS的tryAcquireShared和tryReleaseShared方法。它将AQS中的state当做需要满足的条件个数,生成了一个共享锁。
每当调用await方法时,内部调用了tryAcquireShared方法,由于state>0,因此调用的线程会阻塞在共享锁的循环框架中。
每当调用countDown方法时,内部调用了releaseShared方法,而此方法将会把state的值减1,当state的值为0时,tryAcquireShared中的循环将会唤醒所有等待线程,使之继续运行。由于tryAcquireShared方法中没有修改state值,因此CountDownLatch只能一次性使用,不能循环使用。
若需知道更多细节,请直接阅读CountDownLatch和AQS的源代码。提醒一句,CountDownLatch的源代码是所有AQS的应用中最简单的,应当从它读起。
Semaphore信号量
Semaphore信号量,在多个任务争夺几个有限的共享资源时使用。
调用acquire方法获取一个许可,成功获取的线程继续执行,否则就阻塞;
调用release方法释放一个许可。每当有空余的许可时,阻塞的线程和其他线程可竞争许可。
下面的例子中,10辆车竞争3个许可证,有了许可证的车就可以入内访问资源,访问完成后释放许可证:
作者:Alex Wang
链接:https://zhuanlan.zhihu.com/p/27829595
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
public class SemaphoreExam {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
ExecutorService service = Executors.newCachedThreadPool();
// 10 cars wait for 3 semaphore
for (int i = 0; i < 10; i++) {
service.submit(new Car("Car" + i, semaphore));
}
service.shutdown();
}
private static class Car implements Runnable {
private final String name;
private final Semaphore semaphore;
private static Random random = new Random(System.currentTimeMillis());
Car(String name, Semaphore semaphore) {
this.name = name;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
System.out.println(name + " is waiting for a permit");
semaphore.acquire();
System.out.println(name+" get a permit to access, available permits:"+semaphore.availablePermits());
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println(name + " release a permit, available permits:"+semaphore.availablePermits());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
注意,运行时semaphore.availablePermits()方法会返回当前空余的许可证数量。但由于线程获取许可证的速度往往快于IO的速度,因此很多时刻看到这个数字都是0。
2 原理
Semaphore的原理在上一篇的4.8节“拥有多个许可证的共享锁”中已经详细阐述了。
简要复述一下,Semaphore使用AQS的子类Sync作为内部的同步器,并由Sync复写了AQS的tryAcquireShared和tryReleaseShared方法。
它将AQS中的state当做许可证的个数,生成了一个共享锁。state的值在Semaphore的构造函数中指定,必须大于0。
1.每当调用acquire方法时,内部调用了tryAcquireShared方法,此方法检测state的值是否>0,若是则将state减1,并继续运行,否则线程就阻塞在共享锁的循环框架中。
2.每当调用release方法时,内部调用了releaseShared方法,而此方法将会把state的值加1,当state的值大于0时,tryAcquireShared中的循环将会唤醒所有等待线程,使之继续运行,重新竞争许可证。
若需知道更多细节,请直接阅读Semaphore和AQS的源代码。
CyclicBarrier循环同步栅栏
CyclicBarrier可用来在某些栅栏点处同步多个线程,且可以多次使用,每次在栅栏点同步后,还可以激发一个事件。
例如三个旅游者(线程)各自出发,依次到达三个城市,必须每个人都到达某个城市后(栅栏点),才能再次出发去向下一个城市,当他们每同步一次时,激发一个事件,输出一段文字。代码如下:
public class CyclicBarrierExam {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("======== all threads have arrived at the checkpoint ===========");
}
});
ExecutorService service = Executors.newFixedThreadPool(3);
service.submit(new Traveler("Traveler1", barrier));
service.submit(new Traveler("Traveler2", barrier));
service.submit(new Traveler("Traveler3", barrier));
service.shutdown();
}
private static class Traveler implements Runnable {
private final String name;
private final CyclicBarrier barrier;
private static Random rand = new Random(47);
Traveler(String name, CyclicBarrier barrier) {
this.name = name;
this.barrier = barrier;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(rand.nextInt(5));
System.out.println(name + " arrived at Beijing.");
barrier.await();
TimeUnit.SECONDS.sleep(rand.nextInt(5));
System.out.println(name + " arrived at Shanghai.");
barrier.await();
TimeUnit.SECONDS.sleep(rand.nextInt(5));
System.out.println(name + " arrived at Guangzhou.");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
CyclicBarrier是依赖一个可重入锁ReentrantLock和它的一个Condition实现的,
在构造时,CyclicBarrier得到了一个parties数值,它代表参与的线程数量,以及一个Runnable的实例,它代表被激发的事件。
每当有线程调用await时,parties减1。若此时parties大于0,线程就在Condition处阻塞,若parties等于0,则此Condition会调用signalAll释放所有等待线程,并触发事件,同时将parties复原。因此所有的线程又进入下一轮循环。
CyclicBarrier代码非常简单,复杂之处在于它还要处理线程中断、超时等情况。
Exchange线程间变量交换
Exchange专门用于成对的线程间同步的交换一个同类型的变量,这种交换是线程安全且高效的。直接来看一个例子:
public class ExchangerExam {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new StringHolder("LeftHand", "LeftValue", exchanger));
service.submit(new StringHolder("RightHand", "RightValue", exchanger));
service.shutdown();
}
private static class StringHolder implements Runnable {
private final String name;
private final String val;
private final Exchanger<String> exchanger;
private static Random rand = new Random(System.currentTimeMillis());
StringHolder(String name, String val, Exchanger<String> exchanger) {
this.name = name;
this.val = val;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println(name + " hold the val:" + val);
TimeUnit.SECONDS.sleep(rand.nextInt(5));
String str = exchanger.exchange(val);
System.out.println(name + " get the val:" + str);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
可以看到,代码中两个线程同步的交换了一个String。先执行exchange方法的线程会阻塞直到后一个线程也执行了exchange方法,然后同步的完成数据的交换。再看一个例子:
public class ExchangerExam2 {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
ExecutorService service = Executors.newCachedThreadPool();
long start = System.currentTimeMillis();
service.submit(new StringHolder("LeftHand", "LeftValue", exchanger));
service.submit(new StringHolder("RightHand", "RightValue", exchanger));
service.shutdown();
service.awaitTermination(1, TimeUnit.DAYS);
long end = System.currentTimeMillis();
System.out.println("time span is " + (end - start) + " milliseconds");
}
private static class StringHolder implements Runnable {
private final String name;
private final String val;
private final Exchanger<String> exchanger;
private static Random rand = new Random(System.currentTimeMillis());
StringHolder(String name, String val, Exchanger<String> exchanger) {
this.name = name;
this.val = val;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
for (int i = 0; i < 10000; i++) {
// System.out.println(name + "-" + i + ": hold the val:" + val + i);
// TimeUnit.NANOSECONDS.sleep(rand.nextInt(5));
String str = exchanger.exchange(val + i);
// System.out.println(name + "-" + i + ": get the val:" + str);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
代码中,两个线程交换了10000组数据,用时仅41ms,这说明Exchanger的同步效率是非常高的。
再看一段代码:
作者:Alex Wang
链接:https://zhuanlan.zhihu.com/p/27829595
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
public class ExchangerExam3 {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new StringHolder("North", "NorthValue", exchanger));
service.submit(new StringHolder("East", "EastValue", exchanger));
service.submit(new StringHolder("West", "WestValue", exchanger));
service.submit(new StringHolder("South", "SouthValue", exchanger));
service.shutdown();
}
private static class StringHolder implements Runnable {
private final String name;
private final String val;
private final Exchanger<String> exchanger;
private static Random rand = new Random(System.currentTimeMillis());
StringHolder(String name, String val, Exchanger<String> exchanger) {
this.name = name;
this.val = val;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
for (int i = 0; i < 10000; i++) {
System.out.println(name + "-" + i + ": hold the val:" + val + i);
TimeUnit.NANOSECONDS.sleep(rand.nextInt(5));
String str = exchanger.exchange(val + i);
System.out.println(name + "-" + i + ": get the val:" + str);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
这段代码在运行时有很大的概率会死锁,原因就是Exchanger是用来在“成对”的线程之间交换数据的,像上面这样在四个线程之间交换数据,Exchanger很有可能将多个线程互相阻塞在其Slot中,造成死锁。
Exchanger这个类初看非常简单,其公开的接口仅有一个无参构造函数,两个重载的泛型exchange方法:
public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
第一个方法用来持续阻塞的交换数据;第二个方法用来在一个时间范围内交换数据,若超时则抛出TimeoutException后返回,同时唤醒另一个阻塞线程。
Exchanger的基本原理是维持一个槽(Slot),这个Slot中存储一个Node的引用,这个Node中保存了一个用来交换的Item和一个用来获取对象的洞Hole。如果一个来“占有”的线程看见Slot为null,则调用CAS方法使一个Node对象占据这个Slot,并等待另一个线程前来交换。如果第二个来“填充”的线程看见Slot不为null,则调用CAS方法将其设置为null,同时使用CAS与Hole交换Item,然后唤醒等待的线程。注意所有的CAS操作都有可能失败,因此CAS必须是循环调用的。
看看JDK1.7中Exchanger的数据结构相关源代码:
// AtomicReference中存储的是Hole对象
private static final class Node extends AtomicReference<Object> {
/** 用来交换的对象. */
public final Object item;
/** The Thread waiting to be signalled; null until waiting. */
public volatile Thread waiter;
/**
* Creates node with given item and empty hole.
* @param item the item
*/
public Node(Object item) {
this.item = item;
}
}
//Slot中存储的是Node
private static final class Slot extends AtomicReference<Object> {
//这一行是为了防止伪共享而加入的缓冲行,与具体算法无关
long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
}
//一个Slot数组,数组中有32个Slot,只在必要时才创建
private volatile Slot[] arena = new Slot[CAPACITY];
下面是进行交换操作的核心算法:/
private Object doExchange(Object item, boolean timed, long nanos) {
Node me = new Node(item); // 创建一个Node,预备在“占用”时使用
int index = hashIndex(); // 当前Slot的哈希值
int fails = 0; // CAS失败次数
for (;;) {
Object y; // 当前Slot中的内容
Slot slot = arena[index]; //得到当前的Slot
if (slot == null) // 延迟加载slots
createSlot(index); // 创建Slot并重入循环
else if ((y = slot.get()) != null && // 如果Hole不为null,准备“填充”
slot.compareAndSet(y, null)) {
Node you = (Node)y; // 从这里开始交换数据
if (you.compareAndSet(null, item)) {
LockSupport.unpark(you.waiter); //唤醒等待线程
return you.item; //“填充”线程从这里返回值
} // 上面条件不满足,重入循环
}
else if (y == null && // 如果Hole为null,准备“占有”
slot.compareAndSet(null, me)) {
if (index == 0) // 在slot 0上等待交换
return timed ?
awaitNanos(me, slot, nanos) :
await(me, slot);
Object v = spinWait(me, slot); // Slot位置不为0时,自旋等待交换
if (v != CANCEL)
return v; //“占有”线程从这里返回值
me = new Node(item); // 抛弃被取消的Node,创建新Node
int m = max.get();
if (m > (index >>>= 1)) // index右移1位,相当于arena中slot向右1位
max.compareAndSet(m, m - 1); // 缩表
}
else if (++fails > 1) { // 在第一个Slot上运行两次失败
int m = max.get();
if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
index = m + 1; // 第三次失败时index增加
else if (--index < 0)
index = m; // 当index小于0时,赋值为m
}
}
}