CountDownLatch解决同步问题

2019-06-03

Talk is cheap, show me the code

@Slf4j
public final class ConcurrentUtils {

    private ConcurrentUtils() {
    }

    /**
     * 在线程池中运行任务并阻塞直到所有线程都完成
     *
     * @param runnables
     */
    public static void executeTasksBlocking(Executor executor, List<Runnable> runnables) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(runnables.size());

        for (final Runnable r : runnables) {
            executor.execute(() -> {
                try {
                    r.run();
                } catch (Throwable e) {
                    log.error(e.getMessage(), e);
                } finally {
                    latch.countDown();
                }
            });
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            throw e;
        }
    }

    /**
     * 在线程池中运行任务并阻塞直到所有线程都完成,callables的顺序和返回值的顺序一致
     *
     * @param callables
     * @return Callable的结果
     */
    public static <V> List<V> submitTasksBlocking(Executor executor, List<Callable<V>> callables) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(callables.size());
        final ConcurrentMap<Integer, V> seq2valueMap = new ConcurrentHashMap<>();

        int i = 0;
        for (final Callable<V> r : callables) {
            final int j = i;
            i++;
            executor.execute(() -> {
                try {
                    V v = r.call();
                    seq2valueMap.put(j, v);
                } catch (Throwable e) {
                    log.error(e.getMessage(), e);
                } finally {
                    latch.countDown();
                }
            });
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            throw e;
        }
        List<V> ret = new ArrayList<>(i);
        for (int k = 0; k < i; k++) {
            ret.add(seq2valueMap.get(k));
        }

        return ret;
    }

}

CountDownLatch 基于 AQS(AbstractQueuedSynchronizer) 实现

基本原理是AQS内部维护一个state字段,当state==0时,表示可以竞争获得锁,当初始化CountDownLatch时指定了n,可理解为n个线程获得了n把锁,主线程调用await()方法被阻塞住,任务线程每执行一次countDown(),state就减少1,表示释放掉了一把锁,直到state变为0,主线程可以竞争到锁,脱离阻塞状态继续往下执行

AQS是Java实现多线程同步的精髓所在,值得深入挖掘,博主将另外写一篇博客详细介绍AQS的实现原理