在 Java 开发中,多线程的应用十分频繁,所以在多线程协作完成同一个任务的情况下,线程间同步的需求会非常常见。比如这样一个场景,主线程需要从10个站点下载数据,此时新建一个大小为 5 的线程池来分别从站点下载数据,主线程则必须等到子线程全部下载完成后,拿到完整数据才能进行下一步的处理。Java 给我们提供了一个类 CountDownLatch
可以方便的帮助我们来实现这样的场景。
代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public class CountdownLatchDe {
private static CountDownLatch guests = new CountDownLatch(5);
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + ": 游戏开始");
System.out.println(Thread.currentThread().getName() + ": 完成");
guests.countDown();
});
}
guests.await();
System.out.println(Thread.currentThread().getName() + ": 全部结束, 开始一下项");
executorService.shutdown();
}
}
|
输出如下:
上面这个 demo 展示了这样一个场景,在一场有主持人的活动中,主持人必须等参与者都完成了一个游戏,才能宣布进入下一个环节。主线程模拟的是主持人,主线程调用方法await
会被阻塞住,子线程模拟的参与者每次执行方法countDown
, 则CountDownLatch
维护的计数器会减一,直到全部子线程执行完,计数器也变为0,主线程才被唤醒可以继续往下执行。
源码查看
1
2
3
4
5
6
7
8
9
10
11
|
public class CountDownLatch {
// 这是 CountDownLatch 定义的一个继承自 AQS 的内部类,数据结构就是使用 AQS 提供的两个队列 sync queue 和 condition queue
private final Sync sync;
// 构造给定计数的 CountDownLatch, 完成 sync 初识化
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 初始化状态数
this.sync = new Sync(count);
}
}
|
1
2
3
4
5
|
// 会阻塞调用线程一直到计数器为0或者被中断
public void await() throws InterruptedException {
// 真正调用 sync 对象的方法
sync.acquireSharedInterruptibly(1);
}
|
await
主要调用链如下所示
总结一下核心流程:
- 判读当前计数器是否为 0;
- 计数器不是0, 调用
doAcquireSharedInterruptibly
加入到同步阻塞队列;
- 尝试获取锁,获取失败则调用
shouldParkAfterFailedAcquire
判断是否需要阻塞等待,如果需要阻塞等待则调用 parkAndCheckInterrupt
阻塞当前线程并让出cup资源直到被前一个节点唤醒
1
2
3
4
|
// 计数器减一,如果减为0,则释放所有等待线程
public void countDown() {
sync.releaseShared(1);
}
|
countDown
主要调用链如下所示
总结一下核心流程(doReleaseShared
):
- 获取头结点,头结点不为空且有下一个结点;
- 头结点状态为
Signal
, 调用 unparkSuccessor
唤醒;
- 获取当前节点状态,当前节点正常情况则设置成0,获取下一个状态为非
CANCELLED
的节点,调用 LockSupport.unpark
唤醒此结点
考虑另外一个场景,子线程需要同时执行一个操作,怎么让这些线程同时开始呢? 我们可以借助 Java 提供的 CyclicBarrier 来实现此目的。
代码实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public class CyclicBarrierDe {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
System.out.println("下一阶段 go ->");});
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + ": 热身活动");
barrier.await();
System.out.println(Thread.currentThread().getName() + ": 比赛开始");
System.out.println(Thread.currentThread().getName() + ": 比赛结束");
barrier.await();
System.out.println(Thread.currentThread().getName() + ": 参加记者招待会");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
}
|
输出如下:
上面的 demo 展示了每个子线程都调用了 await
则都到达了 barrier, 才能执行后续的代码,否则被阻塞住。
源码查看
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
public class CyclicBarrier {
/** The lock for guarding barrier entry */
// 可重入锁
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
// 条件队列
private final Condition trip = lock.newCondition();
/** The number of parties */
// 参与的线程数量
private final int parties;
/* The command to run when tripped */
// 由最后一个进入 barrier 的线程执行的操作
private final Runnable barrierCommand;
/** The current generation */
// 当前代
private Generation generation = new Generation();
// 正在等待进入屏障的线程数量
private int count;
public CyclicBarrier(int parties, Runnable barrierAction) {
// 参与的线程数量小于等于0,抛出异常
if (parties <= 0) throw new IllegalArgumentException();
// 设置parties
this.parties = parties;
// 设置count
this.count = parties;
// 设置barrierCommand
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
// 调用含有两个参数的构造函数
this(parties, null);
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 保存当前锁
final ReentrantLock lock = this.lock;
// 锁定
lock.lock();
try {
// 保存当前代
final Generation g = generation;
if (g.broken) // 屏障被破坏,抛出异常
throw new BrokenBarrierException();
if (Thread.interrupted()) { // 线程被中断
// 损坏当前屏障,并且唤醒所有的线程,只有拥有锁的时候才会调用
breakBarrier();
// 抛出异常
throw new InterruptedException();
}
// 减少正在等待进入屏障的线程数量
int index = --count;
if (index == 0) { // 正在等待进入屏障的线程数量为0,所有线程都已经进入
// 运行的动作标识
boolean ranAction = false;
try {
// 保存运行动作
final Runnable command = barrierCommand;
if (command != null) // 动作不为空
// 运行
command.run();
// 设置ranAction状态
ranAction = true;
// 进入下一代
nextGeneration();
return 0;
} finally {
if (!ranAction) // 没有运行的动作
// 损坏当前屏障
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 无限循环
for (;;) {
try {
if (!timed) // 没有设置等待时间
// 等待
trip.await();
else if (nanos > 0L) // 设置了等待时间,并且等待时间大于0
// 等待指定时长
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) { // 等于当前代并且屏障没有被损坏
// 损坏当前屏障
breakBarrier();
// 抛出异常
throw ie;
} else { // 不等于当前带后者是屏障被损坏
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
// 中断当前线程
Thread.currentThread().interrupt();
}
}
if (g.broken) // 屏障被损坏,抛出异常
throw new BrokenBarrierException();
if (g != generation) // 不等于当前代
// 返回索引
return index;
if (timed && nanos <= 0L) { // 设置了等待时间,并且等待时间小于0
// 损坏屏障
breakBarrier();
// 抛出异常
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}
|
CyclicBarrier 与 CountDownLatch 对比:
- 二者都可以用来做线程同步;
- CyclicBarrier 到达 barrier 后唤醒全部线程,CountDownLatch 计数为0,则是一个个传播唤醒;
- CyclicBarrier 支持配置 Runnable 任务 CountDownLatch 不支持;CyclicBarrier 可重用 CountDownLatch 不可重用.