Java AQS

overview

先上官方doc

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState(), setState(int) and compareAndSetState(int, int) is tracked with respect to synchronization.

Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class. Class AbstractQueuedSynchronizer does not implement any synchronization interface. Instead it defines methods such as acquireInterruptibly(int) that can be invoked as appropriate by concrete locks and related synchronizers to implement their public methods.

  • 重点,数据结构使用个先进先出的等待队列(与名字中的Q呼应),并且rely on single atomic int value to represent state。虽然有读写锁这个锁,维护了“两把锁”,其实这两把锁与线程池的数目与状态一样,在同一个字段里面,也就是这atomic int,低16位是写,高16位是读。
  • 名字中的A,虽然是抽象的,但是类中并没有abstract的方法,大多数的方法是有默认的throw ex的实现的,比如:
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

This class supports either or both a default exclusive mode and a shared mode.

  • 支持两种排他模式,共享式和独占式的

同步队列

ReentrantLock

支持公平与非公平的方式,非公平锁获取锁的方式:

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
				// 不管什么同步队列,上来就cmp来获取锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
			// 重入的方式
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

而公平锁获取锁获取锁

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
				// 先判断是否为第一个节点,如果自己是第一个节点,在CMP
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

ReentrantReadWriteLock

有锁的降级的,能把写锁降级为读锁,保证本线程内的后续对以前的写操作是可见的。以下为写锁的获取与释放:

        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

// (Note: if c != 0 and w == 0 then shared count != 0),难得的注释,这里只要shared count不是零,就直接返回false,没有再去判断是否是自己在持有写锁,因为即使现在判断了,可能其他线程在判断后也持有了读锁,这样就破坏了语义。这样读写锁可能死锁的一个坑,一个线程先获取了读锁,然后读锁没有释放,在获取写锁,就死锁了,验证代码:

    public static void main(String[] args) {
        ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        reentrantReadWriteLock.readLock().lock();
        reentrantReadWriteLock.writeLock().lock();
        System.out.println("end.");
    }

下获取写锁,在获取的锁,是ok的

    public static void main(String[] args) {
        ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        reentrantReadWriteLock.writeLock().lock();
        reentrantReadWriteLock.readLock().lock();
        System.out.println("end.");
    }

输出end。

读锁的获取与释放:

        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

主要是经过

            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;

这个判断后,如果有写锁,那写锁必须是自己。即使过了这个判断,后续有其他写锁持有了,这个时候也不会有问题,因为之前的状态是没有写锁,在CMP的时候写锁的次数为0.

Condition

什么同步队列是双向的,而条件队列是单向么?

主要是同步队列有些操作需要向前遍历,如判断是否需要park,有的节点状态是CANCELLED的,在tryRelease失败后,这线程基本就是CANCELLED的状态,因为没有释放成功,不能在此获取到锁。也就是说,一些线程在获取到锁之后,释放锁的时候失败了,但是还能运行(变成cancelled状态,不能获取锁),可以再次tryRelease的,这也是try的意义所在。

下面代码就是在

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

Semaphore

信号量,可以理解为一种可重入的共享读锁,只不过是shared的数目是有上限的,不能无限地accquire到。而读写锁的读锁是没有一个次数的,也分为公平与非公平的实现,在tryAcquireShared的逻辑中多了个remaining判断。

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

countDownLanch

AQS实现,实现的同步器的核心的逻辑如下:

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

共享的模型,countDown的时候去releaseShared,await的时候去获取shared,在获取的时候比较特殊,如上,只有getState() == 0的时候才会获取成功。

    public void countDown() {
        sync.releaseShared(1);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

参考

从ReentrantLock的实现看AQS的原理及应用

Table of Contents