Java多线程系列–“JUC锁”09之 CountDownLatch原理和示例

Java多线程系列–“JUC锁”09之 CountDownLatch原理和示例概要前面对"独占锁"和"共享锁"有了个大致的了解;本章,我们对CountDownLatch进行学习。和ReadWriteLock.ReadLock一样,Coun

大家好,又见面了,我是你们的朋友全栈君。

 

概要

前面对”独占锁“和”共享锁“有了个大致的了解;本章,我们对CountDownLatch进行学习。和ReadWriteLock.ReadLock一样,CountDownLatch的本质也是一个”共享锁”。本章的内容包括:
CountDownLatch简介
CountDownLatch数据结构

CountDownLatch源码分析(基于JDK1.7.0_40)
CountDownLatch示例

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3533887.html

 

CountDownLatch简介

CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

 

CountDownLatch和CyclicBarrier的区别
(01) CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
(02) CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
关于CyclicBarrier的原理,后面一章再来学习。

CountDownLatch函数列表

CountDownLatch(int count)
构造一个用给定计数初始化的 CountDownLatch。

// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
void await()
// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
boolean await(long timeout, TimeUnit unit)
// 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
void countDown()
// 返回当前计数。
long getCount()
// 返回标识此锁存器及其状态的字符串。
String toString()

 

CountDownLatch数据结构

CountDownLatch的UML类图如下:

<span role="heading" aria-level="2">Java多线程系列--“JUC锁”09之 CountDownLatch原理和示例

CountDownLatch的数据结构很简单,它是通过”共享锁“实现的。它包含了sync对象,sync是Sync类型。Sync是实例类,它继承于AQS。

 

CountDownLatch源码分析(基于JDK1.7.0_40)

CountDownLatch完整源码(基于JDK1.7.0_40)

<span role="heading" aria-level="2">Java多线程系列--“JUC锁”09之 CountDownLatch原理和示例
<span role="heading" aria-level="2">Java多线程系列--“JUC锁”09之 CountDownLatch原理和示例

 1 /*  2  * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.  3  *  4  *  5  *  6  *  7  *  8  *  9  *  10  *  11  *  12  *  13  *  14  *  15  *  16  *  17  *  18  *  19  *  20  *  21  *  22  *  23 */  24  25 /*  26  *  27  *  28  *  29  *  30  *  31  * Written by Doug Lea with assistance from members of JCP JSR-166  32  * Expert Group and released to the public domain, as explained at  33  * http://creativecommons.org/publicdomain/zero/1.0/  34 */  35  36 package java.util.concurrent;  37 import java.util.concurrent.locks.*;  38 import java.util.concurrent.atomic.*;  39  40 /**  41  * A synchronization aid that allows one or more threads to wait until  42  * a set of operations being performed in other threads completes.  43  *  44  * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.  45  * The {@link #await await} methods block until the current count reaches  46  * zero due to invocations of the {@link #countDown} method, after which  47  * all waiting threads are released and any subsequent invocations of  48  * {@link #await await} return immediately. This is a one-shot phenomenon  49  * -- the count cannot be reset. If you need a version that resets the  50  * count, consider using a {@link CyclicBarrier}.  51  *  52  * <p>A {@code CountDownLatch} is a versatile synchronization tool  53  * and can be used for a number of purposes. A  54  * {@code CountDownLatch} initialized with a count of one serves as a  55  * simple on/off latch, or gate: all threads invoking {@link #await await}  56  * wait at the gate until it is opened by a thread invoking {@link  57  * #countDown}. A {@code CountDownLatch} initialized to <em>N</em>  58  * can be used to make one thread wait until <em>N</em> threads have  59  * completed some action, or some action has been completed N times.  60  *  61  * <p>A useful property of a {@code CountDownLatch} is that it  62  * doesn't require that threads calling {@code countDown} wait for  63  * the count to reach zero before proceeding, it simply prevents any  64  * thread from proceeding past an {@link #await await} until all  65  * threads could pass.  66  *  67  * <p><b>Sample usage:</b> Here is a pair of classes in which a group  68  * of worker threads use two countdown latches:  69  * <ul>  70  * <li>The first is a start signal that prevents any worker from proceeding  71  * until the driver is ready for them to proceed;  72  * <li>The second is a completion signal that allows the driver to wait  73  * until all workers have completed.  74  * </ul>  75  *  76  * <pre>  77  * class Driver { // ...  78  * void main() throws InterruptedException {  79  * CountDownLatch startSignal = new CountDownLatch(1);  80  * CountDownLatch doneSignal = new CountDownLatch(N);  81  *  82  * for (int i = 0; i < N; ++i) // create and start threads  83  * new Thread(new Worker(startSignal, doneSignal)).start();  84  *  85  * doSomethingElse(); // don't let run yet  86  * startSignal.countDown(); // let all threads proceed  87  * doSomethingElse();  88  * doneSignal.await(); // wait for all to finish  89  * }  90  * }  91  *  92  * class Worker implements Runnable {  93  * private final CountDownLatch startSignal;  94  * private final CountDownLatch doneSignal;  95  * Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {  96  * this.startSignal = startSignal;  97  * this.doneSignal = doneSignal;  98  * }  99  * public void run() { 100  * try { 101  * startSignal.await(); 102  * doWork(); 103  * doneSignal.countDown(); 104  * } catch (InterruptedException ex) {} // return; 105  * } 106  * 107  * void doWork() { ... } 108  * } 109  * 110  * </pre> 111  * 112  * <p>Another typical usage would be to divide a problem into N parts, 113  * describe each part with a Runnable that executes that portion and 114  * counts down on the latch, and queue all the Runnables to an 115  * Executor. When all sub-parts are complete, the coordinating thread 116  * will be able to pass through await. (When threads must repeatedly 117  * count down in this way, instead use a {@link CyclicBarrier}.) 118  * 119  * <pre> 120  * class Driver2 { // ... 121  * void main() throws InterruptedException { 122  * CountDownLatch doneSignal = new CountDownLatch(N); 123  * Executor e = ... 124  * 125  * for (int i = 0; i < N; ++i) // create and start threads 126  * e.execute(new WorkerRunnable(doneSignal, i)); 127  * 128  * doneSignal.await(); // wait for all to finish 129  * } 130  * } 131  * 132  * class WorkerRunnable implements Runnable { 133  * private final CountDownLatch doneSignal; 134  * private final int i; 135  * WorkerRunnable(CountDownLatch doneSignal, int i) { 136  * this.doneSignal = doneSignal; 137  * this.i = i; 138  * } 139  * public void run() { 140  * try { 141  * doWork(i); 142  * doneSignal.countDown(); 143  * } catch (InterruptedException ex) {} // return; 144  * } 145  * 146  * void doWork() { ... } 147  * } 148  * 149  * </pre> 150  * 151  * <p>Memory consistency effects: Until the count reaches 152  * zero, actions in a thread prior to calling 153  * {@code countDown()} 154  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 155  * actions following a successful return from a corresponding 156  * {@code await()} in another thread. 157  * 158  * @since 1.5 159  * @author Doug Lea 160 */ 161 public class CountDownLatch { 162 /** 163  * Synchronization control For CountDownLatch. 164  * Uses AQS state to represent count. 165 */ 166 private static final class Sync extends AbstractQueuedSynchronizer { 167 private static final long serialVersionUID = 4982264981922014374L; 168 169 Sync(int count) { 170  setState(count); 171  } 172 173 int getCount() { 174 return getState(); 175  } 176 177 protected int tryAcquireShared(int acquires) { 178 return (getState() == 0) ? 1 : -1; 179  } 180 181 protected boolean tryReleaseShared(int releases) { 182 // Decrement count; signal when transition to zero 183 for (;;) { 184 int c = getState(); 185 if (c == 0) 186 return false; 187 int nextc = c-1; 188 if (compareAndSetState(c, nextc)) 189 return nextc == 0; 190  } 191  } 192  } 193 194 private final Sync sync; 195 196 /** 197  * Constructs a {@code CountDownLatch} initialized with the given count. 198  * 199  * @param count the number of times {@link #countDown} must be invoked 200  * before threads can pass through {@link #await} 201  * @throws IllegalArgumentException if {@code count} is negative 202 */ 203 public CountDownLatch(int count) { 204 if (count < 0) throw new IllegalArgumentException("count < 0"); 205 this.sync = new Sync(count); 206  } 207 208 /** 209  * Causes the current thread to wait until the latch has counted down to 210  * zero, unless the thread is {@linkplain Thread#interrupt interrupted}. 211  * 212  * <p>If the current count is zero then this method returns immediately. 213  * 214  * <p>If the current count is greater than zero then the current 215  * thread becomes disabled for thread scheduling purposes and lies 216  * dormant until one of two things happen: 217  * <ul> 218  * <li>The count reaches zero due to invocations of the 219  * {@link #countDown} method; or 220  * <li>Some other thread {@linkplain Thread#interrupt interrupts} 221  * the current thread. 222  * </ul> 223  * 224  * <p>If the current thread: 225  * <ul> 226  * <li>has its interrupted status set on entry to this method; or 227  * <li>is {@linkplain Thread#interrupt interrupted} while waiting, 228  * </ul> 229  * then {@link InterruptedException} is thrown and the current thread's 230  * interrupted status is cleared. 231  * 232  * @throws InterruptedException if the current thread is interrupted 233  * while waiting 234 */ 235 public void await() throws InterruptedException { 236 sync.acquireSharedInterruptibly(1); 237  } 238 239 /** 240  * Causes the current thread to wait until the latch has counted down to 241  * zero, unless the thread is {@linkplain Thread#interrupt interrupted}, 242  * or the specified waiting time elapses. 243  * 244  * <p>If the current count is zero then this method returns immediately 245  * with the value {@code true}. 246  * 247  * <p>If the current count is greater than zero then the current 248  * thread becomes disabled for thread scheduling purposes and lies 249  * dormant until one of three things happen: 250  * <ul> 251  * <li>The count reaches zero due to invocations of the 252  * {@link #countDown} method; or 253  * <li>Some other thread {@linkplain Thread#interrupt interrupts} 254  * the current thread; or 255  * <li>The specified waiting time elapses. 256  * </ul> 257  * 258  * <p>If the count reaches zero then the method returns with the 259  * value {@code true}. 260  * 261  * <p>If the current thread: 262  * <ul> 263  * <li>has its interrupted status set on entry to this method; or 264  * <li>is {@linkplain Thread#interrupt interrupted} while waiting, 265  * </ul> 266  * then {@link InterruptedException} is thrown and the current thread's 267  * interrupted status is cleared. 268  * 269  * <p>If the specified waiting time elapses then the value {@code false} 270  * is returned. If the time is less than or equal to zero, the method 271  * will not wait at all. 272  * 273  * @param timeout the maximum time to wait 274  * @param unit the time unit of the {@code timeout} argument 275  * @return {@code true} if the count reached zero and {@code false} 276  * if the waiting time elapsed before the count reached zero 277  * @throws InterruptedException if the current thread is interrupted 278  * while waiting 279 */ 280 public boolean await(long timeout, TimeUnit unit) 281 throws InterruptedException { 282 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 283  } 284 285 /** 286  * Decrements the count of the latch, releasing all waiting threads if 287  * the count reaches zero. 288  * 289  * <p>If the current count is greater than zero then it is decremented. 290  * If the new count is zero then all waiting threads are re-enabled for 291  * thread scheduling purposes. 292  * 293  * <p>If the current count equals zero then nothing happens. 294 */ 295 public void countDown() { 296 sync.releaseShared(1); 297  } 298 299 /** 300  * Returns the current count. 301  * 302  * <p>This method is typically used for debugging and testing purposes. 303  * 304  * @return the current count 305 */ 306 public long getCount() { 307 return sync.getCount(); 308  } 309 310 /** 311  * Returns a string identifying this latch, as well as its state. 312  * The state, in brackets, includes the String {@code "Count ="} 313  * followed by the current count. 314  * 315  * @return a string identifying this latch, as well as its state 316 */ 317 public String toString() { 318 return super.toString() + "[Count = " + sync.getCount() + "]"; 319  } 320 }

View Code

CountDownLatch是通过“共享锁”实现的。下面,我们分析CountDownLatch中3个核心函数: CountDownLatch(int count), await(), countDown()。

 

1. CountDownLatch(int count)

public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }

说明:该函数是创建一个Sync对象,而Sync是继承于AQS类。Sync构造函数如下:

Sync(int count) { setState(count); }

 

setState()在AQS中实现,源码如下:

protected final void setState(long newState) { state = newState; }

说明:在AQS中,state是一个private volatile long类型的对象。对于CountDownLatch而言,state表示的”锁计数器“。CountDownLatch中的getCount()最终是调用AQS中的getState(),返回的state对象,即”锁计数器“。

 

2. await()

public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

说明:该函数实际上是调用的AQS的acquireSharedInterruptibly(1);

AQS中的acquireSharedInterruptibly()的源码如下:

public final void acquireSharedInterruptibly(long arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }

说明:acquireSharedInterruptibly()的作用是获取共享锁。
如果当前线程是中断状态,则抛出异常InterruptedException。否则,调用tryAcquireShared(arg)尝试获取共享锁;尝试成功则返回,否则就调用doAcquireSharedInterruptibly()。doAcquireSharedInterruptibly()会使当前线程一直等待,直到当前线程获取到共享锁(或被中断)才返回。

tryAcquireShared()在CountDownLatch.java中被重写,它的源码如下:

protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }

说明:tryAcquireShared()的作用是尝试获取共享锁。
如果”锁计数器=0″,即锁是可获取状态,则返回1;否则,锁是不可获取状态,则返回-1。

private void doAcquireSharedInterruptibly(long arg) throws InterruptedException { // 创建"当前线程"的Node节点,且Node中记录的锁是"共享锁"类型;并将该节点添加到CLH队列末尾。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 获取上一个节点。 // 如果上一节点是CLH队列的表头,则"尝试获取共享锁"。 final Node p = node.predecessor(); if (p == head) { long r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // (上一节点不是CLH队列的表头) 当前线程一直等待,直到获取到共享锁。 // 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

说明
(01) addWaiter(Node.SHARED)的作用是,创建”当前线程“的Node节点,且Node中记录的锁的类型是”共享锁“(Node.SHARED);并将该节点添加到CLH队列末尾。关于Node和CLH在”Java多线程系列–“JUC锁”03之 公平锁(一)“已经详细介绍过,这里就不再重复说明了。
(02) node.predecessor()的作用是,获取上一个节点。如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
(03) shouldParkAfterFailedAcquire()的作用和它的名称一样,如果在尝试获取锁失败之后,线程应该等待,则返回true;否则,返回false。
(04) 当shouldParkAfterFailedAcquire()返回ture时,则调用parkAndCheckInterrupt(),当前线程会进入等待状态,直到获取到共享锁才继续运行。
doAcquireSharedInterruptibly()中的shouldParkAfterFailedAcquire(), parkAndCheckInterrupt等函数在”Java多线程系列–“JUC锁”03之 公平锁(一)“中介绍过,这里也就不再详细说明了。

 

3. countDown()

public void countDown() { sync.releaseShared(1); }

说明:该函数实际上调用releaseShared(1)释放共享锁。

releaseShared()在AQS中实现,源码如下:

public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }

说明:releaseShared()的目的是让当前线程释放它所持有的共享锁。
它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。

tryReleaseShared()在CountDownLatch.java中被重写,源码如下:

protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 获取“锁计数器”的状态 int c = getState(); if (c == 0) return false; // “锁计数器”-1 int nextc = c-1; // 通过CAS函数进行赋值。 if (compareAndSetState(c, nextc)) return nextc == 0; } }

说明:tryReleaseShared()的作用是释放共享锁,将“锁计数器”的值-1。

 

总结:CountDownLatch是通过“共享锁”实现的。在创建CountDownLatch中时,会传递一个int类型参数count,该参数是“锁计数器”的初始状态,表示该“共享锁”最多能被count给线程同时获取。当某线程调用该CountDownLatch对象的await()方法时,该线程会等待“共享锁”可用时,才能获取“共享锁”进而继续运行。而“共享锁”可用的条件,就是“锁计数器”的值为0!而“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1;通过这种方式,必须有count个线程调用countDown()之后,“锁计数器”才为0,而前面提到的等待线程才能继续运行!

以上,就是CountDownLatch的实现原理。

 

CountDownLatch的使用示例

下面通过CountDownLatch实现:”主线程”等待”5个子线程”全部都完成”指定的工作(休眠1000ms)”之后,再继续运行。

 1 import java.util.concurrent.CountDownLatch;  2 import java.util.concurrent.CyclicBarrier;  3  4 public class CountDownLatchTest1 {  5  6 private static int LATCH_SIZE = 5;  7 private static CountDownLatch doneSignal;  8 public static void main(String[] args) {  9 10 try { 11 doneSignal = new CountDownLatch(LATCH_SIZE); 12 13 // 新建5个任务 14 for(int i=0; i<LATCH_SIZE; i++) 15 new InnerThread().start(); 16 17 System.out.println("main await begin."); 18 // "主线程"等待线程池中5个任务的完成 19  doneSignal.await(); 20 21 System.out.println("main await finished."); 22 } catch (InterruptedException e) { 23  e.printStackTrace(); 24  } 25  } 26 27 static class InnerThread extends Thread{ 28 public void run() { 29 try { 30 Thread.sleep(1000); 31 System.out.println(Thread.currentThread().getName() + " sleep 1000ms."); 32 // 将CountDownLatch的数值减1 33  doneSignal.countDown(); 34 } catch (InterruptedException e) { 35  e.printStackTrace(); 36  } 37  } 38  } 39 }

运行结果

main await begin. Thread-0 sleep 1000ms. Thread-2 sleep 1000ms. Thread-1 sleep 1000ms. Thread-4 sleep 1000ms. Thread-3 sleep 1000ms. main await finished.

结果说明:主线程通过doneSignal.await()等待其它线程将doneSignal递减至0。其它的5个InnerThread线程,每一个都通过doneSignal.countDown()将doneSignal的值减1;当doneSignal为0时,main被唤醒后继续执行。

  


更多内容

1. Java多线程系列–“JUC锁”01之 框架 

2. Java多线程系列–“JUC锁”02之 互斥锁ReentrantLock 

3. Java多线程系列–“JUC锁”03之 公平锁(一) 

4. Java多线程系列–“JUC锁”04之 公平锁(二)

5. Java多线程系列–“JUC锁”05之 非公平锁

6. Java多线程系列–“JUC锁”06之 Condition条件

7. Java多线程系列–“JUC锁”07之 LockSupport 

8. Java多线程系列–“JUC锁”08之 共享锁和ReentrantReadWriteLock

9. Java多线程系列目录(共xx篇)

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/154386.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • phpspreadsheet 读取 Excel 表格问题

    phpspreadsheet 读取 Excel 表格问题要读取大量数据,需要ReadFilter,指定读取范围,下面是我的ReadFilter类<?phpnamespacecommon\models;classMyExcelReadFilterimplements\PhpOffice\PhpSpreadsheet\Reader\IReadFilter{private$startRowNo;//如1private$endRowNo;//如1000private$

  • gitbook如何_github入门与实践

    gitbook如何_github入门与实践  本文从“是什么”、“为什么”、“怎么办”、“好不好”四个维度来介绍GitBook,带你从黑暗之中走出来,get这种美妙的写作方式。是什么?  在我认识GitBook之前,我已经在使用Git了,毋容置疑,Git是目前世界上最先进的分布式版本控制系统。  我认为Git不仅是程序员管理代码的工具,它的分布式协作方式同样适用于很多场合,其中一个就是写作(这会是一个…

  • mysql好还是oracle好_oracle优缺点

    mysql好还是oracle好_oracle优缺点Oracle与MySQL的区别以及优缺点MySQL的特点1、性能卓越,服务稳定,很少出现异常宕机;2、开放源代码无版本制约,自主性及使用成本低;3、历史悠久,社区和用户非常活跃,遇到问题及时寻求帮助;4、软件体积小,安装使用简单且易于维护,维护成本低;品牌口碑效应;5、支持多种OS,提供多种API接口,支持多种开发语言,对流行的PHP,Java很好的支持MySQL的缺点1、MySQL最大的缺点是其安全系统,主要是复杂而非标准,另外只有到调用mysqladmin来重读用户权限才会发生改变;2

  • PotPlayer 64 bit快捷键大全

    PotPlayer 64 bit快捷键大全PotPlayer64bit快捷键大全前言-`д´-最近用PotPlayer64bit,在不知道的情况下视频翻转一下或者各种奇葩样子,总归一句话,弄不好了(눈_눈)。今天有时间总结一下(¬、¬)。  下载安装ヽ(o・་།・o)ノPotPlayer中文网PotPlayer官网ᕦ(・ㅂ・)ᕤ 快捷键方法一????安装完之后,右键–>关于–>有没有看到快捷键列表,就是那个????方法二????看下面看下面,没错,又是它

  • outsystems

    outsystemsoutsystems从入门到精通-朱家俊的文章-知乎https://zhuanlan.zhihu.com/p/322582052outsystemsoutsystemsoutsystemsoutsystemsoutsystemsoutsystems

    2022年10月22日
  • js Map对象的用法[通俗易懂]

    js Map对象的用法[通俗易懂]第一篇:Map:Map是一组键值对的结构,具有极快的查找速度。举个例子,假设要根据同学的名字查找对应的成绩,如果用Array实现,需要两个Array:varnames=[‘Michael’,’Bob’,’Tracy’];varscores=[95,75,85];给定一个名字,要查找对应的成绩,就先要在names中找到对应的位置,再从scores取出对应…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号