2025-06-12
内卷九股文
0

目录

ReentrantReadWriteLock
读写锁的特性
锁的公平性
锁降级
读写锁的实现思想
写锁源码分析
写锁加锁流程概述
写锁加锁源码分析
写锁释放锁流程概述&释放锁源码
读锁源码分析
读锁加锁流程概述
基础读锁流程
读锁重入流程
读锁加锁的后续逻辑fullTryAcquireShared
读线程在AQS队列获取锁资源的后续操作
读锁的释放锁流程
总结

前边聊过 ReentrantLock 互斥锁,知道在并发情况下锁竞争的时候通过CAS在竞争锁。那在多线程环境下,如何平衡数据一致性与并发性能?ReentrantReadWriteLock 通过读写分离策略给出了优雅的解决方案。

ReentrantReadWriteLock

ReentrantReadWriteLock 是 Java 中 java.util.concurrent.locks 包提供的一个可重入的读写锁,它允许多个读线程同时访问共享资源,但在写线程存在时不允许任何读线程或写线程访问。这种机制非常适合读多写少的场景。

锁机制

  • 读锁:共享(SHARED),不互斥(多个线程可同时持有)
  • 写锁:独占(EXCLUSIVE),互斥(与其他读/写锁互斥)

为什么要出现读写锁

ReentrantLock 是互斥锁,在多线程访问共享资源时,读操作通常不会修改数据,可以并发执行,而写操作需要独占访问以保证数据一致性。如果有一个操作是读多写少,同时还需要保证线程安全,那么使用互斥锁(如 synchronizedReentrantLock)在读多写少的场景下会成为性能瓶颈。

读写锁的特性

锁的公平性

ReentrantLock 一致,也是分为公平锁与非公平锁,不过读写锁是在 读锁写锁 的基础上,分别实现的公平锁与非公平锁

java
/** * 默认非公平锁 - Creates a new {@code ReentrantReadWriteLock} with default (nonfair) ordering properties. */ public ReentrantReadWriteLock() { this(false); } /** * Creates a new {@code ReentrantReadWriteLock} with * the given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantReadWriteLock(boolean fair) { // 创建公平锁、非公平锁 sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } // 读锁与写锁 public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } // 非公平锁(默认) ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // 公平锁 ReentrantReadWriteLock fairLock = new ReentrantReadWriteLock(true);

锁降级

锁的降级是关键特性,可以将写锁降级为读锁

java
public class CachedData { private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private Object data; private volatile boolean cacheValid; public void processCachedData() { rwl.readLock().lock(); // ① 获取读锁 if (!cacheValid) { // 发现缓存失效,准备升级为写锁 rwl.readLock().unlock(); // ② 释放读锁 rwl.writeLock().lock(); // ③ 获取写锁 try { // 双重检查(避免重复初始化) if (!cacheValid) { data = loadDataFromDB(); // 加载数据 cacheValid = true; } // 降级为读锁(在写锁未释放前获取读锁) rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); // ④ 写锁降级为读锁 } } try { use(data); // 使用数据(受读锁保护) } finally { rwl.readLock().unlock(); // ⑤ 释放读锁 } } }

避免锁饥饿

公平模式下,长时间持有写锁可能导致读线程阻塞,非公平模式可能使写线程无限等待

读写锁的实现思想

可重入性

ReentrantReadWriteLock 也是基于AQS实现的,很多功能的实现和ReentrantLock类似,还是对state进行操作,拿到锁资源就去干活,如果没有拿到,依然去AQS队列中排队。

  • 读锁操作:基于state的高16位进行操作。

  • 写锁操作:基于state的低16为进行操作。

写锁重入

读写锁中的写锁的重入方式,基本和ReentrantLock一致,没有什么区别,依然是对state进行+1操作即可,只要确认持有锁资源的线程,是当前写锁线程即可。只不过之前ReentrantLock的重入次数是state的正数取值范围,但是读写锁中写锁范围就变小了。

读锁重入

因为读锁是共享锁。读锁在获取锁资源操作时,是要对state的高16位进行 + 1操作。因为读锁是共享锁,所以同一时间会有多个读线程持有读锁资源。这样一来,多个读操作在持有读锁时,无法确认每个线程读锁重入的次数。为了去记录读锁重入的次数,每个读操作的线程,都会有一个ThreadLocal记录锁重入的次数。

写锁的饥饿问题

读锁是共享锁,当有线程持有读锁资源时,再来一个线程想要获取读锁,直接对state修改即可。在读锁资源先被占用后,来了一个写锁资源,此时,大量的需要获取读锁的线程来请求锁资源,如果可以绕过写锁,直接拿资源,会造成写锁长时间无法获取到写锁资源。

读锁在拿到锁资源后,如果再有读线程需要获取读锁资源,需要去AQS队列排队。如果队列的前面需要写锁资源的线程,那么后续读线程是无法拿到锁资源的。持有读锁的线程,只会让写锁线程之前的读线程拿到锁资源

因为写操作和其他操作是互斥的,代表同一时间,只有一个线程持有着写锁,只要锁重入,就对低位+1即可。而且锁重入的限制,从原来的2^31 - 1,变为了2 ^ 16 -1。变短了~~

读锁的重入不能仿照写锁的方式,因为写锁属于互斥锁,同一时间只会有一个线程持有写锁,但是读锁是共享锁,同一时间会有多个线程持有读锁。所以每个获取到读锁的线程,记录锁重入的方式都是基于自己的ThreadLocal存储锁重入次数。

读锁重入的时候就不操作state了?不对,每次锁重入还要修改state,只是记录当前线程锁重入的次数,需要基于ThreadLocal记录

java
00000000 00000000 00000000 00000000 : state 写锁: 00000000 00000000 00000000 00000001 写锁: 00000000 00000000 00000000 00000010 A读锁:拿不到,排队 00000000 00000000 00000000 00000010 写锁全部释放(唤醒) 00000000 00000000 00000000 00000000 A读锁: 00000000 00000001 00000000 00000000 B读锁: 00000000 00000010 00000000 00000000 B再次读锁: 00000000 00000011 00000000 00000000

每个读操作的线程,在获取读锁时,都需要开辟一个ThreadLocal。读写锁为了优化这个事情,做了两手操作:

  • 第一个拿到读锁的线程,不用ThreadLocal记录重入次数,在读写锁内有有一个firstRead记录重入次数
  • 还记录了最后一个拿到读锁的线程的重入次数,交给cachedHoldCounter属性标识,可以避免频繁的在锁重入时,从TL中获取

上面了解了读锁与写锁的重入后,那么思考下,读写锁之间能不能重入呢?

我们来通过代码示例尝试下,结果发现在单个线程获取读锁后,再次获取写锁,是拿不到锁的,也就是读写锁是不可重入的

java
public class XxxTest { // 读写锁! static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // 写锁 static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); // 读锁 static ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); public static void main(String[] args) throws InterruptedException { readLock.lock(); try { System.out.println("拿到读锁!"); } finally { readLock.unlock(); } writeLock.lock(); try { System.out.println("拿到写锁!"); } finally { writeLock.unlock(); } } }

核心原则

  • 读-读不互斥:多个线程可同时持有读锁

  • 读-写/写-读互斥:读锁与写锁不能共存

  • 写-写互斥:同一时刻只允许一个写锁

写锁源码分析

写锁加锁流程概述

image.png

写锁加锁源码分析

写锁加锁流程

java
// 写锁加锁的入口 public void lock() { sync.acquire(1); } // 阿巴阿巴!! public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // 读写锁的写锁实现tryAcquire protected final boolean tryAcquire(int acquires) { // 拿到当前线程 Thread current = Thread.currentThread(); // 拿到state的值 int c = getState(); // 得到state低16位的值 int w = exclusiveCount(c); // 判断是否有线程持有着锁资源 if (c != 0) { // 当前没有线程持有写锁,读写互斥,告辞。 // 有线程持有写锁,持有写锁的线程不是当前线程,不是锁重入,告辞。 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 当前线程持有写锁。 锁重入。 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 没有超过锁重入的次数,正常 + 1 setState(c + acquires); return true; } // 尝试获取锁资源 if (writerShouldBlock() || // CAS拿锁 !compareAndSetState(c, c + acquires)) return false; // 拿锁成功,设置占有互斥锁的线程 setExclusiveOwnerThread(current); // 返回true return true; } // ================================================================ // 这个方法是将state的低16位的值拿到 int w = exclusiveCount(c); state & ((1 << 16) - 1) 00000000 00000000 00000000 00000001 == 1 00000000 00000001 00000000 00000000 == 1 << 16 00000000 00000000 11111111 11111111 == (1 << 16) - 1 &运算,一个为0,必然为0,都为1,才为1 // ================================================================ // writerShouldBlock方法查看公平锁和非公平锁的效果 // 非公平锁直接返回false执行CAS尝试获取锁资源 // 公平锁需要查看是否有排队的,如果有排队的,我是否是head的next

写锁释放锁流程概述&释放锁源码

释放的流程和ReentrantLock一致,只是在判断释放是否干净时,判断低16位的值

java
// 写锁释放锁的tryRelease方法 protected final boolean tryRelease(int releases) { // 判断当前持有写锁的线程是否是当前线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 获取state - 1 int nextc = getState() - releases; // 判断低16位结果是否为0,如果为0,free设置为true boolean free = exclusiveCount(nextc) == 0; if (free) // 将持有锁的线程设置为null setExclusiveOwnerThread(null); // 设置给state setState(nextc); // 释放干净,返回true。 写锁有冲入,这里需要返回false,不去释放排队的Node return free; }

读锁源码分析

读锁加锁流程概述

  1. 分析读锁加速的基本流程

  2. 分析读锁的可重入锁实现以及优化

  3. 解决ThreadLocal内存泄漏问题

  4. 读锁获取锁自后,如果唤醒AQS中排队的读线程

基础读锁流程

image.png

针对上述简单逻辑的源码分析

java
// 读锁加锁的方法入口 public final void acquireShared(int arg) { // 竞争锁资源滴干活 if (tryAcquireShared(arg) < 0) // 没拿到锁资源,去排队 doAcquireShared(arg); } // 读锁竞争锁资源的操作 protected final int tryAcquireShared(int unused) { // 拿到当前线程 Thread current = Thread.currentThread(); // 拿到state int c = getState(); // 拿到state的低16位,判断 != 0,有写锁占用着锁资源 // 并且,当前占用锁资源的线程不是当前线程 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) // 写锁被其他线程占用,无法获取读锁,直接返回 -1,去排队 return -1; // 没有线程持有写锁、当前线程持有写锁 // 获取读锁的信息,state的高16位。 int r = sharedCount(c); // 公平锁:就查看队列是由有排队的,有排队的,直接告辞,进不去if,后面也不用判断(没人排队继续走) // 非公平锁:没有排队的,直接抢。 有排队的,但是读锁其实不需要排队,如果出现这个情况,大部分是写锁资源刚刚释放, // 后续Node还没有来记得拿到读锁资源,当前竞争的读线程,可以直接获取 if (!readerShouldBlock() && // 判断持有读锁的临界值是否达到 r < MAX_COUNT && // CAS修改state,对高16位进行 + 1 compareAndSetState(c, c + SHARED_UNIT)) { // 省略部分代码!!!! return 1; } return fullTryAcquireShared(current); } // 非公平锁的判断 final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && // head为null,可以直接抢占锁资源 (s = h.next) != null && // head的next为null,可以直接抢占锁资源 !s.isShared() && // 如果排在head后面的Node,是共享锁,可以直接抢占锁资源。 s.thread != null; // 后面排队的thread为null,可以直接抢占锁资源 }

读锁重入流程

前面阐述过,读锁为了记录锁重入的次数,需要让每个读线程用 ThreadLocal 存储重入次数,ReentrantReadWriteLock 对读锁重入做了一些优化操作

ReentrantReadWriteLock 在内部对ThreadLocal做了封装,基于HoldCount的对象存储重入次数,在内部有个count属性记录,而且每个线程都是自己的ThreadLocalHoldCounter,所以可以直接对内部的count进行++操作。

第一个拿到读锁资源的线程,不需要通过 ThreadLocal存储,内部提供了两个属性来记录第一个拿到读锁资源线程的信息

内部提供了 firstReader 记录第一个拿到读锁资源的线程,firstReaderHoldCount记录firstReader的锁重入次数

最后一个拿到读锁资源的线程,也会缓存他的重入次数,这样++起来更方便,基于cachedHoldCounter 缓存最后一个拿到锁资源现成的重入次数

重入次数的流程执行方式

  1. 判断当前线程是否是第一个拿到读锁资源的:如果是,直接将 firstReader 以及firstReaderHoldCount 设置为当前线程的信息

  2. 判断当前线程是否是 firstReader:如果是,直接对 firstReaderHoldCount++ 即可。

  3. firstReader 没关系了,先获取 cachedHoldCounter,判断是否是当前线程。

    如果不是,获取当前线程的重入次数,将cachedHoldCounter设置为当前线程。 如果是,判断当前重入次数是否为0,重新设置当前线程的锁从入信息到readHolds(ThreadLocal)中,算是初始化操作,重入次数是0 前面两者最后都做count++

上述逻辑源码分析

java
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // =============================================================== // 判断r == 0,当前是第一个拿到读锁资源的线程 if (r == 0) { // 将firstReader设置为当前线程 firstReader = current; // 将count设置为1 firstReaderHoldCount = 1; } // 判断当前线程是否是第一个获取读锁资源的线程 else if (firstReader == current) { // 直接++。 firstReaderHoldCount++; } // 到这,就说明不是第一个获取读锁资源的线程 else { // 那获取最后一个拿到读锁资源的线程 HoldCounter rh = cachedHoldCounter; // 判断当前线程是否是最后一个拿到读锁资源的线程 if (rh == null || rh.tid != getThreadId(current)) // 如果不是,设置当前线程为cachedHoldCounter cachedHoldCounter = rh = readHolds.get(); // 当前线程是之前的cacheHoldCounter else if (rh.count == 0) // 将当前的重入信息设置到ThreadLocal中 readHolds.set(rh); // 重入的++ rh.count++; } // =============================================================== return 1; } return fullTryAcquireShared(current); }

读锁加锁的后续逻辑fullTryAcquireShared

java
// tryAcquireShard方法中,如果没有拿到锁资源,走这个方法,尝试再次获取,逻辑跟上面基本一致。 final int fullTryAcquireShared(Thread current) { // 声明当前线程的锁重入次数 HoldCounter rh = null; // 死循环 for (;;) { // 再次拿到state int c = getState(); // 当前如果有写锁在占用锁资源,并且不是当前线程,返回-1,走排队策略 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } // 查看当前是否可以尝试竞争锁资源(公平锁和非公平锁的逻辑) else if (readerShouldBlock()) { // 无论公平还是非公平,只要进来,就代表要放到AQS队列中了,先做一波准备 // 在处理ThreadLocal的内存泄漏问题 if (firstReader == current) { // 如果当前当前线程是之前的firstReader,什么都不用做 } else { // 第一次进来是null。 if (rh == null) { // 拿到最后一个获取读锁的线程 rh = cachedHoldCounter; // 当前线程并不是cachedHoldCounter,没到拿到 if (rh == null || rh.tid != getThreadId(current)) { // 从自己的ThreadLocal中拿到重入计数器 rh = readHolds.get(); // 如果计数器为0,说明之前没拿到过读锁资源 if (rh.count == 0) // remove,避免内存泄漏 readHolds.remove(); } } // 前面处理完之后,直接返回-1 if (rh.count == 0) return -1; } } // 判断重入次数,是否超出阈值 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // CAS尝试获取锁资源 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }

读线程在AQS队列获取锁资源的后续操作

正常如果都是读线程来获取读锁资源,不需要使用到AQS队列的,直接CAS操作即可

如果写线程持有着写锁,这是读线程就需要进入到AQS队列排队,可能会有多个读线程在AQS中。

当写锁释放资源后,会唤醒head后面的读线程,当head后面的读线程拿到锁资源后,还需要查看next节点是否也是读线程在阻塞,如果是,直接唤醒

源码分析

java
// 读锁需要排队的操作 private void doAcquireShared(int arg) { // 声明Node,类型是共享锁,并且扔到AQS中排队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 拿到上一个节点 final Node p = node.predecessor(); // 如果prev节点是head,直接可以执行tryAcquireShared if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 拿到读锁资源后,需要做的后续处理 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 找到prev有效节点,将状态设置为-1,挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { // 拿到head节点 Node h = head; // 将当前节点设置为head节点 setHead(node); // 第一个判断更多的是在信号量有处理JDK1.5 BUG的操作。 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 拿到当前Node的next节点 Node s = node.next; // 如果next节点是共享锁,直接唤醒next节点 if (s == null || s.isShared()) doReleaseShared(); } }

读锁的释放锁流程

  1. 处理重入以及state的值

  2. 唤醒后续排队的Node

源码分析

java
// 读锁释放锁流程 public final boolean releaseShared(int arg) { // tryReleaseShared:处理state的值,以及可重入的内容 if (tryReleaseShared(arg)) { // AQS队列的事! doReleaseShared(); return true; } return false; } // 1、 处理重入问题 2、 处理state protected final boolean tryReleaseShared(int unused) { // 拿到当前线程 Thread current = Thread.currentThread(); // 如果是firstReader,直接干活,不需要ThreadLocal if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } // 不是firstReader,从cachedHoldCounter以及ThreadLocal处理 else { // 如果是cachedHoldCounter,正常-- HoldCounter rh = cachedHoldCounter; // 如果不是cachedHoldCounter,从自己的ThreadLocal中拿 if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; // 如果为1或者更小,当前线程就释放干净了,直接remove,避免value内存泄漏 if (count <= 1) { readHolds.remove(); // 如果已经是0,没必要再unlock,扔个异常 if (count <= 0) throw unmatchedUnlockException(); } // -- 走你。 --rh.count; } for (;;) { // 拿到state,高16位,-1,成功后,返回state是否为0 int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } } // 唤醒AQS中排队的线程 private void doReleaseShared() { // 死循环 for (;;) { // 拿到头 Node h = head; // 说明有排队的 if (h != null && h != tail) { // 拿到head的状态 int ws = h.waitStatus; // 判断是否为 -1 if (ws == Node.SIGNAL) { // 到这,说明后面有挂起的线程,先基于CAS将head的状态从-1,改为0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒后续节点 unparkSuccessor(h); } // 这里不是给读写锁准备的,在信号量里说。。。 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // 这里是出口 if (h == head) break; } }

总结

ReentrantReadWriteLock 通过读写分离策略,在保证线程安全的前提下大幅提升系统吞吐量。其精妙之处在于锁降级机制,既避免了数据不一致性,又减少了锁竞争开销。在分布式配置中心、实时风控系统等读密集型场景中,合理使用读写锁可使性能提升数倍。

技术选型需结合具体场景:当写操作超过40%时,建议回归互斥锁;对于极端高性能需求,可考虑 StampedLock 的乐观读模式。

本文作者:柳始恭

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!