企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 27 倒数计时开始,三、二、一—CountDownLatch详解 > 时间像海绵里的水,只要你愿意挤,总还是有的。 > ——鲁迅 本节开始我们学习一些新的东西,不再局限于线程、锁、并发容器这些内容。JCU 包中提供了一些工具,用于线程间的协调。这些工具并不是每个并发编程的场景都需要使用。大部分场景通过 wait/nofity 或者 join 等操作就可以解决。但是在一些特定的场景下,我们则需要借助这些工具来解决问题。本节我们先来学习 CountDownLatch。 ## 1、理解 CountDownLatch 从字面理解 CountDownLatch,意思是倒数门闩。它的作用是多个线程做汇聚。主线程开启了 A、B、C 三个线程做不同的事情,但是主线程需要等待 A、B、C 三个线程全部完成后才能继续后面的步骤。此时就需要 CountDownLatch 出马了。CountDownLatch 会阻塞主线程,直到计数走到 0,门闩才会打开,主线程继续执行。而计数递减是每个线程自己操作 CountDownLatch 对象实现的。如下图: ![图片描述](https://img.mukewang.com/5dd5fad80001e9c816000908.jpg) 这种场景在我们的生活中十分常见。比如篮球比赛中,作为控球后卫,如果没有快攻机会,那就需要等到中锋、大前锋、小前锋、得分后卫都跑到位了,我才能决定怎么组织进攻。又比如我们报团去旅游,必须所有人都到机场了,才能一起出发。 对于我们的程序来说这种场景也挺多的,比如你的订单信息可能需要从多个微服务取得数据,汇总后加工才返回给前台。此时从多个微服务取得数据可以是多个子线程来完成。 对于以上场景,都是 CountDownLatch 的用武之地。 ## 2、如何使用 CountDownLatch 我们来模拟打篮球的例子,主线程假如是控球后卫,我们看一下如果不用 CountDownLatch 会有什么问题: ~~~java public static void main(String[] args) throws InterruptedException { System.out.println("控球后卫到位!等待所有位置球员到位!"); new Thread(()->{ System.out.println("得分后卫到位!"); }).start(); new Thread(()->{ System.out.println("中锋到位!"); }).start(); new Thread(()->{ System.out.println("大前锋到位!"); }).start(); new Thread(()->{ System.out.println("小前锋到位!"); }).start(); System.out.println("全部到位,开始进攻!"); } ~~~ 输出为: ~~~ 控球后卫到位!等待所有位置球员到位! 得分后卫到位! 中锋到位! 大前锋到位! 全部到位,开始进攻! 小前锋到位! ~~~ 可以看到小前锋还没有到位,就开始进攻了。这显然和需求不符。出现这种结果也很好理解,因为代码中控球后卫并没有等每个球员的线程到位,就开始进攻了。 正确的姿势应该如下: ~~~java public class Client { private static final CountDownLatch countDownLatch = new CountDownLatch(5); public static void main(String[] args) throws InterruptedException { System.out.println("控球后卫到位!等待所有位置球员到位!"); countDownLatch.countDown(); new Thread(()->{ System.out.println("得分后卫到位!"); countDownLatch.countDown(); }).start(); new Thread(()->{ System.out.println("中锋到位!"); countDownLatch.countDown(); }).start(); new Thread(()->{ System.out.println("大前锋到位!"); countDownLatch.countDown(); }).start(); new Thread(()->{ System.out.println("小前锋到位!"); countDownLatch.countDown(); }).start(); countDownLatch.await(); System.out.print("全部到位,开始进攻!"); } } ~~~ 首先声明声明了一个 countDownLatch 对象,由于有5名球员,所以传入 count=5。每个球员的线程在球员到位后,都会执行 countDownLatch.countDown(),这个方法可以理解为我们把初始值的计数数量5做递减。当减到零时才会执行 countDownLatch.await(); 后面的代码。countDownLatch.await() 就是我们的门闩,这行代码做的是锁门操作,而每次 countDown(),调用5次后,门闩打开,后面的代码才被执行。 这段代码输出如下: ~~~ 控球后卫到位!等待所有位置球员到位! 得分后卫到位! 中锋到位! 大前锋到位! 小前锋到位! 全部到位,开始进攻! ~~~ 可以看出完全符合我们的预期,如果你还对此表示怀疑,那么你可以在某个线程中让其 sleep 上几秒,再看看是否还是全部到位才开始进攻。 ## 3、CountDownLatch 的原理解析 CountDownLatch 内部其实还是借助 AQS 实现的。它内部实现了 AbstractQueuedSynchronizer。使用 AQS 的 state 变量来存储计数器的值,初始化 CountDownLatch,实际在初始化 state 值。 ### 3.1 构造函数 我们看其构造函数: ~~~java public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } ~~~ ~~~java Sync(int count) { setState(count); } ~~~ ~~~java protected final void setState(int newState) { state = newState; } ~~~ 三个方法串起来看,发现最后就是把传入的 count 设置给了 state。 ### 3.2 await 方法 await 方法会阻塞当前线程,代码如下: ~~~java public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } ~~~ 调用了 AQS 的方法: ~~~java public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } ~~~ 尝试获取共享锁 tryAcquireShared,如果不能获取进入等待队列。 tryAcquireShared 方法由 CountDownLatch 的内部类 Sync 实现,如下: ~~~java protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } ~~~ 可以看到如果 state 为0就直接返回了,但如果不为零,才进入等待队列。调用 tryAcquireShared 仅仅检查 state值,而不会对其减 1,可以看到传入的参数 acquires根本没有用。 我们再看看 countDown 方法。 ### 3.3 countDown 方法 这个方法会对 state 递减。当计数器减为 0 时,所有阻塞的线程都被唤醒。代码如下: ~~~java public void countDown() { sync.releaseShared(1); } ~~~ 可见其也是通过对自己的 AQS 子类调用 releaseShared 方法: ~~~java public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } ~~~ 而在这个方法里,tryReleaseShared 是由子类实现的,也就是 countDown 中的 Sync 类,实现代码如下: ~~~java protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } ~~~ 以上代码在自旋中,通过 CAS 的方式对 state 值-1,如果 c-1 后等于 0,说明计数到 0。那么 releaseShared 中会调用 doReleaseShared(),让 AQS 释放资源出来。 以上对做 CountDownLatch 源代码做了简单的分析,可以看出主要是使用 AQS 来实现。通过阻塞队列阻塞线程。然后通过 state 值的初始化和递减,实现 state 为 0 时,激活阻塞的线程。 ## 4、总结 CountDownLatch 有其一定的应用场景,对于多线程协调和串起流程有很大的帮助。我们在多线程开发中,可以留意是否有类似的场景,能够通过 CountDownLatch 来解决。CountDownLatch 自身也有一定的局限性,它只能被使用一次,而不能被恢复再次使用。下一节我们讲学习 CyclicBarrier,它可以重置以重复使用。