Talk is cheap, show me the code
@Slf4j
public final class ConcurrentUtils {
private ConcurrentUtils() {
}
/**
* 在线程池中运行任务并阻塞直到所有线程都完成
*
* @param runnables
*/
public static void executeTasksBlocking(Executor executor, List<Runnable> runnables) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(runnables.size());
for (final Runnable r : runnables) {
executor.execute(() -> {
try {
r.run();
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
throw e;
}
}
/**
* 在线程池中运行任务并阻塞直到所有线程都完成,callables的顺序和返回值的顺序一致
*
* @param callables
* @return Callable的结果
*/
public static <V> List<V> submitTasksBlocking(Executor executor, List<Callable<V>> callables) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(callables.size());
final ConcurrentMap<Integer, V> seq2valueMap = new ConcurrentHashMap<>();
int i = 0;
for (final Callable<V> r : callables) {
final int j = i;
i++;
executor.execute(() -> {
try {
V v = r.call();
seq2valueMap.put(j, v);
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
throw e;
}
List<V> ret = new ArrayList<>(i);
for (int k = 0; k < i; k++) {
ret.add(seq2valueMap.get(k));
}
return ret;
}
}
CountDownLatch 基于 AQS(AbstractQueuedSynchronizer) 实现
基本原理是AQS内部维护一个state字段,当state==0时,表示可以竞争获得锁,当初始化CountDownLatch时指定了n,可理解为n个线程获得了n把锁,主线程调用await()方法被阻塞住,任务线程每执行一次countDown(),state就减少1,表示释放掉了一把锁,直到state变为0,主线程可以竞争到锁,脱离阻塞状态继续往下执行
AQS是Java实现多线程同步的精髓所在,值得深入挖掘,博主将另外写一篇博客详细介绍AQS的实现原理