并发编程之深入理解Condition

并发编程之深入理解Condition

在并发编程中的开发中,我们难免会使用到等待通知模式,比如我们生产者消费者模式中,当生产者生产的东西填满了容器,则需要停止生产,当消费者把容器内的东西消费完了,也需要停止消费,同样的当容器内有新的东西生产出来,会通知消费者继续生产。可能我们平时使用synchronized比较多,一般我们使用使用object.wait()和object.notify()、notifyAll()。然而今天我们一起学习的是当我们使用jdk提供的并发编程的Lock实现等待通知模式,此时我们就需要使用Condition来实现—条件等待通知。

一、condition的简单使用

如果我们之前没用过condition,那么先来了解一下如何使用,其实和synchronized结合object.wait()和object.notify()思想是一致的。这里举了一个实例,面包生产者生产面包扔到容器中和面包消费者从容器中取出面包进行消费,当容器满了则生产者阻塞,停止生产,直到容器不满是被唤醒;当容器空了,停止消费,直到容器不空被唤醒。

简单实例代码如下:

BreadContainer.java

package com.taolong.concurrent.condition;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** * @Author taolong.hong * @Date 2020/5/10 16:12 * @Version 1.0 * @Description 装面包的容器 */
public class BreadContainer {
   

    //锁
    private final ReentrantLock lock;

    private final Condition condition;

    private final List<Bread> breadList;

    private final int containerSize;

    private static final int MAX_SIZE = 20;


    public BreadContainer(int containerSize) {
   
        this.lock = new ReentrantLock();
        condition = lock.newCondition();
        if (containerSize <=0 || containerSize > MAX_SIZE){
   
            this.containerSize = MAX_SIZE;
        }else{
   
            this.containerSize = containerSize;
        }
        breadList = new ArrayList<>(containerSize);
    }

    /*** * 往容器里添加生产的面包 */
    public void produceBread(Bread bread){
   
       lock.lock();
       try {
   
           //已经装满了,需要等待,并且唤醒阻塞的线程
           while (breadList.size() == containerSize){
   
               System.out.println("容器已经满了,生产者停止生产...");
               condition.await();
           }
           System.out.println("正在往容器里添加 id="+ bread.getId()+"的面包");
           breadList.add(bread);
           condition.signal();
       } catch (InterruptedException e) {
   
           e.printStackTrace();
       } finally {
   
           lock.unlock();
       }
    }

    /*** * 从容器里拿出面包 */
    public void consumeBread(){
   
        lock.lock();
        try {
   
            //当容器里没有面包则停止消费...
            while(breadList.size() == 0){
   
                System.out.println("当前容器没有面包,停止消费...");
                condition.await();
            }
            Bread bread = breadList.get(0);
            breadList.remove(0);
            System.out.println("正在消费id="+bread.getId()+"的面包");
            condition.signal();
        } catch (InterruptedException e) {
   
            e.printStackTrace();
        } finally {
   
            lock.unlock();
        }
    }


}

Bread.java

package com.taolong.concurrent.condition;

/** * @Author taolong.hong * @Date 2020/5/8 16:15 * @Version 1.0 * @Description 面包 */
public class Bread {
   

    private String name;

    private final int id;

    public Bread(int id) {
   
        this.id = id;
    }

    public String getName() {
   
        return name;
    }

    public void setName(String name) {
   
        this.name = name;
    }

    public int getId() {
   
        return id;
    }


}

ConditionTest.java

package com.taolong.concurrent.condition;

/** * @Author taolong.hong * @Date 2020/5/8 16:39 * @Version 1.0 * @Description */
public class ConditionTest {
   

    public static void main(String[] args) {
   
        BreadContainer breadContainer = new BreadContainer(20);
        Thread producer = new Thread(new BreadProducer(breadContainer));
        Thread consumer = new Thread(new BreadConsumer(breadContainer));
        producer.start();
        consumer.start();
    }


    /** * 面包生产者 */
    static class BreadProducer implements Runnable{
   
        final BreadContainer breadContainer;

        BreadProducer(BreadContainer breadContainer){
   
            this.breadContainer = breadContainer;
        }

        @Override
        public void run() {
   
            //生产100000个面包
            for (int i = 0; i < 100000; i++) {
   
                Bread bread = new Bread(i);
                breadContainer.produceBread(bread);
            }
        }
    }


    /*** * 面包消费者 */
    static class BreadConsumer implements Runnable{
   

        final BreadContainer breadContainer;

        BreadConsumer(BreadContainer breadContainer){
   
            this.breadContainer = breadContainer;
        }

        @Override
        public void run() {
   
            int i = 0;
            //消费100000个面包
            while(i<100000){
   
                breadContainer.consumeBread();
                i++;
            }
        }
    }
}

上面只是一个简单的应用实例,介绍如何使用

二、Condition原理

前面简单介绍了Condition的使用,现在开始介绍Condition的原理,本节主要结合图和文字描述condition的原理,下一节将从源码的角度分析。在分析condition的原理之前,最好先了解AQS的原理,我之前有写过一篇文章,如果不了解的可以参考(深入理解ReentrantLock和AQS)。先看下面一副condition的原理图

在这里插入图片描述

这里我用不同的颜色线条和文字标注了对应的关系,这里为什么有两个condition等待队列,是因为一个lock可以有多个不同的阻塞条件,比如大家可以参考ArrayBlockingQueue的原理,里面就定义了一个lock,两个condition(notEmpty,notFull)用于提升性能

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

下面我用下面的问题来解释上面的逻辑图

1、当调用condition.await()方法时,AQS同步队列中获取锁的线程(一般是Header指向的Node)会释放锁(state同步状态置为0),同时创建一个Node节点封装thread信息,加入到condition等待队列的队尾(如果有多个condition,哪一个condition调用了就加入到对应condition的队尾)

2、AQS同步队列中释放锁之后,会唤醒同步队列下面一个Node节点,让其去竞争锁资源(修改同步状态state)

3、当AQS同步队列中持有锁的线程调用condition.signal()时,则会将condition等待队列中的第一个节点Node加入到同步队列的队尾(当然也要用cas咯),如果调用的是condition.signalAll()则会将该condition队列的所有的节点唤醒加入到AQS的队尾,后面的逻辑就和之前分析AQS的逻辑一致。

这里一定要注意,一个lock可以有多个condition,也就意味着有多个condition等待队列,调用不同的condition则处理不同的conditin的等待队列

到这里相信结合图和文字大家对condition的原理已经有了一个比较深刻的了解了,下面开始分析condition的源码

三、深入源码分析

condition是在AbstractQueuedSynchronizer内部类ConditionObject,它实现了Condition接口。我们今天主要分析它的await()和signal()方法,其他的方法大家可自行研究

1、condition.await()

public final void await() throws InterruptedException {
   
    if (Thread.interrupted())
        throw new InterruptedException();
    //1.构建Node节点,加入到condition等待队列
    Node node = addConditionWaiter();
    //2.释放全部锁(重入的话,会全部释放)
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //3.判断node是否在AQS同步队列中,如果不在则阻塞
    while (!isOnSyncQueue(node)) {
   
        //阻塞
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //4.下面的逻辑就是在AQS同步队列中了,和之前分析的AQS一样的
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

我们详细来分析下上面的注释部分,其实不难。

注释1:构建Node节点,加入到condition等待队列

private Node addConditionWaiter() {
   
    //1.判断下当前线程事否是获取到锁的线程,只有获取锁的线程才能调用await方法
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    //2.清除已经取消的节点,里面有个while循环,判断节点状态,如果已取消,则剔除队列,大家自行阅读
    if (t != null && t.waitStatus != Node.CONDITION) {
   
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    //3.创建节点,注意下这里的状态是CONDITION
    Node node = new Node(Node.CONDITION);
	//4.如果队列中还没有,则该节点就是第一个
    if (t == null)
        firstWaiter = node;
    //否则加入到队列尾部
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

注释2:释放全部锁(重入的话,会全部释放)

这里就不细看了,简单的描述一下:就是将当前线程持有的这个lock全部释放,如果有重入(假如state=2)说明对该锁获取了两次,那么都要释放,说白了就是放弃对这个锁的持有;另外会记录这个state的次数,当这个线程再次获取锁时这个线程要恢复这个锁状态(依然state=2),因为这个线程要在它park的地方继续执行。

注释3:判断node是否在AQS同步队列中,如果不在则阻塞

final boolean isOnSyncQueue(Node node) {
   
    //第一次刚创建的肯定返回false 
    if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */
        return findNodeFromTail(node);
    }

这里判断的是该node是否在同步队列AQS中(注意不是condition队列哦),新创建出来的肯定是返回false,那么while(!isOnSyncQueue(node))就返回true,进循环体阻塞,当被唤醒时,它就会加入到AQS同步队列了,细节代码大家自行阅读。

注释4:下面的逻辑就是在AQS同步队列中了,和之前分析的AQS一样的

这里就不再继续啰嗦了,就是跟获取锁的流程一样,无非这个state的值可能不是1,而是之前释放是记录的(有可能之前有重入),不熟悉的可以参考我之前的一片文章
深入理解ReentrantLock和AQS

上面代码总逻辑分析如下

(1)创建节点,并且加入到condition的等待队列的队尾

(2)当前线程需要释放同步状态(将state置为0),并且唤醒AQS后继节点,让其竞争锁

(3)判断node是否在AQS中,此时node一般是下面几种情况

在condition队列中:处于等待队列中,状态为CONDITION,需要等待调用signal,加入到AQS同步队列

在AQS同步队列中:在AQS队列中,如果被前一个节点唤醒,则有竞争锁的资格,状态为SIGNAL,一般是调用condition.signal()后从condition的等待队列转移到AQS队列中

取消状态:过时或者状态为取消状态CANCELED,则会剔除出队列

上面awiat()方法已经基本上分析完了,接下来我们来看看signal方法

此时上面condition.await()中的while循环会阻塞,直到调用condition.signal()或者condition.signalAll(),将该node从condition等待队列中移到AQS同步队列的尾部。接下来我们就看condition.signal()方法

4、condition.signal()

/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */
public final void signal() {
   
    //1.只有持有锁的线程,才能调用signal,首先判断
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //2.直接看这个方法
        doSignal(first);
}

这里很简单,首先判断当前线程是否持有锁,如果没有持有则会抛异常,也就是说持有锁(同步状态)的线程才有资格调用signal()方法,下面看doSignal()

5、doSignal()

/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */
private void doSignal(Node first) {
   
    do {
   
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

这里也很简单,使用do-while,其实就是从condition的第一个node节点开始,把一个node节点转移到AQS同步队列中,如果失败,则再找下一个…依次循环(注意这里是转移一个,如果转移成功后就退出循环),接着看transferForSignal()

6、transferForSignal()

final boolean transferForSignal(Node node) {
   
    /* * If cannot change waitStatus, the node has been cancelled. */
    //1.首先要将node节点状态从CONDITION改变成0,失败则认为该节点被取消,则剔除
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */
    //2.使用自旋的cas将node节点加入到AQS队列中,返回的是node的前一个节点
    Node p = enq(node);
    int ws = p.waitStatus;
    //前面一个节点不管是CANCELLED状态还是设置signal失败,都唤醒node节点
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //唤醒节点
        LockSupport.unpark(node.thread);
    return true;
}

这段逻辑也不难,首先使用cas将node从CONDITION状态变成0,如果失败则表示该节点已取消(CANCELLED),不用转移到AQS同步队列,否则使用enq(node)自旋cas加入到AQS同步队列,这个enq(node)方法在之前获取锁失败,加入AQS同步队列时也调用了,之前的文章分析过,这里不再继续分析。接着看最后一个if条件,前一个节点的状态>0(CANNELLED)或者使用cas尝试将前一个节点的状态修改成SIGNAL,失败后则唤醒当前节点,为什么要这么做呢?

(1)如果前一个节点时SIGNAL状态,则不需要唤醒当前节点,因为AQS队列自动会通知后继节点(如果同之前没有取消的话),能保证当前节点一定会被唤醒

(2)如果前一个节点时取消,或者无法将前一个节点修改成SIGNAL状态,则当前节点有可能唤醒不了,因为前一个节点不会通知它,此时就需要手动唤醒,当手动唤醒后,该节点会执行下面的方法,将它前面取消状态(CANCELED)移除,这样就保证自己有机会被唤醒竞争锁。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
   
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /* * This node has already set status asking a release * to signal it, so it can safely park. */
        return true;
    if (ws > 0) {
   
        /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */
        do {
   
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
   
        /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

这个方法在之前的文章分析过,这里不再继续分析。到这里基本上所有的逻辑都执行完了,此时该节点已经正常进入了同步AQS队列,和之前竞争锁失败进入同步AQS队列效果是一样的。后面竞争锁的逻辑也是一样的,这里就不再继续分析。

前面描述的在之前的文章分析过,该文章就是
深入理解ReentrantLock和AQS),本文就分享到这里,希望可以给大家带来一点帮助,同时如果有错误,欢迎大家指正!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/111209.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • mui 底部导航菜单功能(原创)[通俗易懂]

    mui 底部导航菜单功能(原创)[通俗易懂]复制下来就能用不能用就加下我的前端交流QQ群问下简单版 底部导航菜单切换 首页 9 消息 9 消息 mui.init({ subpages:[//先加载首页 { ur

  • 浮点数规格化表示例题_浮点数规格化阶码表示

    浮点数规格化表示例题_浮点数规格化阶码表示一、浮点数的表示格式浮点数表示法是指以适当的形式将比例因子表示在数据中,让小数点的位置根据需要而浮动。这样,在位数有限的情况下,既扩大了数的表示范围,又保持了数的有效精度。阶码:阶码是整数,阶符和m位阶码的数值部分共同反映浮点数的表示范围及小数点的实际位置,常用移码或补码表示。IEEE754标准中采用移码的表示形式。尾数:数符表示浮点数的符号,尾数的数值部分的位数n反映浮点…

    2022年10月31日
  • pojAGTC(LCS,DP)

    pojAGTC(LCS,DP)

  • matlab实现香农编码原理_香农编码c语言实现

    matlab实现香农编码原理_香农编码c语言实现最近有个实验是用MATLAB实现香农编码的,在网上看到了别人写的程序,大部分都不支持手动输入信源,我自己就加上了几行,能够直接输入信源分布,下面是程序:pa=input(‘请输入信源分布:’)k=length(pa);   %计算信源符号个数ifmin(pa)&lt;0||max(pa)&gt;1  %判断信源概率值是否介于0到1之间  %disp([‘信源分布pa(x)=[‘,…

  • 识别手写数字的神经网络_基于神经网络的数字分类

    识别手写数字的神经网络_基于神经网络的数字分类神经网络之手写数字文章目录神经网络之手写数字00.写在之前01.代码框架02.开始做一些准备工作03.框架的开始04.训练模型构建05.手写数字的识别06.想看源码的同学戳这里07.思考首先鼓掌,又是一个有收获的五一小假期,想前年五一出门旅游,去年五一疫情在家写了爬虫【就是我博客里的那个口袋妖怪】,这个五一就写了一个神经网络。代码参考学习于python神经网络编程这本书。实话实说,这本书看了好几次,之前打算写来着,但不知道为什么总是不敢轻易尝试,今天把五一的任务的任务都完成了,早上就想

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号