【JDK源码分析】通过源码深入分析AbstractQueued…

2018-07-24 07:52:33来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

前言

AbstractQueuedSynchronizer是并发包的核心基础类,它是构建阻塞锁和相关同步器(信号量、事件,等等)的框架,内部为FIFO队列,采用的是一种类似CLH锁队列的同步队列。引用他人对CLH的解释,CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。

源码

1. 父类AbstractOwnableSynchronizer

通过源码可以看到AbstractQueuedSynchronizer 继承了AbstractOwnableSynchronizer,而AbstractOwnableSynchronizer包含获取和设置当前独占锁的线程。

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    private transient Thread exclusiveOwnerThread;

    // 设置当前独占的线程
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    // 获取当前独占的线程
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

2. 内部类Node

Node类是同步队列的节点类,保存节点状态(waitStatus),前驱节点(prev),后继节点(next)、线程(thread),每个节点带有指向前驱节点以及后继节点的引用。
同步队列结构图(其中waitStatus的状态为CANCELLED、SIGNAL、PROPAGATE、0),可以看出它是一个双向链表实现的队列
同步队列结构图

    static final class Node {
        //表示共享模式
        static final Node SHARED = new Node();
        //表示独占模式
        static final Node EXCLUSIVE = null;

        // 由于超时或者中断,节点被设置成取消状态后会一直保持这个状态,与节点关联的线程不会再次进入等待。
        static final int CANCELLED =  1;
        // 表示后继节点关联的线程处于阻塞状态,当当前节点关联的线程取消或者释放锁时通知后继节点结束阻塞或者自旋
        // 通俗点讲就是当前节点发出释放锁信号给等待获取锁的后继节点
        static final int SIGNAL    = -1;
        // 表示该节点在条件队列上,经转换后会当作同步节点(状态被设置成0)
        static final int CONDITION = -2;
        // 表示共享模式同步状态被传播下去
        static final int PROPAGATE = -3;
        // 节点等待状态
        volatile int waitStatus;
        // 前驱节点
        volatile Node prev;
        // 后继节点
        volatile Node next;
        // 当前节点关联的线程
        volatile Thread thread;
        // 条件队列(只有独占模式)下一个等待的节点或者当为SHARED时表示为共享模式的等待状态
        Node nextWaiter;
        // 获取前驱节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }

3. 内部类ConditionObject

ConditionObject用来描述条件队列节点。
条件队列结构图(其中waitStatus只会为CONDITION或者0),条件队列(单向链表)节点会在转成同步队列节点时将waitStatus从CONDITION变为0,并且会添加具体指向next和prev的引用)
condition

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        // 条件队列的头节点
        private transient Node firstWaiter;
        // 条件队列的尾节点
        private transient Node lastWaiter;
        public ConditionObject() { }

        //条件节点入列
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // 如果尾节点状态已改变(可能为已取消状态)
            if (t != null && t.waitStatus != Node.CONDITION) {
                // 遍历条件队列,移除状态为取消状态的节点
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 尾节点为null时,将入列节点设置成条件队列的头节点
            // 反之则将其添加在尾节点后面
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            // 将入列的节点设置成条件队列的尾节点
            lastWaiter = node;
            return node;
        }

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        // 从先入条件队列的节点移除或者转到同步队列,直到有取消的或为null的节点为止
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    // first节点的nextWaiter为null,则将尾节点设置null
                    lastWaiter = null;
                first.nextWaiter = null;
                } while (! (first) &&
                     (first = firstWaiter) != null);
        }

         //将所有节点从条件队列中移除或者转到同步队列
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        // 遍历条件队列,移除状态为取消状态的节点
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                // t节点为取消状态,将其移除
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

        // public methods

         // 将最先入列的条件节点转到同步队列中
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        // 将依次入列的条件转到同步队列中
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

         // 不可中断的条件等待
        public final void awaitUninterruptibly() {
            // 添加一个条件队列节点
            Node node = addConditionWaiter();
            // 获取节点释放锁之前的状态
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                //如果条件队列节点还没有被转到同步队列上,则阻塞当前线程
                LockSupport.park(this);
                // 线程中断
                if (Thread.interrupted())
                    interrupted = true;
            }
            // 此时node已经在同步队列上
            // 此时去竞争独占锁
            if (acquireQueued(node, savedState) || interrupted)
                // 竞争锁中断或者之前已被中断时,则中断当前线程
                selfInterrupt();
        }

        // 表示从等待中退出时再次中断
        private static final int REINTERRUPT =  1;
        // 表示从等待中退出时抛出异常
        private static final int THROW_IE    = -1;

         // 检查中断类型
         // 当条件节点在singalled之前取消,则返回THROW_IE 
         // 在之后,则REINTERRUPT
         // 返回0表示未中断
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

        // 是否抛出异常或中断当前线程
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        // 加入条件等待队列
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 条件等待入列
            Node node = addConditionWaiter();
            // 释放当前节点,并返回当前锁状态
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                //如果条件队列节点还没有被转到同步队列上,则阻塞当前线程,直到被中断或者unpark
                LockSupport.park(this);
                // 检查中断类型
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 获取当前锁状态和打断类型
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                // 遍历条件队列,移除状态为取消状态的节点
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        // 加入条件等待队列,直到被中断、unpark、超时
        // 和上面方法类似
        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                //超时
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                // 超过自旋阈值时间时开始阻塞nanosTimeout时间
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // 中断
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                // 遍历条件队列,移除状态为取消状态的节点
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();
        }

        // 加入条件等待队列,直到被中断、unpark、超时
        // 和上面几个方法类似
        public final boolean awaitUntil(Date deadline)
                throws InterruptedException {
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() > abstime) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        // 加入条件等待队列,直到被中断、unpark、超时
        // 和上面几个方法类似
        public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        //  support for instrumentation

        // 检查是否是当前condition
        final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
            return sync == AbstractQueuedSynchronizer.this;
        }

        // 条件队列中是否还有等待者
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                // 如果节点的状态是CONDITION, 即该节点为条件队列中的节点
                if (w.waitStatus == Node.CONDITION)
                    return true;
            }
            return false;
        }

        // 获取有多少个节点正在等待队列
        protected final int getWaitQueueLength() {
            // 不是当前线程进来抛出异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            // 遍历条件队列
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    ++n;
            }
            return n;
        }

       // 获取正在条件队列中的线程集合
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<Thread>();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION) {
                    Thread t = w.thread;
                    if (t != null)
                        list.add(t);
                }
            }
            return list;
        }
    }

4. AbstractQueuedSynchronizer类

AQS为一个抽象类,在使用时需要重写其tryAcquire和tryRelease、tryAcquireShared和tryReleaseShared、isHeldExclusively方法。
下面先看看AQS的属性

    // 头节点
    private transient volatile Node head;

    //尾节点
    private transient volatile Node tail;

    //同步状态 0表示锁可获取状态
    private volatile int state;
    // 常量 线程自旋等待时间(微秒) 
    static final long spinForTimeoutThreshold = 1000L;

    // 提供操作内存空间的能力,主要使用了其CAS的原子操作
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // 同步状态state的内存偏移量
    private static final long stateOffset;
    // 头节点head的内存偏移量
    private static final long headOffset;
    // 尾节点head的内存偏移量
    private static final long tailOffset;
    // Node节点中等待状态waitStatus内存偏移量
    private static final long waitStatusOffset;
    // Node节点中next内存偏移量
    private static final long nextOffset;
4.1 AQS主要方法

为了便于理解,先从获取锁方法,然后再释放锁的顺序开始分析。

4.1.1 acquire 获取锁方法

以独占模式获取锁并且忽略中断。实现锁,需要子类去重写tryAcquire方法。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

 

通过tryAcquire 尝试获取锁,如果获取成功则直接返回。获取不到锁则调用acquireQueued方法,acquireQueued方法又以addWaiter返回值作为参数。
先看addWaiter方法

    private Node addWaiter(Node mode) {
        // 生成一个当前线程的节点(mode为Node.EXCLUSIVE时表示独占、为Node.SHARED时共享模式)
        Node node = new Node(Thread.currentThread(), mode);

        Node pred = tail;
        if (pred != null) {
            // 尾节点存在时,将新生成的节点添加在尾节点tail后面
            node.prev = pred;
            // 将尾节点指向新生成的节点,之前尾节点tail的后继节点指向新生成的节点node
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 尾节点不存在,表示同步队列还未初始化先初始化,然后将新生成的node节点添加到
        enq(node);
        return node;
    }
        // 上面使用的构造器
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
enq方法将node作为尾节点添加到同步队列(同时该方法也是条件队列节点转换成同步队列节点的方法)

    private Node enq(final Node node) {
        // 保证并发时能正确的将node节点添加队列尾端
        for (;;) {
            Node t = tail;
            // 尾节点不存在
            if (t == null) { // Must initialize
                // CAS操作生成一个默认的节点作为头节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // node节点的前驱节点指向尾节点
                node.prev = t;
                // CAS操作将尾节点指向node节点
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

再来看acquireQueued方法,队列获取锁

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    // node节点的前驱节点为头节点,并且通过tryAcquire成功获取到锁
                    // 将node节点设置成头节点,并将node节点的前驱节点和关联线程设置为null
                    setHead(node);
                    // 将之前的头节点的后继节点连接断开
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如node节点的前驱节点不为头节点或者没有成功获取到锁
                // 检测是否阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

再来看看shouldParkAfterFailedAcquire,此方法由于判断线程是否需要阻塞

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            // waitStatus>0 即为CANCELLED 表示该节点已经被取消
            // 删除node前驱节点中被取消的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             // 此时的waitStatus必定为0或者为PROPAGATE,此时将waitStatus设置成SIGNAL,但是不会挂起
             // 然后会再次在挂起前去获取锁
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

如果node节点的前驱节点pred再此之前节点等待状态就已经是SIGNAL,则会继续调用parkAndCheckInterrupt,使当前线程挂起

    private final boolean parkAndCheckInterrupt() {
        // 挂起当前线程
        LockSupport.park(this);
        // 线程被唤醒后,然后返回线程是否被中断的状态
        return Thread.interrupted();
    }

在acquireQueued方法中线程如果被中断后还会调用cancelAcquire方法

    private void cancelAcquire(Node node) {
        // node节点不存在直接返回
        if (node == null)
            return;
        // 将node节点不再关联线程
        node.thread = null;

        // 跳过被cancelled的线程节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        //node的未被cancelled的前驱节点的后继节点
        Node predNext = pred.next;
        //将node节点的状态设置成cancelled
        node.waitStatus = Node.CANCELLED;

        //如果node为尾节点并且成功将node节点的前驱节点设置成尾节点,
        if (node == tail && compareAndSetTail(node, pred)) {
            // 将node节点的前驱节点的后继节点引用设置为null
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

acquire方法调用流程图:
acquire

4.1.2 release 释放锁
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            //释放成功
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 唤醒头节点的后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unparkSuccessor方法唤醒前驱节点

    private void unparkSuccessor(Node node) {
         // node的等待状态小于0
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 节点的后继节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            //当node的后继节点为null或者状态为cancelled时从尾节点向前遍历,找到最先入列且状态不是cancelled的节点
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

总结

  1. AQS对于已经入列的阻塞的线程,会遵循FIFO规则,先入列未被取消的线程会先被唤醒;
  2. 条件队列节点转换成同步队列节点时,节点才会设置next、prev引用。

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:JVM内存模型

下一篇:Java:字节流和字符流(输入流和输出流)