前边聊过 ReentrantLock
互斥锁,知道在并发情况下锁竞争的时候通过CAS在竞争锁。那在多线程环境下,如何平衡数据一致性与并发性能?ReentrantReadWriteLock 通过读写分离策略给出了优雅的解决方案。
ReentrantReadWriteLock
是 Java 中 java.util.concurrent.locks 包提供的一个可重入的读写锁,它允许多个读线程同时访问共享资源,但在写线程存在时不允许任何读线程或写线程访问。这种机制非常适合读多写少的场景。
锁机制
为什么要出现读写锁
ReentrantLock
是互斥锁,在多线程访问共享资源时,读操作通常不会修改数据,可以并发执行,而写操作需要独占访问以保证数据一致性。如果有一个操作是读多写少,同时还需要保证线程安全,那么使用互斥锁(如 synchronized
或 ReentrantLock
)在读多写少的场景下会成为性能瓶颈。
与 ReentrantLock
一致,也是分为公平锁与非公平锁,不过读写锁是在 读锁 与写锁 的基础上,分别实现的公平锁与非公平锁
java/**
* 默认非公平锁 - Creates a new {@code ReentrantReadWriteLock} with default (nonfair) ordering properties.
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
// 创建公平锁、非公平锁
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
// 读锁与写锁
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
// 非公平锁(默认)
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 公平锁
ReentrantReadWriteLock fairLock = new ReentrantReadWriteLock(true);
锁的降级是关键特性,可以将写锁降级为读锁
javapublic class CachedData {
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private Object data;
private volatile boolean cacheValid;
public void processCachedData() {
rwl.readLock().lock(); // ① 获取读锁
if (!cacheValid) {
// 发现缓存失效,准备升级为写锁
rwl.readLock().unlock(); // ② 释放读锁
rwl.writeLock().lock(); // ③ 获取写锁
try {
// 双重检查(避免重复初始化)
if (!cacheValid) {
data = loadDataFromDB(); // 加载数据
cacheValid = true;
}
// 降级为读锁(在写锁未释放前获取读锁)
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // ④ 写锁降级为读锁
}
}
try {
use(data); // 使用数据(受读锁保护)
} finally {
rwl.readLock().unlock(); // ⑤ 释放读锁
}
}
}
避免锁饥饿
公平模式下,长时间持有写锁可能导致读线程阻塞,非公平模式可能使写线程无限等待
可重入性
ReentrantReadWriteLock 也是基于AQS实现的,很多功能的实现和ReentrantLock类似,还是对state进行操作,拿到锁资源就去干活,如果没有拿到,依然去AQS队列中排队。
读锁操作:基于state的高16位进行操作。
写锁操作:基于state的低16为进行操作。
写锁重入
读写锁中的写锁的重入方式,基本和ReentrantLock一致,没有什么区别,依然是对state进行+1操作即可,只要确认持有锁资源的线程,是当前写锁线程即可。只不过之前ReentrantLock的重入次数是state的正数取值范围,但是读写锁中写锁范围就变小了。
读锁重入
因为读锁是共享锁。读锁在获取锁资源操作时,是要对state的高16位进行 + 1操作。因为读锁是共享锁,所以同一时间会有多个读线程持有读锁资源。这样一来,多个读操作在持有读锁时,无法确认每个线程读锁重入的次数。为了去记录读锁重入的次数,每个读操作的线程,都会有一个ThreadLocal记录锁重入的次数。
写锁的饥饿问题
读锁是共享锁,当有线程持有读锁资源时,再来一个线程想要获取读锁,直接对state修改即可。在读锁资源先被占用后,来了一个写锁资源,此时,大量的需要获取读锁的线程来请求锁资源,如果可以绕过写锁,直接拿资源,会造成写锁长时间无法获取到写锁资源。
读锁在拿到锁资源后,如果再有读线程需要获取读锁资源,需要去AQS队列排队。如果队列的前面需要写锁资源的线程,那么后续读线程是无法拿到锁资源的。持有读锁的线程,只会让写锁线程之前的读线程拿到锁资源
因为写操作和其他操作是互斥的,代表同一时间,只有一个线程持有着写锁,只要锁重入,就对低位+1即可。而且锁重入的限制,从原来的2^31 - 1,变为了2 ^ 16 -1。变短了~~
读锁的重入不能仿照写锁的方式,因为写锁属于互斥锁,同一时间只会有一个线程持有写锁,但是读锁是共享锁,同一时间会有多个线程持有读锁。所以每个获取到读锁的线程,记录锁重入的方式都是基于自己的ThreadLocal存储锁重入次数。
读锁重入的时候就不操作state了?不对,每次锁重入还要修改state,只是记录当前线程锁重入的次数,需要基于ThreadLocal记录
java00000000 00000000 00000000 00000000 : state
写锁:
00000000 00000000 00000000 00000001
写锁:
00000000 00000000 00000000 00000010
A读锁:拿不到,排队
00000000 00000000 00000000 00000010
写锁全部释放(唤醒)
00000000 00000000 00000000 00000000
A读锁:
00000000 00000001 00000000 00000000
B读锁:
00000000 00000010 00000000 00000000
B再次读锁:
00000000 00000011 00000000 00000000
每个读操作的线程,在获取读锁时,都需要开辟一个ThreadLocal。读写锁为了优化这个事情,做了两手操作:
上面了解了读锁与写锁的重入后,那么思考下,读写锁之间能不能重入呢?
我们来通过代码示例尝试下,结果发现在单个线程获取读锁后,再次获取写锁,是拿不到锁的,也就是读写锁是不可重入的
javapublic class XxxTest {
// 读写锁!
static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 写锁
static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
// 读锁
static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
public static void main(String[] args) throws InterruptedException {
readLock.lock();
try {
System.out.println("拿到读锁!");
} finally {
readLock.unlock();
}
writeLock.lock();
try {
System.out.println("拿到写锁!");
} finally {
writeLock.unlock();
}
}
}
核心原则
读-读不互斥:多个线程可同时持有读锁
读-写/写-读互斥:读锁与写锁不能共存
写-写互斥:同一时刻只允许一个写锁
写锁加锁流程
java// 写锁加锁的入口
public void lock() {
sync.acquire(1);
}
// 阿巴阿巴!!
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 读写锁的写锁实现tryAcquire
protected final boolean tryAcquire(int acquires) {
// 拿到当前线程
Thread current = Thread.currentThread();
// 拿到state的值
int c = getState();
// 得到state低16位的值
int w = exclusiveCount(c);
// 判断是否有线程持有着锁资源
if (c != 0) {
// 当前没有线程持有写锁,读写互斥,告辞。
// 有线程持有写锁,持有写锁的线程不是当前线程,不是锁重入,告辞。
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 当前线程持有写锁。 锁重入。
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 没有超过锁重入的次数,正常 + 1
setState(c + acquires);
return true;
}
// 尝试获取锁资源
if (writerShouldBlock() ||
// CAS拿锁
!compareAndSetState(c, c + acquires))
return false;
// 拿锁成功,设置占有互斥锁的线程
setExclusiveOwnerThread(current);
// 返回true
return true;
}
// ================================================================
// 这个方法是将state的低16位的值拿到
int w = exclusiveCount(c);
state & ((1 << 16) - 1)
00000000 00000000 00000000 00000001 == 1
00000000 00000001 00000000 00000000 == 1 << 16
00000000 00000000 11111111 11111111 == (1 << 16) - 1
&运算,一个为0,必然为0,都为1,才为1
// ================================================================
// writerShouldBlock方法查看公平锁和非公平锁的效果
// 非公平锁直接返回false执行CAS尝试获取锁资源
// 公平锁需要查看是否有排队的,如果有排队的,我是否是head的next
释放的流程和ReentrantLock一致,只是在判断释放是否干净时,判断低16位的值
java// 写锁释放锁的tryRelease方法
protected final boolean tryRelease(int releases) {
// 判断当前持有写锁的线程是否是当前线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取state - 1
int nextc = getState() - releases;
// 判断低16位结果是否为0,如果为0,free设置为true
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 将持有锁的线程设置为null
setExclusiveOwnerThread(null);
// 设置给state
setState(nextc);
// 释放干净,返回true。 写锁有冲入,这里需要返回false,不去释放排队的Node
return free;
}
分析读锁加速的基本流程
分析读锁的可重入锁实现以及优化
解决ThreadLocal内存泄漏问题
读锁获取锁自后,如果唤醒AQS中排队的读线程
针对上述简单逻辑的源码分析
java// 读锁加锁的方法入口
public final void acquireShared(int arg) {
// 竞争锁资源滴干活
if (tryAcquireShared(arg) < 0)
// 没拿到锁资源,去排队
doAcquireShared(arg);
}
// 读锁竞争锁资源的操作
protected final int tryAcquireShared(int unused) {
// 拿到当前线程
Thread current = Thread.currentThread();
// 拿到state
int c = getState();
// 拿到state的低16位,判断 != 0,有写锁占用着锁资源
// 并且,当前占用锁资源的线程不是当前线程
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
// 写锁被其他线程占用,无法获取读锁,直接返回 -1,去排队
return -1;
// 没有线程持有写锁、当前线程持有写锁
// 获取读锁的信息,state的高16位。
int r = sharedCount(c);
// 公平锁:就查看队列是由有排队的,有排队的,直接告辞,进不去if,后面也不用判断(没人排队继续走)
// 非公平锁:没有排队的,直接抢。 有排队的,但是读锁其实不需要排队,如果出现这个情况,大部分是写锁资源刚刚释放,
// 后续Node还没有来记得拿到读锁资源,当前竞争的读线程,可以直接获取
if (!readerShouldBlock() &&
// 判断持有读锁的临界值是否达到
r < MAX_COUNT &&
// CAS修改state,对高16位进行 + 1
compareAndSetState(c, c + SHARED_UNIT)) {
// 省略部分代码!!!!
return 1;
}
return fullTryAcquireShared(current);
}
// 非公平锁的判断
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null && // head为null,可以直接抢占锁资源
(s = h.next) != null && // head的next为null,可以直接抢占锁资源
!s.isShared() && // 如果排在head后面的Node,是共享锁,可以直接抢占锁资源。
s.thread != null; // 后面排队的thread为null,可以直接抢占锁资源
}
前面阐述过,读锁为了记录锁重入的次数,需要让每个读线程用 ThreadLocal
存储重入次数,ReentrantReadWriteLock
对读锁重入做了一些优化操作
ReentrantReadWriteLock
在内部对ThreadLocal做了封装,基于HoldCount的对象存储重入次数,在内部有个count属性记录,而且每个线程都是自己的ThreadLocalHoldCounter
,所以可以直接对内部的count进行++操作。
第一个拿到读锁资源的线程,不需要通过 ThreadLocal
存储,内部提供了两个属性来记录第一个拿到读锁资源线程的信息
内部提供了 firstReader
记录第一个拿到读锁资源的线程,firstReaderHoldCount记录firstReader的锁重入次数
最后一个拿到读锁资源的线程,也会缓存他的重入次数,这样++起来更方便,基于cachedHoldCounter
缓存最后一个拿到锁资源现成的重入次数
重入次数的流程执行方式
判断当前线程是否是第一个拿到读锁资源的:如果是,直接将 firstReader
以及firstReaderHoldCount
设置为当前线程的信息
判断当前线程是否是 firstReader
:如果是,直接对 firstReaderHoldCount++
即可。
跟 firstReader
没关系了,先获取 cachedHoldCounter
,判断是否是当前线程。
如果不是,获取当前线程的重入次数,将cachedHoldCounter设置为当前线程。 如果是,判断当前重入次数是否为0,重新设置当前线程的锁从入信息到readHolds(ThreadLocal)中,算是初始化操作,重入次数是0 前面两者最后都做count++
上述逻辑源码分析
javaprotected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// ===============================================================
// 判断r == 0,当前是第一个拿到读锁资源的线程
if (r == 0) {
// 将firstReader设置为当前线程
firstReader = current;
// 将count设置为1
firstReaderHoldCount = 1;
}
// 判断当前线程是否是第一个获取读锁资源的线程
else if (firstReader == current) {
// 直接++。
firstReaderHoldCount++;
}
// 到这,就说明不是第一个获取读锁资源的线程
else {
// 那获取最后一个拿到读锁资源的线程
HoldCounter rh = cachedHoldCounter;
// 判断当前线程是否是最后一个拿到读锁资源的线程
if (rh == null || rh.tid != getThreadId(current))
// 如果不是,设置当前线程为cachedHoldCounter
cachedHoldCounter = rh = readHolds.get();
// 当前线程是之前的cacheHoldCounter
else if (rh.count == 0)
// 将当前的重入信息设置到ThreadLocal中
readHolds.set(rh);
// 重入的++
rh.count++;
}
// ===============================================================
return 1;
}
return fullTryAcquireShared(current);
}
java// tryAcquireShard方法中,如果没有拿到锁资源,走这个方法,尝试再次获取,逻辑跟上面基本一致。
final int fullTryAcquireShared(Thread current) {
// 声明当前线程的锁重入次数
HoldCounter rh = null;
// 死循环
for (;;) {
// 再次拿到state
int c = getState();
// 当前如果有写锁在占用锁资源,并且不是当前线程,返回-1,走排队策略
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
}
// 查看当前是否可以尝试竞争锁资源(公平锁和非公平锁的逻辑)
else if (readerShouldBlock()) {
// 无论公平还是非公平,只要进来,就代表要放到AQS队列中了,先做一波准备
// 在处理ThreadLocal的内存泄漏问题
if (firstReader == current) {
// 如果当前当前线程是之前的firstReader,什么都不用做
} else {
// 第一次进来是null。
if (rh == null) {
// 拿到最后一个获取读锁的线程
rh = cachedHoldCounter;
// 当前线程并不是cachedHoldCounter,没到拿到
if (rh == null || rh.tid != getThreadId(current)) {
// 从自己的ThreadLocal中拿到重入计数器
rh = readHolds.get();
// 如果计数器为0,说明之前没拿到过读锁资源
if (rh.count == 0)
// remove,避免内存泄漏
readHolds.remove();
}
}
// 前面处理完之后,直接返回-1
if (rh.count == 0)
return -1;
}
}
// 判断重入次数,是否超出阈值
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS尝试获取锁资源
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
正常如果都是读线程来获取读锁资源,不需要使用到AQS队列的,直接CAS操作即可
如果写线程持有着写锁,这是读线程就需要进入到AQS队列排队,可能会有多个读线程在AQS中。
当写锁释放资源后,会唤醒head后面的读线程,当head后面的读线程拿到锁资源后,还需要查看next节点是否也是读线程在阻塞,如果是,直接唤醒
源码分析
java// 读锁需要排队的操作
private void doAcquireShared(int arg) {
// 声明Node,类型是共享锁,并且扔到AQS中排队
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 拿到上一个节点
final Node p = node.predecessor();
// 如果prev节点是head,直接可以执行tryAcquireShared
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 拿到读锁资源后,需要做的后续处理
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 找到prev有效节点,将状态设置为-1,挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
// 拿到head节点
Node h = head;
// 将当前节点设置为head节点
setHead(node);
// 第一个判断更多的是在信号量有处理JDK1.5 BUG的操作。
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
// 拿到当前Node的next节点
Node s = node.next;
// 如果next节点是共享锁,直接唤醒next节点
if (s == null || s.isShared())
doReleaseShared();
}
}
处理重入以及state的值
唤醒后续排队的Node
源码分析
java// 读锁释放锁流程
public final boolean releaseShared(int arg) {
// tryReleaseShared:处理state的值,以及可重入的内容
if (tryReleaseShared(arg)) {
// AQS队列的事!
doReleaseShared();
return true;
}
return false;
}
// 1、 处理重入问题 2、 处理state
protected final boolean tryReleaseShared(int unused) {
// 拿到当前线程
Thread current = Thread.currentThread();
// 如果是firstReader,直接干活,不需要ThreadLocal
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
}
// 不是firstReader,从cachedHoldCounter以及ThreadLocal处理
else {
// 如果是cachedHoldCounter,正常--
HoldCounter rh = cachedHoldCounter;
// 如果不是cachedHoldCounter,从自己的ThreadLocal中拿
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
// 如果为1或者更小,当前线程就释放干净了,直接remove,避免value内存泄漏
if (count <= 1) {
readHolds.remove();
// 如果已经是0,没必要再unlock,扔个异常
if (count <= 0)
throw unmatchedUnlockException();
}
// -- 走你。
--rh.count;
}
for (;;) {
// 拿到state,高16位,-1,成功后,返回state是否为0
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// 唤醒AQS中排队的线程
private void doReleaseShared() {
// 死循环
for (;;) {
// 拿到头
Node h = head;
// 说明有排队的
if (h != null && h != tail) {
// 拿到head的状态
int ws = h.waitStatus;
// 判断是否为 -1
if (ws == Node.SIGNAL) {
// 到这,说明后面有挂起的线程,先基于CAS将head的状态从-1,改为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后续节点
unparkSuccessor(h);
}
// 这里不是给读写锁准备的,在信号量里说。。。
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 这里是出口
if (h == head)
break;
}
}
ReentrantReadWriteLock 通过读写分离策略,在保证线程安全的前提下大幅提升系统吞吐量。其精妙之处在于锁降级机制,既避免了数据不一致性,又减少了锁竞争开销。在分布式配置中心、实时风控系统等读密集型场景中,合理使用读写锁可使性能提升数倍。
技术选型需结合具体场景:当写操作超过40%时,建议回归互斥锁;对于极端高性能需求,可考虑 StampedLock 的乐观读模式。
本文作者:柳始恭
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!