2025-06-12
内卷九股文
0

目录

AbstractQueuedSynchronizer
核心设计
核心方法解析
独占模式(如ReentrantLock)
共享模式(如Semaphore)
需重写的模板方法
内部机制详解
CLH队列(FIFO)
线程阻塞与唤醒
条件变量(ConditionObject)
加锁流程源码剖析
加锁流程概述
三种加锁源码分析
lock方法
tryLock方法
lockInterruptibly方法
释放锁流程源码剖析
释放锁流程概述
释放锁源码分析
AQS中常见的问题
AQS中为什么要有一个虚拟的head节点
AQS中为什么使用双向链表
ConditionObject
ConditionObject的介绍&应用
Condition的构建方式&核心属性
Condition的await方法分析(前置分析)
Condition的signal方法分析
Conditiond的await方法分析(后置分析)
Condition的awaitNanos&signalAll方法分析
总结

之前阅读过了 ReentrantLockReentrantReadWriteLock 的源码,我们都发现其继承了 AbstractQueuedSynchronizer 类,平常我们说的 AQS 就是 AbstractQueuedSynchronizer 的简称。其实在我们的juc包下,很多的工具都是依赖于它,今天我们就将了解下AQS的魅力。

AbstractQueuedSynchronizer

AQS 就是 AbstractQueuedSynchronizer 抽象类,AQS其实就是JUC包下的一个基类,JUC下的很多内容都是基于AQS实现了部分功能,比如 ReentrantLockReentrantReadWriteLockThreadPoolExecutorCountDownLatchSemaphore 等等都是基于AQS实现。

image.png

核心设计

AQS采用模板方法模式,开发者只需重写特定方法(如tryAcquiretryRelease),而队列管理、线程阻塞/唤醒等复杂逻辑由AQS完成。其核心包括:

  1. 同步状态(state)

首先AQS中提供了一个由volatile修饰,并且采用CAS方式修改的int类型的state变量表示资源状态(如锁的重入次数、信号量许可数)。。

  1. CLH队列

AQS中维护了一个双向链表结构的等待队列,存储阻塞线程,有head,有tail,并且每个节点都是Node对象

java
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; }
  1. 条件队列

每个条件变量对应一个单向链表(ConditionObject),用于实现等待/通知机制。

image.png

核心方法解析

AQS的核心方法分为两类:独占模式共享模式

独占模式(如ReentrantLock)

  • 获取锁

acquire(int arg) → 调用tryAcquire(需重写)尝试获取,失败则入队阻塞。

  • 释放锁

release(int arg) → 调用tryRelease(需重写)释放状态,唤醒后继线程。

共享模式(如Semaphore)

  • 获取许可

acquireShared(int arg) → 调用tryAcquireShared(需重写)获取,失败则入队。

  • 释放许可

releaseShared(int arg) → 调用tryReleaseShared(需重写)释放,唤醒等待线程。

需重写的模板方法

方法名作用
tryAcquire(int arg)尝试独占获取资源(返回boolean)
tryRelease(int arg)尝试独占释放资源(返回boolean)
tryAcquireShared(int arg)尝试共享获取资源(返回int:负数为失败,0为成功但无剩余,正数为成功且有剩余)
tryReleaseShared(int arg)尝试共享释放资源(返回boolean)
isHeldExclusively()当前线程是否独占资源

内部机制详解

CLH队列(FIFO)

  • 节点(Node):存储线程引用、等待状态(WAITING/CANCELLED)、前驱/后继指针。

  • 入队逻辑:竞争失败的线程被包装为Node,通过CAS加入队尾。

  • 出队逻辑:头节点(虚拟节点)释放资源后唤醒后继节点。

线程阻塞与唤醒

  • LockSupport.park()阻塞线程,LockSupport.unpark(thread)唤醒线程。

  • 中断处理:AQS在唤醒时检查中断标志并恢复中断状态。

条件变量(ConditionObject)

  • await():释放锁,创建Node加入条件队列,阻塞线程。

  • signal():将条件队列的头节点转移到CLH队列,等待重新获取锁。

加锁流程源码剖析

加锁流程概述

这个是非公平锁的流程

image.png

三种加锁源码分析

lock方法

执行lock方法后,公平锁和非公平锁的执行套路不一样

java
// 非公平锁 final void lock() { // 上来就先基于CAS的方式,尝试将state从0改为1 if (compareAndSetState(0, 1)) // 获取锁资源成功,会将当前线程设置到exclusiveOwnerThread属性,代表是当前线程持有着锁资源 setExclusiveOwnerThread(Thread.currentThread()); else // 执行acquire,尝试获取锁资源 acquire(1); } // 公平锁 final void lock() { // 执行acquire,尝试获取锁资源 acquire(1); }

acquire方法,是公平锁和非公平锁的逻辑一样

java
public final void acquire(int arg) { // tryAcquire:再次查看,当前线程是否可以尝试获取锁资源 if (!tryAcquire(arg) && // 没有拿到锁资源 // addWaiter(Node.EXCLUSIVE):将当前线程封装为Node节点,插入到AQS的双向链表的结尾 // acquireQueued:查看我是否是第一个排队的节点,如果是可以再次尝试获取锁资源,如果长时间拿不到,挂起线程 // 如果不是第一个排队的额节点,就尝试挂起线程即可 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 中断线程的操作 selfInterrupt(); }

tryAcquire方法竞争锁最资源的逻辑,分为公平锁和非公平锁

java
// 非公平锁实现 final boolean nonfairTryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取了state熟属性 int c = getState(); // 判断state当前是否为0,之前持有锁的线程释放了锁资源 if (c == 0) { // 再次抢一波锁资源 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); // 拿锁成功返回true return true; } } // 不是0,有线程持有着锁资源,如果是,证明是锁重入操作 else if (current == getExclusiveOwnerThread()) { // 将state + 1 int nextc = c + acquires; if (nextc < 0) // 说明对重入次数+1后,超过了int正数的取值范围 // 01111111 11111111 11111111 11111111 // 10000000 00000000 00000000 00000000 // 说明重入的次数超过界限了。 throw new Error("Maximum lock count exceeded"); // 正常的将计算结果,复制给state setState(nextc); // 锁重入成功 return true; } // 返回false return false; } // 公平锁实现 protected final boolean tryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // .... int c = getState(); if (c == 0) { // 查看AQS中是否有排队的Node // 没人排队抢一手 。有人排队,如果我是第一个,也抢一手 if (!hasQueuedPredecessors() && // 抢一手~ compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 锁重入~~~ else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } // 查看是否有线程在AQS的双向队列中排队 // 返回false,代表没人排队 public final boolean hasQueuedPredecessors() { // 头尾节点 Node t = tail; Node h = head; // s为头结点的next节点 Node s; // 如果头尾节点相等,证明没有线程排队,直接去抢占锁资源 return h != t && // s节点不为null,并且s节点的线程为当前线程(排在第一名的是不是我) (s == null || s.thread != Thread.currentThread()); }

addWaite方法,将没有拿到锁资源的线程扔到AQS队列中去排队

java
// 没有拿到锁资源,过来排队, mode:代表互斥锁 private Node addWaiter(Node mode) { // 将当前线程封装为Node, Node node = new Node(Thread.currentThread(), mode); // 拿到尾结点 Node pred = tail; // 如果尾结点不为null if (pred != null) { // 当前节点的prev指向尾结点 node.prev = pred; // 以CAS的方式,将当前线程设置为tail节点 if (compareAndSetTail(pred, node)) { // 将之前的尾结点的next指向当前节点 pred.next = node; return node; } } // 如果CAS失败,以死循环的方式,保证当前线程的Node一定可以放到AQS队列的末尾 enq(node); return node; } private Node enq(final Node node) { for (;;) { // 拿到尾结点 Node t = tail; // 如果尾结点为空,AQS中一个节点都没有,构建一个伪节点,作为head和tail if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { // 比较熟悉了,以CAS的方式,在AQS中有节点后,插入到AQS队列的末尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

acquireQueued方法,判断当前线程是否还能再次尝试获取锁资源,如果不能再次获取锁资源,或者又没获取到,尝试将当前线程挂起

java
// 当前没有拿到锁资源后,并且到AQS排队了之后触发的方法。 中断操作这里不用考虑 final boolean acquireQueued(final Node node, int arg) { // 不考虑中断 // failed:获取锁资源是否失败(这里简单掌握落地,真正触发的,还是tryLock和lockInterruptibly) boolean failed = true; try { boolean interrupted = false; // 死循环………… for (;;) { // 拿到当前节点的前继节点 final Node p = node.predecessor(); // 前继节点是否是head,如果是head,再次执行tryAcquire尝试获取锁资源。 if (p == head && tryAcquire(arg)) { // 获取锁资源成功 // 设置头结点为当前获取锁资源成功Node,并且取消thread信息 setHead(node); // help GC p.next = null; // 获取锁失败标识为false failed = false; return interrupted; } // 没拿到锁资源…… // shouldParkAfterFailedAcquire:基于上一个节点转改来判断当前节点是否能够挂起线程,如果可以返回true, // 如果不能,就返回false,继续下次循环 if (shouldParkAfterFailedAcquire(p, node) && // 这里基于Unsafe类的park方法,将当前线程挂起 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 在lock方法中,基本不会执行。 cancelAcquire(node); } } // 获取锁资源成功后,先执行setHead private void setHead(Node node) { // 当前节点作为头结点 伪 head = node; // 头结点不需要线程信息 node.thread = null; node.prev = null; } // 当前Node没有拿到锁资源,或者没有资格竞争锁资源,看一下能否挂起当前线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // -1,SIGNAL状态:代表当前节点的后继节点,可以挂起线程,后续我会唤醒我的后继节点 // 1,CANCELLED状态:代表当前节点以及取消了 int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 上一个节点为-1之后,当前节点才可以安心的挂起线程 return true; if (ws > 0) { // 如果当前节点的上一个节点是取消状态,我需要往前找到一个状态不为1的Node,作为他的next节点 // 找到状态不为1的节点后,设置一下next和prev do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 上一个节点的状态不是1或者-1,那就代表节点状态正常,将上一个节点的状态改为-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
tryLock方法

tryLock方法,无论公平锁还有非公平锁。都会走非公平锁抢占锁资源的操作。 其本质就是拿到state的值, 如果是0,直接CAS浅尝一下,state 不是0,那就看下是不是锁重入操作,如果没抢到,或者不是锁重入操作,告辞,返回false

java
public boolean tryLock() { // 非公平锁的竞争锁操作 return sync.nonfairTryAcquire(1); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

以独占定时模式进行处理

java
// tryLock(time,unit)执行的方法 public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException { // 线程的中断标记位,是不是从false,别改为了true,如果是,直接抛异常 if (Thread.interrupted()) throw new InterruptedException(); // tryAcquire分为公平和非公平锁两种执行方式,如果拿锁成功, 直接告辞, return tryAcquire(arg) || // 如果拿锁失败,在这要等待指定时间 doAcquireNanos(arg, nanosTimeout); } private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { // 如果等待时间是0秒,直接告辞,拿锁失败 if (nanosTimeout <= 0L) return false; // 设置结束时间。 final long deadline = System.nanoTime() + nanosTimeout; // 先扔到AQS队列 final Node node = addWaiter(Node.EXCLUSIVE); // 拿锁失败,默认true boolean failed = true; try { for (;;) { // 如果在AQS中,当前node是head的next,直接抢锁 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 结算剩余的可用时间 nanosTimeout = deadline - System.nanoTime(); // 判断是否是否用尽的位置 if (nanosTimeout <= 0L) return false; // shouldParkAfterFailedAcquire:根据上一个节点来确定现在是否可以挂起线程 if (shouldParkAfterFailedAcquire(p, node) && // 避免剩余时间太少,如果剩余时间少就不用挂起线程 nanosTimeout > spinForTimeoutThreshold) // 如果剩余时间足够,将线程挂起剩余时间 LockSupport.parkNanos(this, nanosTimeout); // 如果线程醒了,查看是中断唤醒的,还是时间到了唤醒的。 if (Thread.interrupted()) // 是中断唤醒的! throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

取消节点分析:

image.png

java
// 取消在AQS中排队的Node private void cancelAcquire(Node node) { // 如果当前节点为null,直接忽略。 if (node == null) return; //1. 线程设置为null node.thread = null; //2. 往前跳过被取消的节点,找到一个有效节点 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; //3. 拿到了上一个节点之前的next Node predNext = pred.next; //4. 当前节点状态设置为1,代表节点取消 node.waitStatus = Node.CANCELLED; // 脱离AQS队列的操作 // 当前Node是尾结点,将tail从当前节点替换为上一个节点 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // 到这,上面的操作CAS操作失败 int ws = pred.waitStatus; // 不是head的后继节点 if (pred != head && // 拿到上一个节点的状态,只要上一个节点的状态不是取消状态,就改为-1 (ws == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 上面的判断都是为了避免后面节点无法被唤醒。 // 前继节点是有效节点,可以唤醒后面的节点 Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 当前节点是head的后继节点 unparkSuccessor(node); } node.next = node; // help GC } }
lockInterruptibly方法
java
// 这个是lockInterruptibly和tryLock(time,unit)唯一的区别 // lockInterruptibly,拿不到锁资源,就死等,等到锁资源释放后,被唤醒,或者是被中断唤醒 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 中断唤醒抛异常! throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 这个方法可以确认,当前挂起的线程,是被中断唤醒的,还是被正常唤醒的。 // 中断唤醒,返回true,如果是正常唤醒,返回false return Thread.interrupted(); }

释放锁流程源码剖析

释放锁流程概述

image.png

释放锁源码分析

java
public void unlock() { // 释放锁资源不分为公平锁和非公平锁,都是一个sync对象 sync.release(1); } // 释放锁的核心流程 public final boolean release(int arg) { // 核心释放锁资源的操作之一 if (tryRelease(arg)) { // 如果锁已经释放掉了,走这个逻辑 Node h = head; // h不为null,说明有排队的(录课时估计脑袋蒙圈圈。) // 如果h的状态不为0(为-1),说明后面有排队的Node,并且线程已经挂起了。 if (h != null && h.waitStatus != 0) // 唤醒排队的线程 unparkSuccessor(h); return true; } return false; } // ReentrantLock释放锁资源操作 protected final boolean tryRelease(int releases) { // 拿到state - 1(并没有赋值给state) int c = getState() - releases; // 判断当前持有锁的线程是否是当前线程,如果不是,直接抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // free,代表当前锁资源是否释放干净了。 boolean free = false; if (c == 0) { // 如果state - 1后的值为0,代表释放干净了。 free = true; // 将持有锁的线程置位null setExclusiveOwnerThread(null); } // 将c设置给state setState(c); // 锁资源释放干净返回true,否则返回false return free; } // 唤醒后面排队的Node private void unparkSuccessor(Node node) { // 拿到头节点状态 int ws = node.waitStatus; if (ws < 0) // 先基于CAS,将节点状态从-1,改为0 compareAndSetWaitStatus(node, ws, 0); // 拿到头节点的后续节点。 Node s = node.next; // 如果后续节点为null或者,后续节点的状态为1,代表节点取消了。 if (s == null || s.waitStatus > 0) { s = null; // 如果后续节点为null,或者后续节点状态为取消状态,从后往前找到一个有效节点环境 for (Node t = tail; t != null && t != node; t = t.prev) // 从后往前找到状态小于等于0的节点 // 找到离head最新的有效节点,并赋值给s if (t.waitStatus <= 0) s = t; } // 只要找到了这个需要被唤醒的节点,执行unpark唤醒 if (s != null) LockSupport.unpark(s.thread); }

AQS中常见的问题

AQS中为什么要有一个虚拟的head节点

AQS可以没有head,设计之初指定head只是为了更方便的操作。方便管理双向链表而已,一个哨兵节点的存在。

比如ReentrantLock中释放锁资源时,会考虑是否需要唤醒后继节点。如果头结点的状态不是-1。就不需要去唤醒后继节点。唤醒后继节点时,需要找到head.next节点,如果head.next为null,或者是取消了,此时需要遍历整个双向链表,从后往前遍历,找到离head最近的Node。规避了一些不必要的唤醒操作。

如果不用虚拟节点(哨兵节点),当前节点挂起,当前节点的状态设置为-1。可行。AQS本身就是使用了哨兵节点做双向链表的一些操作。

网上说了,虚拟的head,可以避免重复唤醒操作。虚拟的head并没有处理这个问题。

AQS中为什么使用双向链表

AQS的双向链表就为了更方便的操作Node节点。

在执行tryLock,lockInterruptibly方法时,如果在线程阻塞时,中断了线程,此时线程会执行cancelAcquire取消当前节点,不在AQS的双向链表中排队。如果是单向链表,此时会导致取消节点,无法直接将当前节点的prev节点的next指针,指向当前节点的next节点。

如何利用AQS实现一个自定义的CountDownLatch?

java
// 经典用法示例:使用AQS实现CountDownLatch public class MyLatch { private static class Sync extends AbstractQueuedSynchronizer { protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // CAS递减state至0 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } // 使用代码省略... }

ConditionObject

ConditionObject的介绍&应用

ConditionObject 是 AbstractQueuedSynchronizer 类中锁条件机制的子类实现。像synchronized提供了wait和notify的方法实现线程在持有锁时,可以实现挂起,已经唤醒的操作。

基于 AbstractQueuedSynchronizer 实现的ReentrantLock也拥有这个功能,它的实现就是 ConditionObject。想执行await或者是signal就必须先持有lock锁的资源。

先look一下Condition的代码示例:

java
// 源码 public class ReentrantLock implements Lock, java.io.Serializable { final ConditionObject newCondition() { return new ConditionObject(); } } // demo public static void main(String[] args) throws InterruptedException, IOException { ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); new Thread(() -> { lock.lock(); System.out.println("子线程获取锁资源并await挂起线程"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程挂起后被唤醒!持有锁资源"); }).start(); Thread.sleep(100); // =================main====================== lock.lock(); System.out.println("主线程等待5s拿到锁资源,子线程执行了await方法"); condition.signal(); System.out.println("主线程唤醒了await挂起的子线程"); lock.unlock(); }

Condition的构建方式&核心属性

发现在通过lock锁对象执行newCondition方法时,本质就是直接new的AQS提供的ConditionObject对象

java
final ConditionObject newCondition() { return new ConditionObject(); }

其实lock锁中可以有多个Condition对象。

在对Condition1进行操作时,不会影响到Condition2的单向链表。

其次可以发现ConditionObject中,只有两个核心属性:

java
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;

虽然Node对象有prev和next,但是在ConditionObject中是不会使用这两个属性的,只要在Condition队列中,这两个属性都是null。在ConditionObject中只会使用nextWaiter的属性实现单向链表的效果。

Condition的await方法分析(前置分析)

持有锁的线程在执行await方法后会做几个操作:

  • 判断线程是否中断,如果中断了,什么都不做。
  • 没有中断,就讲当前线程封装为Node添加到Condition的单向链表中
  • 一次性释放掉锁资源。
  • 如果当前线程没有在AQS队列,就正常执行LockSupport.park(this)挂起线程。
java
// await方法的前置分析,只分析到线程挂起 public final void await() throws InterruptedException { // 先判断线程的中断标记位是否是true if (Thread.interrupted()) // 如果是true,就没必要执行后续操作挂起了。 throw new InterruptedException(); // 在线程挂起之前,先将当前线程封装为Node,并且添加到Condition队列中 Node node = addConditionWaiter(); // fullyRelease在释放锁资源,一次性将锁资源全部释放,并且保留重入的次数 int savedState = fullyRelease(node); // 省略一行代码…… // 当前Node是否在AQS队列中? // 执行fullyRelease方法后,线程就释放锁资源了,如果线程刚刚释放锁资源,其他线程就立即执行了signal方法, // 此时当前线程就被放到了AQS的队列中,这样一来线程就不需要执行LockSupport.park(this);去挂起线程了 while (!isOnSyncQueue(node)) { // 如果没有在AQS队列中,正常在Condition单向链表里,正常挂起线程。 LockSupport.park(this); // 省略部分代码…… } // 省略部分代码…… } // 线程挂起先,添加到Condition单向链表的业务~~ private Node addConditionWaiter() { // 拿到尾节点。 Node t = lastWaiter; // 如果尾节点有值,并且尾节点的状态不正常,不是-2,尾节点可能要拜拜了~ if (t != null && t.waitStatus != Node.CONDITION) { // 如果尾节点已经取消了,需要干掉取消的尾节点~ unlinkCancelledWaiters(); // 重新获取lastWaiter t = lastWaiter; } // 构建当前线程的Node,并且状态设置为-2 Node node = new Node(Thread.currentThread(), Node.CONDITION); // 如果last节点为null。直接将当前节点设置为firstWaiter if (t == null) firstWaiter = node; else // 如果last节点不为null,说明有值,就排在lastWaiter的后面 t.nextWaiter = node; // 把当前节点设置为最后一个节点 lastWaiter = node; // 返回当前节点 return node; } // 干掉取消的尾节点。 private void unlinkCancelledWaiters() { // 拿到头节点 Node t = firstWaiter; // 声明一个节点,爱啥啥~~~ Node trail = null; // 如果t不为null,就正常执行~~ while (t != null) { // 拿到t的next节点 Node next = t.nextWaiter; // 如果t的状态不为-2,说明有问题 if (t.waitStatus != Node.CONDITION) { // t节点的next为null t.nextWaiter = null; // 如果trail为null,代表头结点状态就是1, if (trail == null) // 将头结点指向next节点 firstWaiter = next; else // 如果trail有值,说明不是头结点位置 trail.nextWaiter = next; // 如果next为null,说明单向链表遍历到最后了,直接结束 if (next == null) lastWaiter = trail; } // 如果t的状态是-2,一切正常 else { // 临时存储t trail = t; } // t指向之前的next t = next; } } // 一次性释放锁资源 final int fullyRelease(Node node) { // 标记位,释放锁资源默认失败! boolean failed = true; try { // 拿到现在state的值 int savedState = getState(); // 一次性释放干净全部锁资源 if (release(savedState)) { // 释放锁资源失败了么? 没有! failed = false; // 返回对应的锁资源信息 return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) // 如果释放锁资源失败,将节点状态设置为取消 node.waitStatus = Node.CANCELLED; } }

Condition的signal方法分析

分为了几个部分:

  • 确保执行signal方法的是持有锁的线程
  • 脱离Condition的队列
  • 将Node状态从-2改为0
  • 将Node添加到AQS队列
  • 为了避免当前Node无法在AQS队列正常唤醒做了一些判断和操作
java
// 线程挂起后,可以基于signal唤醒~ public final void signal() { // 在ReentrantLock中,如果执行signal的线程没有持有锁资源,直接扔异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 拿到排在Condition首位的Node Node first = firstWaiter; // 有Node在排队,才需要唤醒,如果没有,直接告辞~~ if (first != null) doSignal(first); } // 开始唤醒Condition中的Node中的线程 private void doSignal(Node first) { // 先一波do-while走你~~~ do { // 获取到第二个节点,并且将第二个节点设置为firstWaiter if ( (firstWaiter = first.nextWaiter) == null) // 说明就一个节点在Condition队列中,那么直接将firstWaiter和lastWaiter置位null lastWaiter = null; // 如果还有nextWaiter节点,因为当前节点要被唤醒了,脱离整个Condition队列。将nextWaiter置位null first.nextWaiter = null; // 如果transferForSignal返回true,一切正常,退出while循环 } while (!transferForSignal(first) && // 如果后续节点还有,往后面继续唤醒,如果没有,退出while循环 (first = firstWaiter) != null); } // 准备开始唤醒在Condition中排队的Node final boolean transferForSignal(Node node) { // 将在Condition队列中的Node的状态从-2,改为0,代表要扔到AQS队列了。 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 如果失败了,说明在signal之前应当是线程被中断了,从而被唤醒了。 return false; // 如果正常的将Node的状态从-2改为0,这是就要将Condition中的这个Node扔到AQS的队列。 // 将当前Node扔到AQS队列,返回的p是当前Node的prev Node p = enq(node); // 获取上一个Node的状态 int ws = p.waitStatus; // 如果ws > 0 ,说明这个Node已经被取消了。 // 如果ws状态不是取消,将prev节点的状态改为-1,。 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果prev节点已经取消了,可能会导致当前节点永远无法被唤醒。立即唤醒当前节点,基于acquireQueued方法, // 让当前节点找到一个正常的prev节点,并挂起线程 // 如果prev节点正常,但是CAS修改prev节点失败了。证明prev节点因为并发原因导致状态改变。还是为了避免当前 // 节点无法被正常唤醒,提前唤醒当前线程,基于acquireQueued方法,让当前节点找到一个正常的prev节点,并挂起线程 LockSupport.unpark(node.thread); // 返回true return true; }

Conditiond的await方法分析(后置分析)

分为了几个部分:

  • 唤醒之后,要先确认是中断唤醒还是signal唤醒,还是signal唤醒后被中断
  • 确保当前线程的Node已经在AQS队列中
  • 执行acquireQueued方法,等待锁资源。
  • 在获取锁资源后,要确认是否在获取锁资源的阶段被中断过,如果被中断过,并且不是THROW_IE,那就确保interruptMode是REINTERRUPT。
  • 确认当前Node已经不在Condition队列中了
  • 最终根据interruptMode来决定具体做的事情
  • 0:嘛也不做。
  • THROW_IE:抛出异常
  • REINTERRUPT:执行线程的interrupt方法
java
// 现在分析await方法的后半部分 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); // 中断模式~ int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); // 如果线程执行到这,说明现在被唤醒了。 // 线程可以被signal唤醒。(如果是signal唤醒,可以确认线程已经在AQS队列中) // 线程可以被interrupt唤醒,线程被唤醒后,没有在AQS队列中。 // 如果线程先被signal唤醒,然后线程中断了。。。。(做一些额外处理) // checkInterruptWhileWaiting可以确认当前中如何唤醒的。 // 返回的值,有三种 // 0:正常signal唤醒,没别的事(不知道Node是否在AQS队列) // THROW_IE(-1):中断唤醒,并且可以确保在AQS队列 // REINTERRUPT(1):signal唤醒,但是线程被中断了,并且可以确保在AQS队列 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // Node一定在AQS队列 // 执行acquireQueued,尝试在ReentrantLock中获取锁资源。 // acquireQueued方法返回true:代表线程在AQS队列中挂起时,被中断过 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 如果线程在AQS队列排队时,被中断了,并且不是THROW_IE状态,确保线程的interruptMode是REINTERRUPT // REINTERRUPT:await不是中断唤醒,但是后续被中断过!!! interruptMode = REINTERRUPT; // 如果当前Node还在condition的单向链表中,脱离Condition的单向链表 if (node.nextWaiter != null) unlinkCancelledWaiters(); // 如果interruptMode是0,说明线程在signal后以及持有锁的过程中,没被中断过,什么事都不做! if (interruptMode != 0) // 如果不是0~ reportInterruptAfterWait(interruptMode); } // 判断当前线程被唤醒的模式,确认interruptMode的值。 private int checkInterruptWhileWaiting(Node node) { // 判断线程是否中断了。 return Thread.interrupted() ? // THROW_IE:代表线程是被interrupt唤醒的,需要向上排除异常 // REINTERRUPT:代表线程是signal唤醒的,但是在唤醒之后,被中断了。 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : // 线程是正常的被signal唤醒,并且线程没有中断过。 0; } // 判断线程到底是中断唤醒的,还是signal唤醒的! final boolean transferAfterCancelledWait(Node node) { // 基于CAS将Node的状态从-2改为0 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 说明是中断唤醒的线程。因为CAS成功了。 // 将Node添加到AQS队列中~(如果是中断唤醒的,当前线程同时存在Condition的单向链表以及AQS的队列中) enq(node); // 返回true return true; } // 判断当前的Node是否在AQS队列(signal唤醒的,但是可能线程还没放到AQS队列) // 等到signal方法将线程的Node扔到AQS队列后,再做后续操作 while (!isOnSyncQueue(node)) // 如果没在AQS队列上,那就线程让步,稍等一会,Node放到AQS队列再处理(看CPU) Thread.yield(); // signal唤醒的,返回false return false; } // 确认Node是否在AQS队列上 final boolean isOnSyncQueue(Node node) { // 如果线程状态为-2,肯定没在AQS队列 // 如果prev节点的值为null,肯定没在AQS队列 if (node.waitStatus == Node.CONDITION || node.prev == null) // 返回false return false; // 如果节点的next不为null。说明已经在AQS队列上。、 if (node.next != null) // 确定AQS队列上有! return true; // 如果上述判断都没有确认节点在AQS队列上,在AQS队列中寻找一波 return findNodeFromTail(node); } // 在AQS队列中找当前节点 private boolean findNodeFromTail(Node node) { // 拿到尾节点 Node t = tail; for (;;) { // tail是否是当前节点,如果是,说明在AQS队列 if (t == node) // 可以跳出while循环 return true; // 如果节点为null,AQS队列中没有当前节点 if (t == null) // 进入while,让步一手 return false; // t向前引用 t = t.prev; } } private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { // 如果是中断唤醒的await,直接抛出异常! if (interruptMode == THROW_IE) throw new InterruptedException(); // 如果是REINTERRUPT,signal后被中断过 else if (interruptMode == REINTERRUPT) // 确认线程的中断标记位是true // Thread.currentThread().interrupt(); selfInterrupt(); }

Condition的awaitNanos&signalAll方法分析

awaitNanos:仅仅是在await方法的基础上,做了一内内的改变,整体的逻辑思想都是一样的。

挂起线程时,传入要阻塞的时间,时间到了,自动唤醒,走添加到AQS队列的逻辑

java
// await指定时间,多了个时间到了自动醒。 public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); // deadline:当前线程最多挂起到什么时间点 final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { // nanosTimeout的时间小于等于0,直接告辞!! if (nanosTimeout <= 0L) { // 正常扔到AQS队列 transferAfterCancelledWait(node); break; } // nanosTimeout的时间大于1000纳秒时,才可以挂起线程 if (nanosTimeout >= spinForTimeoutThreshold) // 如果大于,正常挂起 LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; // 计算剩余的挂起时间,可能需要重新的走while循环,再次挂起线程 nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); // 剩余的挂起时间 return deadline - System.nanoTime(); }

signalAll方法。这个方法一看就懂,之前signal是唤醒1个,这个是全部唤醒

java
// 以do-while的形式,将Condition单向链表中的所有Node,全部唤醒并扔到AQS队列 private void doSignalAll(Node first) { // 将头尾都置位null~ lastWaiter = firstWaiter = null; do { // 拿到next节点的引用 Node next = first.nextWaiter; // 断开当前Node的nextWaiter first.nextWaiter = null; // 修改Node状态,扔AQS队列,是否唤醒! transferForSignal(first); // 指向下一个节点 first = next; } while (first != null); }

总结

AQS是Java并发包的引擎,理解其原理能帮助开发者更高效地使用锁和同步器,甚至自定义高性能同步组件。核心价值在于:将复杂的线程排队、阻塞/唤醒逻辑标准化,开发者只需关注状态管理。

本文作者:柳始恭

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!