Java AQS
overview
先上官方doc。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState(), setState(int) and compareAndSetState(int, int) is tracked with respect to synchronization.
Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class. Class AbstractQueuedSynchronizer does not implement any synchronization interface. Instead it defines methods such as acquireInterruptibly(int) that can be invoked as appropriate by concrete locks and related synchronizers to implement their public methods.
- 重点,数据结构使用个先进先出的等待队列(与名字中的Q呼应),并且rely on single atomic int value to represent state。虽然有读写锁这个锁,维护了“两把锁”,其实这两把锁与线程池的数目与状态一样,在同一个字段里面,也就是这atomic int,低16位是写,高16位是读。
- 名字中的A,虽然是抽象的,但是类中并没有abstract的方法,大多数的方法是有默认的throw ex的实现的,比如:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
- 默认的抛出异常的protect实现,主要是由些场景下,并不是所有的这些方法都需要实现,可以只重写两个方法,而其他方法可以不重写,这种是protect的方式,也不会暴露出去。
- 这个抽象的同步器没有实现任何的同步接口,而是提供了一个类似的框架,包括一些同步非阻塞的设置操作setState()、compareAndSetState()。都说是框架了,当然不会只提供这些操作,还有一些关键的如线程的排队,等待与唤醒,主要是这Q的管理。还有,比如子类只要实现了tryAcquire()方法,(实现也是使用AQS提供的一些方法如hasQueuedPredecessors()、compareAndSetState()、操作state并且返回true或false即可,然后就自动的有了一些附加的方法如tryAcquireNanos,附带中断的acquire等方法。
This class supports either or both a default exclusive mode and a shared mode.
- 支持两种排他模式,共享式和独占式的
同步队列
- 同步队列是个FIFO的双向队列,当线程获取失败后,会把线程的等待信息构造成一个节点并加入同步队列并阻塞当前线程。由于可能有多个线程获取失败同时加入tail,所以加入tail并不是安全的,需要使用CAS放置。入队的时候是个死循环操作,一直到入队成功为止。
- 设置队列的头的操作是获取到锁的线程做的事情,这个时候对队列操作是安全的,直接操作即可。
- 当前一个线程释放的时候,会设置state,并且使用LockSupport.unpark(s.thread)方式唤醒下一个节点。这样看上去好像都是先进先出的,但是忽略了一点,还有些新的竞争的线程根本不需要unpark,本来就是runnable的,上来就cmp获取锁的。所以,非公平锁内的等待的线程,也是能够保证一定顺序的,只是外面新的线程可能插队。
- 这种非公平锁的实现,其实能很大提高吞吐量,尤其是获取锁竞争非常激烈的场景,因为在前面线程释放锁后,竞争越激烈,新的活动的线程是非常有可能获取到锁的。
- 对于doAcquireNanos这方法,底层就是用的LockSupport.parkNanos方法,但是在进入此方法前会判断下需要sleep的时间是否小于一个阈值(1000纳秒),如果小于此值,直接在循环一次,应为又系统的调度,直接在自旋一次的精度会更高。性能换精度
ReentrantLock
支持公平与非公平的方式,非公平锁获取锁的方式:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 不管什么同步队列,上来就cmp来获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入的方式
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
而公平锁获取锁获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先判断是否为第一个节点,如果自己是第一个节点,在CMP
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantReadWriteLock
有锁的降级的,能把写锁降级为读锁,保证本线程内的后续对以前的写操作是可见的。以下为写锁的获取与释放:
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
// (Note: if c != 0 and w == 0 then shared count != 0),难得的注释,这里只要shared count不是零,就直接返回false,没有再去判断是否是自己在持有写锁,因为即使现在判断了,可能其他线程在判断后也持有了读锁,这样就破坏了语义。这样读写锁可能死锁的一个坑,一个线程先获取了读锁,然后读锁没有释放,在获取写锁,就死锁了,验证代码:
public static void main(String[] args) {
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
reentrantReadWriteLock.readLock().lock();
reentrantReadWriteLock.writeLock().lock();
System.out.println("end.");
}
下获取写锁,在获取的锁,是ok的
public static void main(String[] args) {
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
reentrantReadWriteLock.writeLock().lock();
reentrantReadWriteLock.readLock().lock();
System.out.println("end.");
}
输出end。
读锁的获取与释放:
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
主要是经过
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
这个判断后,如果有写锁,那写锁必须是自己。即使过了这个判断,后续有其他写锁持有了,这个时候也不会有问题,因为之前的状态是没有写锁,在CMP的时候写锁的次数为0.
Condition
- 一个lock可以new多个condition,而一个synchronize只有一个条件队列
- 为什么是个单向的列表?
- 条件队列都是公平的,因为await的时候线程是持有锁的,先持有锁在await,await的并发不会像锁那样高。而且await的时候肯定是要上线文切换的,不存在优化空间。
- await后,相当于从同步队列的头部(也可能是新的竞争到锁的线程)进入到条件队列的尾部,不需要CAS,因为await表明有锁的
- 当一个线程发送single后,会唤醒队列中的第一个node的线程,然后竞争的加入到同步队列中等待获取到锁
什么同步队列是双向的,而条件队列是单向么?
主要是同步队列有些操作需要向前遍历,如判断是否需要park,有的节点状态是CANCELLED的,在tryRelease失败后,这线程基本就是CANCELLED的状态,因为没有释放成功,不能在此获取到锁。也就是说,一些线程在获取到锁之后,释放锁的时候失败了,但是还能运行(变成cancelled状态,不能获取锁),可以再次tryRelease的,这也是try的意义所在。
下面代码就是在
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
Semaphore
信号量,可以理解为一种可重入的共享读锁,只不过是shared的数目是有上限的,不能无限地accquire到。而读写锁的读锁是没有一个次数的,也分为公平与非公平的实现,在tryAcquireShared的逻辑中多了个remaining判断。
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
countDownLanch
AQS实现,实现的同步器的核心的逻辑如下:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
共享的模型,countDown的时候去releaseShared,await的时候去获取shared,在获取的时候比较特殊,如上,只有getState() == 0的时候才会获取成功。
public void countDown() {
sync.releaseShared(1);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}