Java并发(十):读写锁ReentrantReadWriteLock

2018-11-20 03:18:40来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

读写锁维护着一对锁,一个读锁和一个写锁。通过分离读锁和写锁,使得并发性比一般的排他锁有了较大的提升:在同一时间可以允许多个读线程同时访问,但是在写线程访问时,所有读线程和写线程都会被阻塞。

读写锁的主要特性:

  1. 公平性:支持公平性和非公平性。
  2. 重入性:支持重入。读写锁最多支持65535个递归写入锁和65535个递归读取锁。
  3. 锁降级:遵循获取写锁、获取读锁在释放写锁的次序,写锁能够降级成为读锁

一、类结构

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    // 属性
    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    final Sync sync; //// 内部类
    abstract static class Sync extends AbstractQueuedSynchronizer {}
    static final class FairSync extends Sync {}
    static final class NonfairSync extends Sync {}
    public static class ReadLock implements Lock, java.io.Serializable {}
    public static class WriteLock implements Lock, java.io.Serializable {}
}

二、读写锁实现

ReadLock 使用了共享模式,WriteLock 使用了独占模式。

ReadLock 和 WriteLock都是通过同一个Sync实例实现的。

AQS 将 state(int)分为高 16 位和低16位,高16位用于共享模式ReadLock,低16位用于独占模式WriteLock 。

    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; } // 无符号补0右移16位 - 读锁
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 抹掉高16位 - 写锁

WriteLock:

1.state的低16位(0代表没有被占用,大于0代表有线程持有当前锁(锁可以重入,每次重入都+1) 最多2^16-1次重入

2.exclusiveOwnerThread == Thread.currentThread()

ReadLock:

1.state的高16位(0代表没有被占用,大于0代表有线程持有当前锁(锁可以重入,每次重入都+1) 最多2^16-1次重入

2.ThreadLocalHoldCounter readHolds; // 记录线程持有的读锁数量

  readHolds.threadLocals - Map<Thread, HoldCounter>

  HoldCounter - count  tid

三、源码分析

 写锁获取:

    // ReentrantReadWriteLock.WriteLock.lock()
    public void lock() {
        sync.acquire(1);
    }
    
    // AbstractQueuedSynchronizer.acquire(int)
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    /**
     * ReentrantReadWriteLock.Sync.tryAcquire(int)
     * 可以获取写锁的两种情况:
     * 1.没有线程占用该锁(读锁和写锁都没有被占用)
     * 2.当前线程已经拿到过该写锁,重入
     */
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c); // 写锁
        if (c != 0) { // 有锁 
            if (w == 0 || current != getExclusiveOwnerThread()) // 有锁且没有写锁(即有读锁) || 其他线程占用了写锁
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT) // 重入锁上限 2^16-1
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        }
        // 没有线程占用该锁,直接获取锁
        if (writerShouldBlock() || // 如果是公平锁需要排队
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

 写锁释放:

    // ReentrantReadWriteLock.WriteLock.unlock()
    public void unlock() {
        sync.release(1);
    }
    
    // AbstractQueuedSynchronizer.release(int)
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    /**
     * ReentrantReadWriteLock.Sync.tryRelease(int)
     * 释放写锁:维护state和exclusiveOwnerThread
     * 
     */
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }

读锁

    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 这个嵌套类的实例用来记录每个线程持有的读锁数量(读锁重入)
        static final class HoldCounter {
            int count = 0; // 持有的读锁数
            final long tid = getThreadId(Thread.currentThread()); // 线程 id
        }

        static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
        
        /**
          * 组合使用上面两个类,用一个 ThreadLocal 来记录线程持有的读锁数量
          */ 
        private transient ThreadLocalHoldCounter readHolds;

        /**
         * 用于缓存,记录最后一个获取读锁的线程的读锁重入次数
         * 不管哪个线程获取到读锁后,就把这个值占为已用,这样就不用到 ThreadLocal 中查询 map 了
         * 通常读锁的获取很快就会伴随着释放,在 获取->释放 读锁这段时间,如果没有其他线程获取读锁的话,此缓存就能帮助提高性能
         */
        private transient HoldCounter cachedHoldCounter;

        /**
         * 第一个获取读锁的线程(并且其未释放读锁),以及它持有的读锁数量
         * 提高性能用
         */
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;

    }    

读锁获取:

// ReentrantReadWriteLock.ReadLock.lock()
    public void lock() {
        sync.acquireShared(1);
    }
    
    // AbstractQueuedSynchronizer.acquireShared(int)
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    /**
     * ReentrantReadWriteLock.Sync.tryAcquireShared(int)
     * 可以获取读锁情况:
     * 1.没有线程占用该锁
     * 2.只有读锁
     * 3.有写锁,写锁被当前线程占用,锁降级
     * 三种情况 - 只要没有其他线程占用写锁就可以获取读锁
     */
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current) // 其他线程占用写锁
            return -1;
        int r = sharedCount(c); // 读锁
        if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 进入此if,表示可以拿到读锁了 
            if (r == 0) { // 将"第一个"获取读锁的线程记录在firstReader属性中
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else { // 1.当前线程及对应读锁次数存入cachedHoldCounter 2.当前线程及对应读锁次数存入readHolds
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();// readHolds中取当先线程的ThreadLocal(没有就创建一个)
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);// 公平锁排队非公平锁下一个是写锁/读锁重入次数上限/CAS失败  重新拿读锁
    }
    
    /**
     * ReentrantReadWriteLock.Sync.fullTryAcquireShared(Thread)
     */
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) { // 循环CAS拿锁
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current) // 其他线程占用写锁
                    return -1;
            } else if (readerShouldBlock()) { // 处理读锁重入,将cachedHoldCounter设置为当前线程
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0) // 不是重入,返回-1
                        return -1;
                }
            }
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) { // 正常拿读锁
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }

锁降级:

持有写锁的线程,去获取读锁的过程称为锁降级

读锁释放:

    // ReentrantReadWriteLock.ReadLock.unlock()
    public void unlock() {
        sync.releaseShared(1);
    }
    
    // AbstractQueuedSynchronizer.releaseShared(int)
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    /**
     * ReentrantReadWriteLock.Sync.tryReleaseShared(int)
     * 维护readHolds state
     */
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) { // 第一个获取读锁的线程
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else { // readHolds中次数-1
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) { // state
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
                return nextc == 0;
        }
    }

 

 

参考资料 / 相关推荐

Java 读写锁 ReentrantReadWriteLock 源码分析

【死磕Java并发】—–J.U.C之读写锁:ReentrantReadWriteLock

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:StringBuffer用法

下一篇:javaDay06_2函数的重载(overload)