AQS

2021/7/26 posted in  Java并发编程的艺术 Java

Java 并发编程核心在于 juc 包,而其中大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是 AQS。

AQS 定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(volatile int state)的同步器。

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state.

AQS 特性:

  • 阻塞等待队列
  • 共享、独占
  • 公平、非公平
  • 可重入
  • 允许中断

一般都是通过定义内部类 Sync 继承 AQS
将同步器所有调用都映射到 Sync 对应的方法

AQS 内部维护属性 volatile int state 表示资源的可用状态

state 三种访问方式
getState、setState、compareAndSetState

AQS 定义两种资源共享方式

  • Exclusive,独占,只有一个线程能执行,如 ReentrantLock
  • Share,共享,多个线程可以同时执行,入 Semaphore、CountDownLatch

AQS 定义两种队列

  • 同步等待队列
  • 条件等待队列

AQS,重点
自旋、加锁(CAS)、LockSupport、队列

下面通过 ReentrantLock 看下 AQS 的实现。

ReentrantLock

ReentrantLock 内部 定义一个内部类 Sync,Sync 继承自 AQS,Sync 还有两个子类,FairSync、NonfairSync 区别公平锁与非公平锁。
new ReentrantLock(true) 为公平锁
构造函数:

// 默认非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

公平锁,按照线程过来的顺序依次去获取锁,FIFO
非公平锁,可以插队

加锁过程

假设现在 ReentrantLock 是 FairSync 公平锁

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    // new ReentrantLock().lock() 会调用到这里
    final void lock() {
        acquire(1);
    }
    

// acquire 在 AQS 的代码
// public final void acquire(int arg) {
//	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//    	selfInterrupt();
//}

    
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 先获取 state 的值
        int c = getState();
        // 这一步是 第一次加锁
        // 如果 state 的值为 0,并且队列中没有等待的线程,并且 CAS 设置 state 值为1成功,执行 setExclusiveOwnerThread ,设置当前线程为 current,表示当前线程获得了锁
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 这一步是为了重入锁
        // 如果 state 不是0,并且 AQS 中的当前执行线程就是当前线程,
        // state + 1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // false 就是没有获得到锁,没有获取锁,会在执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 这个逻辑
        return false;
    }
}

当调用 ReentrantLock 的 lock 方法时会调用到 FairSync 类的 lock 方法。

acquire(1) 这个方法主要是加锁,然后设置 state 为 1,会先调用本类实现的 tryAcquire 方法 尝试去加锁,这个方法里面分为了第一次为当前线程加锁,和当前线程的重入锁两种情况,

如果是 state 为 0 ,hasQueuedPredecessors 方法,判断队列是否为空,或者当前线程是否是队列里第一个线程,
如果队列为空,然后 CAS 比较并交换 state 的值为 1,这两步都成功了,则当前线程获取了锁,设置 AQS 内部的当前线程变量为 current。

如果 state 不是 0,判断 AQS 中的内部线程变量与 current 是否相等,是不是同一个线程来获取锁,是同一个线程,state + 1。可重入锁

如果当前线程没有获取锁,会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 操作,将线程先添加队列,然后阻塞当前线程

Node.EXCLUSIVE 表示独占锁,addWaiter 这一步就是往队列里添加当前线程,添加到队列的尾部,Node 里有对当前线程的引用

private Node addWaiter(Node mode) {
	// 新建 Node,绑定当前线程 与 独占模式
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // tail 为null,表示队列为空 ??tail 初始值是不是 null
    // 不为 null,表示队列不为空,将 node 添加到队尾
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 自旋,保证当前node 添加成功
    enq(node);
    return node;
}

private Node enq(final Node node) {
	// 自旋 与 CAS 保证入队的线程 是一个一个入队的,原子性、同步
	// 并且双向队列指针指向正确,此处也有并发情况,如果不是同步进行的有可能队列乱掉
    for (;;) {
        Node t = tail;
        // 队列为空,表明是第一个线程进来,创建一个头结点,CAS 确保并发情况下只有一个线程能创建成功
        // 设置头结点,尾结点等于头结点,相当于 初始化 队列,
        // 但是没有对当前 node 操作,也就是说当前这个 node 还没有入队
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
        //队列不为空,添加到队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter 方法中,第一个线程获取lock 的时候,tail应该是 null ,然后 enq 执行的时候,如果 tail 是 null,只是进行了初始化,并没有将 node 设置到队尾,在下一次循环的时候添加到队尾,这样就 head 就是头结点,然后第一个线程对应的 node 其实是第二个节点???

arg 就是 state,acquireQueued 这一步是为了阻塞队列中的线程,

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 当前 node 的前一个 node
            final Node p = node.predecessor();
            // 如果 p 是头结点,说明队列中只有一个node ,
            // 则再去重试获取锁,如果获取成功了,当前线程就不用阻塞,直接执行
            // 下一步 selfInterrupt 就不用执行,一定程度上提高了并发
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 判断 前置节点 p 的 waitStatus
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前置节点的 ws,等于这个 Node.SIGNAL 的时候,才可以阻塞当前调用线程
// 当前节点能否被 park 取决于其前置节点的 waitStatus
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
    // 大于0 的就只有 CANCELLED,删除大于 0 的node
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 设置前置节点的状态为 Node.SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


private final boolean parkAndCheckInterrupt() {
    // 阻塞当前调用线程
    LockSupport.park(this);
    // 返回线程中断状态,清除中断状态
    return Thread.interrupted();
}

// 中断当前调用线程,上面的 parkAndCheckInterrupt 方法清理掉了线程的中断信号状态
static void selfInterrupt() {
    // 中断线程
    Thread.currentThread().interrupt();
}

Node 的 waitStatus
CANCELLED = 1,表示已经失效的,需要删除
SIGNAL = -1,表示可以被阻塞
CONDITION = -2
PROPAGATE = -3

shouldParkAfterFailedAcquire
当前节点对应的线程是否要 park,取决于其前置节点的 waitStatus 的值,当是 -1 的时候返回 true,表示当前节点对应的线程可以被阻塞,
如果是初始化状态,waitStatus = 0,则设置其前置节点的 waitStatus 为 -1。

ReentrantLock unlock 解锁过程

unlock 会调用 release 方法。唤醒队列头部线程
线程中断唤醒


public final boolean release(int arg) {
    // 尝试释放锁,修改 state 值为 原值减去 arg 的值
    if (tryRelease(arg)) {
        Node h = head;
        // 头结点不为 null,并且头结点的 waitStatus 为 -1 时,才去 unpark 当前线程,然后把头结点的 waitStatus 改为 0
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}


protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

// node 为头结点,头结点的 waitStatus = -1 时,执行该方法
// 将 头结点的 waitStatus 通过 CAS 方式 修改为 0
// unpark 头结点后面第一个 waitStatus <= 0  的节点对应的线程
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

上面加锁的过程,不惜循环两次,把头结点的 waitStatus 由 初始值 0 改为了 -1,
解锁的过程又把头结点的 waitStatus 由 -1 改为 0

非公平锁

static final class NonfairSync extends Sync {

    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        // 这里是插队,新来的线程是否能够立即获取到锁,
        // 如果获取到则执行新来的线程,获取不到则添加到队列尾部
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}


final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 与公平锁的区别 少了一步判断 !hasQueuedPredecessors()
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

LockSupport

当线程调用 LockSupport.park 方法后,线程会阻塞在这,有两种方式唤醒,
一种是 外部唤醒,unpark
另一种是线程中断

Thread t0 = new Thread(new Runnable() {
	@Override
	public void run() {
		Thread thread = Thread.currentThread();
		String name = thread.getName();
		System.out.println( name + " 开始执行");
		for (;;){
			System.out.println("准备 park 当前线程 " + name);
			LockSupport.park();
			System.out.println("当前线程 " + name + " 已被唤醒");
			if (thread.isInterrupted()){
				System.out.println("线程已被中断");
				return;
			}
		}
	}
});

t0.start();
TimeUnit.SECONDS.sleep(2);
System.out.println("准备唤醒 " + t0.getName());
LockSupport.unpark(t0);

TimeUnit.SECONDS.sleep(2);
t0.interrupt();

--- 运行结果
Thread-0 开始执行
准备 park 当前线程 Thread-0
准备唤醒 Thread-0
当前线程 Thread-0 已被唤醒
准备 park 当前线程 Thread-0
当前线程 Thread-0 已被唤醒
线程已被中断

interrupt() 方法中断线程,但并不是立即停止线程,而是等待线程运行结束后中断