转载

抽象队列式同步器AQS详解

AQS源码详解

AQS,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,ReetrantLock、Semaphore、CountDownLatch的实现均依赖于AQS,所以详细了解其源码实现,有助于理解依赖其实现的

锁和同步器。

AQS维护一个双向的FIFO阻塞队列和一个volatile int类型的成员变量,如图所示,其实简而言之,多个线程同时对资源进行抢占时,失败的线程会添加到阻塞队列中,进入一个阻塞等待的状态。当线程使用完资源后,会相应得唤醒队列中得阻塞线程。

AQS的类图

AQS通过head, tail成员变量,维护一个双向FIFO阻塞队列,head、tail均为Node类型。

Node的waitStatus共有四种状态

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • 0:新结点入队时的默认状态。

负值表示结点处于有效等待状态,而正值表示结点已被取消。

AQS支持两种资源模式,一是独占资源(EXCLUSIVE),二是共享资源(SHARED).

Node.mode == EXCLUSIVE,代表其阻塞于等待独占资源

Node.mode == SHARED,代表其阻塞于等待共享资源。

所以其资源的获取有两套模式,下面从源码角度分别介绍。

其中tryAcquire(), tryRelease(), tryAcquireShared(),tryReleaseShared()需要自定义实现,以实现特定的功能,ReetrantLock、Semaphore、CountDownLatch等都对其中的几个函数进行了自定义的实现,由于不属于AQS的源码范畴,所以在此不详细赘述

独占资源获取

acquire函数

  • 线程尝试获取共享资源, 成功则直接返回,否则调用acquireQueued函数,进入AQS阻塞队列尾部,并标注为EXCLUSIVE(独占模式)。线程在阻塞过程中,是不响应中断的,acquireQueued函数的返回值可以标记线程的中断状态,返回true代表阻塞过程中发生过中断,所以需要进行selfInterrupt,将中断额外补上。

    public final void acquire(int arg) {
    	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    		selfInterrupt();
    }
    

addWaiter函数

  • addWaiterenq函数实现一个入队功能。需要注意的是enq函数是个自旋过程,队列为空时(head、tail均为null),首先会创建一个哨兵节点,并且head、tail均指向其,然后,进行正常的入队操作,当前节点的prev是哨兵节点,tail指向当前节点,哨兵节点的next指向当前节点

    private Node addWaiter(Node mode) {
        //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
        Node node = new Node(Thread.currentThread(), mode);
        
        //尝试快速方式直接放到队尾。
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        
        //上一步失败则通过enq入队。
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        //CAS"自旋",直到成功加入队尾
        for (;;) {
            Node t = tail;
            if (t == null) { 
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {//正常流程,放入队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

acquireQueued函数

  • acquireQueued函数是AQS阻塞队列的一个核心。首先获取当前节点的前驱节点,如果是head(哨兵节点),说明其可以获取资源,获取成功直接返回,获取失败同样也需要阻塞等待。如果不是, 通过shouldParkAfterFailedAcquire函数判断其是否需要进行阻塞等待,如果需要,则调用parkAndCheckInterrupt函数进行阻塞,等待唤醒

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //中断标记
            boolean interrupted = false;
            
            //自旋
            for (;;) {
                final Node p = node.predecessor();//拿到前驱
               	//队列的队首(除哨兵节点的第二节点)尝试获取资源
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; //方便GC
                    failed = false; // 成功获取资源
                    return interrupted;//返回等待过程中是否被中断过
                }
                
                //通过shouldParkAfterFailedAcquire函数判断自己是否需要阻塞
                //如果需要,则进入阻塞状态,如果被中断唤醒,则记录中断标志,继续阻塞等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed) 
                // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了)
                // 那么取消结点在队列中的等待。
                cancelAcquire(node);
        }
    }
    
  • shouldParkAfterFailedAcquire和parkAndCheckInterrupt函数

    shouldParkAfterFailedAcquire函数主要的功能:

    1、判断其是否需要阻塞。(依据函数的返回值)

    2、通知其前驱,获取完资源后,唤醒其。由于可能存在前驱节点放弃等待的情况,所以需要从后往前遍历,知道找到waitStatus小于等于0的前驱节点,并将其状态设置为SIGNAL。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//拿到前驱的状态
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            //从后往前遍历,知道找到waitStatus小于等于0的前驱节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    private final boolean parkAndCheckInterrupt() {
    	LockSupport.park(this);
    	return Thread.interrupted();
    }
    

总结一下独占资源的获取过程

  • 调用tryAcquire函数尝试获取,如果成功直接返回
  • 如果不成功,调用addWaiter函数进入阻塞队列
  • 调用acquireQueued函数,在阻塞队列中休息,等待唤醒。其中shouldParkAfterFailedAcquire用于找到当前节点安全的休息点,以保证其可以被前驱节点唤醒

独占资源释放

release函数

  • 尝试释放资源,如果成功,则唤醒队首节点(哨兵节点的next节点)

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
        Node h = head;
        //如果waitStatus的状态设置为SIGNAL,则说明它的next节点需要其唤醒
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

unparkSucessor函数

  • 唤醒队首节点(哨兵节点的next节点),并重置头节点,也就是哨兵节点的waitStatus状态

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
        	//重置头节点waitStatus为0
        	compareAndSetWaitStatus(node, ws, 0);
    
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从后往前查找,寻找需要唤醒的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)  s = t;
        }
        if (s != null) LockSupport.unpark(s.thread);
    }
    

释放资源的过程相对简单,主要功能就是唤醒阻塞等待的队首节点

共享资源获取

acquireShared函数

  • acquireShared函数的主要功能是获取共享资源,成功返回一个非负数,失败则调用

    doAcquireShared函数,进入阻塞队列。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
        	doAcquireShared(arg);
    }
    

doAcquireShared函数

  • doAcquireShared函数类似于acquireQueued函数,如果是队首节点(哨兵节点的next节点),尝试获取资源,成功则返回一个非负数。并且如果r>0,调用setHeadAndPropagate不仅会设置head节点,并且会唤醒其后继节点

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

setHeadAndPropagate函数

  • 设置当前节点为头节点,如果还有额外资源propagate>0,则唤醒后继节点

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    

共享资源的释放

  • releaseShared函数释放资源

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
  • doReleaseShared函数唤醒阻塞队列后继线程, 如果所有锁都释放了,则将哨兵节点设置为PROPAGATE,

    表示传播唤醒队列中的阻塞的线程,其实函数核心的代码是if(h == head) break;,体现共享资源的释放过程,只要head一直在发生改变,则一直进行唤醒操作。head未发生改变主要分为一下几种情况,h==null

    或者h==tail或者资源耗尽不足以唤醒下个线程。

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;   
                    //自旋设置SIGNAL为0
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    // 如果是 0,那么所有的锁都已经释放了,将h的状态设置为 PROPAGATE,表示传播唤醒队列中阻塞的线程
                    continue;              
            }
            // 如果头结点没有变,也就是 h == null 或者 h == tail 成立,那么退出
            if (h == head)                 
                break;
        }
    }
    

参考资料

Java并发编程之美

Java并发之AQS详解

AQS基本架构和原理分析

正文到此结束
本文目录