Lock(三)利用 Lock 实现 Condition

Condition 简介

Condition 主要有两类方法:

  • await,释放锁并阻塞线程直到 signal 被调用,恢复后会重新获得锁
  • signal,唤醒阻塞在这个 Condition 上的一个或全部线程

利用条件变量前需要先获得锁

所以 Condition 的所有方法都需要加锁

lock.lock();
// ...
condition.await();
// ...
lock.unlock();

Condition 的 await/signal 对标 Object 的 wait/notify,wait/notify 使用对象监视器实现的,而 await/signal 使用 Lock 实现的(Lock 对标 synchronized 关键字)

ConditionObject 自己持有一个双向链表的排队队列 condition queue,所有阻塞在此条件变量上的线程都在此排队

被唤醒的线程会被转移到 AQS 队列尾部(又叫 sync queue)

public class ConditionObject implements Condition, java.io.Serializable {
    private transient Node firstWaiter; /** First node of condition queue. */
    private transient Node lastWaiter;  /** Last node of condition queue. */
}

阻塞

整个 await 大体就是以是否在 sync queue 为标识的循环,当节点转移到 sync queue 时表示线程被唤醒,跳出阻塞循环

void ConditionObject.await() throws InterruptedException {
    // ... 在 condition queue 队尾添加一个 Node.CONDITION 类型的新节点(所有阻塞在此条件变量上的线程都在此排队)
    Node node = addConditionWaiter();
    // 进入阻塞状态前需要释放锁,同时唤醒下一个等待锁的线程
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 进入阻塞
    // 被唤醒的线程会从 condition queue 转移到 AQS 排队队列(又叫同步队列,sync queue)
    // isOnSyncQueue 返回 true 表示此节点已被转移到 sync queue,跳出阻塞循环
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // await() 是可被中断的,awaitUninterruptibly() 不会被中断
        // 线程恢复后,如果发生了中断,要跳出阻塞状态
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 线程恢复后需要重新获得锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

private Node ConditionObject.addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Node.CONDITION)
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

int AbstractQueuedSynchronizer.fullyRelease(Node node) {
    try {
        int savedState = getState();
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}
boolean AbstractQueuedSynchronizer.release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
boolean ReentrantLock.Sync.tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

唤醒一个阻塞的线程(从队头节点开始)

将在 condition queue 上节点转移到 sync queue 上

// 把 condition queue 第一个节点(等待最久的线程)从队列里移除,添加到 sync queue 并唤醒其线程
void ConditionObject.signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

// 从 condition queue 里移除
void ConditionObject.doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

// 插入至 sync queue 队尾并唤醒其线程
boolean ConditionObject.transferForSignal(Node node) {
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
Node AbstractQueuedSynchronizer.enq(Node node) {
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            U.putObject(node, Node.PREV, oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return oldTail;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

唤醒全部线程

// 跟唤醒第一个线程时一样的:
// 将所有在 Condition 上排队的线程逐个从 Condition 排队队列里移除,添加到 AQS 队尾,并唤醒其线程
void ConditionObject.signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

void ConditionObject.doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}