您现在的位置是:亿华云 > 热点

面试官:CountDownLatch有了解过吗?

亿华云2025-10-04 03:58:44【热点】6人已围观

简介前言Java提供了一些非常好用的并发工具类,不需要我们重复造轮子,本节我们讲解CountDownLatch,一起来看下吧!CountDownLatch首先我们来看下这玩意是干啥用的。CountDown

前言

Java提供了一些非常好用的面试并发工具类,不需要我们重复造轮子,解过本节我们讲解CountDownLatch,面试一起来看下吧!

CountDownLatch

首先我们来看下这玩意是解过干啥用的。CountDownLatch同样的面试也是java.util.concurrent并发包下的工具类,通常我们会叫它是解过并发计数器,这个计数不是记12345,主要的使用场景是当一个任务被拆分成多个子任务时,需要等待子任务全部完成后,面试不然会阻塞线程,解过每完成一个任务计数器会-1,面试直到没有。解过这个有点类似go语言中的面试的sync.WaitGroup。

废话不多说,解过我们通过例子带大家快速入门,面试 在这之前,还需给大家补充一下它的解过常用方法:

public CountDownLatch(int count) { ...}构造函数。void await()是面试当前线程等待直到锁存储器计到0,或者线程被中断。boolean await(long timeout, TimeUnit unit)是当前线程等待直到锁存储器计到0,网站模板或者线程被中断, 如果为0返回true, 可以指定等待的超时时间。countDown()递减锁存器的计数,如果到0则释放所有等待的线程。getCount()获取锁存器的计数。

下面我们看下具体的使用:

public class CountDownLaunchTest {

public static void main(String[] args) throws InterruptedException {

CountDownLatch countDownLatch = new CountDownLatch(10);

IntStream.range(0, 10).forEach(i -> {

new Thread(() -> {

try {

Thread.sleep(2000);

System.out.println("worker ------> " + i);

} catch (InterruptedException e) {

e.printStackTrace();

}finally {

countDownLatch.countDown();

}

}).start();

});

countDownLatch.await();

System.out.println("completed !");

}

}

时间输出:

worker ------> 1

worker ------> 4

worker ------> 5

worker ------> 7

worker ------> 8

worker ------> 0

worker ------> 2

worker ------> 3

worker ------> 9

worker ------> 6

completed !

进程已结束,退出代码0

可以看到任务没有完全结束之前,主线程是阻塞状态。

源码剖析

首先看下构造函数。

private final Sync sync;

public CountDownLatch(int count) {

if (count < 0) throw new IllegalArgumentException("count < 0");

this.sync = new Sync(count);

}

这个sync有没有很熟悉,这里又遇到了CAS,几乎涉及到多线程的实现类都会有。

private static final class Sync extends AbstractQueuedSynchronizer {

private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {

setState(count);

}

int getCount() {

return getState();

}

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

protected boolean tryReleaseShared(int releases) {

// Decrement count; signal when transition to zero

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))

return nextc == 0;

}

}

}

countDown

首先在构造函数中初始化状态,对应的setState(count);, 其实它的底层实现就是依赖AQS。CountDownLatch主要有两个方法一个是countDown一个是await,下面我们就来看下是如何实现的。

public void countDown() {

sync.releaseShared(1);

}public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

tryReleaseShared()方法的实现在countDownLatch,自旋操作判断值是否为0,为0说明都执行完了,服务器托管之前说的递减就是在这完成的,就会走到doReleaseShared也就是释放操作。有想过为啥c==0 返回false吗❓可以回顾上一步操作if (tryReleaseShared)才会去doReleaseShared,也就是任务全部执行完才会去释放,释放的过程其实是一个队列去完成的。

protected boolean tryReleaseShared(int releases) {

for (;;) {

int c = getState();

if (c == 0)

return false;

int nextc = c-1;

if (compareAndSetState(c, nextc))

return nextc == 0;

}

}

doReleaseShared是`AbstractQueuedSynchronizer的内部方法。

private void doReleaseShared() {

for (;;) {

Node h = head;

if (h != null && h != tail) {

int ws = h.waitStatus;

if (ws == Node.SIGNAL) {

if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

continue; // loop to recheck cases

unparkSuccessor(h);

}

else if (ws == 0 &&

!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

continue; // loop on failed CAS

}

if (h == head) // loop if head changed

break;

}

}

这个方法之前给大家讲过,其实就是释放锁的操作。可以看到在这里只唤醒了头节点的后继节点,然后就返回了,为啥是后继节点,继续看unparkSuccessor。

private void unparkSuccessor(Node node) {

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

// 后继节点

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

那么剩余的其它线程怎么去释放呢?

await

再看下await(),同样的也调用了内部方法acquireSharedInterruptibly。

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}public final void acquireSharedInterruptibly(int arg)

throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

if (tryAcquireShared(arg) < 0)

doAcquireSharedInterruptibly(arg);

}

// CountDownLatch

protected int tryAcquireShared(int acquires) {

return (getState() == 0) ? 1 : -1;

}

重点在 doAcquireSharedInterruptibly。

private void doAcquireSharedInterruptibly(int arg)

throws InterruptedException {

// 以共享模式添加到等待队列

final Node node = addWaiter(Node.SHARED);

boolean failed = true;

try {

for (;;) {

// 返回前一个节点

final Node p = node.predecessor();

if (p == head) {

int r = tryAcquireShared(arg);

if (r >= 0) {

setHeadAndPropagate(node, r);

p.next = null;

failed = false;

return;

}

}

// 检查并更新未能获取的节点的状态。如果线程应该阻塞,香港云服务器则返回 true

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

throw new InterruptedException();

}

} finally {

// 失败就取消

if (failed)

cancelAcquire(node);

}

}

很赞哦!(544)