在多线程编程中,锁是保证线程安全的核心机制之一。Java除了提供 synchronized
关键字外,还通过 java.util.concurrent.locks 包提供了更灵活的锁实现。ReentrantLock
是Java并发包中一个重要的锁实现,它提供了比 synchronized
更灵活的锁操作。接下来我们深入了解下Java中的可重入锁。
ReentrantLock
是Java并发包(java.util.concurrent.locks)中的一个可重入互斥锁实现。与synchronized
相比,它提供了以下增强功能:
可重入性
同一线程可以多次获得同一把锁,注意避免死锁(例如递归调用场景)
公平性
支持公平锁与非公平锁两种模式 公平锁:按等待顺序分配锁(构造参数true) 非公平锁:允许插队(默认模式,吞吐量更高)
锁等待可中断
lockInterruptibly() 方法允许在等待锁时响应中断
超时获取锁
tryLock() 方法可以指定超时时间
条件变量支持
通过 newCondition() 创建多个条件队列
代码示例:
javapublic class Counter {
private final ReentrantLock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock(); // 阻塞获取锁
try {
count++;
} finally {
lock.unlock(); // 必须在finally中释放锁
}
}
}
关键点
锁释放必须放在finally中,防止异常导致死锁。
基于AQS独占模式方式实现的互斥锁,其内部类分为 公平锁、非公平锁
非公平锁允许"插队",当一个线程请求非公平锁时,如果在发出请求的同时该锁的状态变为可用,那么这个线程将跳过队列中所有等待的线程并获得这个锁
对于非公平锁,只要CAS设置 同步状态成功,则表示当前线程获取了锁
构造函数
ReentrantLock
对象默认构建时就是非公平锁
javapublic ReentrantLock() {
sync = new NonfairSync();
}
非公平锁要比公平锁性能高,减少了锁的上下文切换与调度时间,在没有必要严格执行线程顺序时,非公平锁是最好的选择,所以定义为默认
公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的,根据AQS底层的FIFO的双向队列排队
构造函数
控制锁是否是公平锁
javapublic ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock
底层是基于AQS实现的,通过AQS维护的 volatile
的 state
变量来实现锁操作
对于锁的状态控制与可重入性实现,是通过AQS的 state
变量表示的,state
值为0时,表示锁未被占用,当 state
值不为0时,表示锁被占用,数值则是重入的次数。
等待队列是使用CLH变体实现线程排队,如果是公平锁模式,则严格按 FIFO 先进先出的顺序获取锁,如果是非公平锁模式,那每次新的线程会插队获取一次锁
可重入性
线程重复n次获取了锁,也就是重入了n次,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放
清楚lock方法是如何实现让当前线程获取到锁资源(什么效果算是拿到了锁资源)
acquire
方法state
从0修改为1,如果成功,代表获取锁资源。如果没有成功,调用 acquire
公平&非公平的方法源码
java// 公平锁的sync的lock方法
final void lock() {
acquire(1);
}
// 非公平锁的sync的lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
如果线程修改state失败怎么办?
如果线程没有拿到锁资源,会到AQS的双向链表中排队等待(在期间,线程可能会挂起)
AQS的双向链表(队列)是个啥?
AQS中的双向链表是基于内部类Node在维护,Node中包含prev,next,thread属性,并且在AQS中还有两个属性,分别是head,tail。
AQS的核心
acquire是一个业务方法,里面并没有实际的业务处理,都是在调用其他方法
java// 核心acquire arg = 1
public final void acquire(int arg) {
//1. 调用tryAcquire方法:尝试获取锁资源(非公平、公平),拿到锁资源,返回true,直接结束方法。 没有拿到锁资源,
// 需要执行&&后面的方法
//2. 当没有获取锁资源后,会先调用addWaiter:会将没有获取到锁资源的线程封装为Node对象,
// 并且插入到AQS的队列的末尾,并且作为tail
//3. 继续调用acquireQueued方法,查看当前排队的Node是否在队列的前面,如果在前面(head的next),尝试获取锁资源
// 如果没在前面,尝试将线程挂起,阻塞起来!
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire分为公平和非公平两种实现,主要做了两件事:
java// 非公平锁实现!
final boolean nonfairTryAcquire(int acquires) {
// 拿到当前线程!
final Thread current = Thread.currentThread();
// 拿到AQS的state
int c = getState();
// 如果state == 0,说明没有线程占用着当前的锁资源
if (c == 0) {
// 没人占用锁资源,我直接抢一波(不管有没有线程在排队)
if (compareAndSetState(0, acquires)) {
// 将当前占用这个互斥锁的线程属性设置为当前线程
setExclusiveOwnerThread(current);
// 返回true,拿锁成功
return true;
}
}
// 当前state != 0,说明有线程占用着锁资源
// 判断拿着锁的线程是不是当前线程(锁重入)
else if (current == getExclusiveOwnerThread()) {
// 将state再次+1
int nextc = c + acquires;
// 锁重入是否超过最大限制
// 01111111 11111111 11111111 11111111 + 1
// 10000000 00000000 00000000 00000000
// 抛出error
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 将值设置给state
setState(nextc);
// 返回true,拿锁成功
return true;
}
return false;
}
java// 公平锁实现
protected final boolean tryAcquire(int acquires) {
// 拿到当前线程!
final Thread current = Thread.currentThread();
// 拿到AQS的state
int c = getState();
// 阿巴阿巴~~~~
if (c == 0) {
// 判断是否有线程在排队,如果有线程排队,返回true,配上前面的!,那会直接不执行返回最外层的false
if (!hasQueuedPredecessors() &&
// 如果没有线程排队,直接CAS尝试获取锁资源
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
在获取锁资源失败后,需要将当前线程封装为Node对象,并且插入到AQS队列的末尾
java// 将当前线程封装为Node对象,并且插入到AQS队列的末尾
private Node addWaiter(Node mode) {
// 将当前线程封装为Node对象,mode为null,代表互斥锁
Node node = new Node(Thread.currentThread(), mode);
// pred是tail节点
Node pred = tail;
// 如果pred不为null,有线程正在排队
if (pred != null) {
// 将当前节点的prev,指定tail尾节点
node.prev = pred;
// 以CAS的方式,将当前节点变为tail节点
if (compareAndSetTail(pred, node)) {
// 之前的tail的next指向当前节点
pred.next = node;
return node;
}
}
// 添加的流程为, 自己prev指向、tail指向自己、前节点next指向我
// 如果上述方式,CAS操作失败,导致加入到AQS末尾失败,如果失败,就基于enq的方式添加到AQS队列
enq(node);
return node;
}
// enq,无论怎样都添加进入
private Node enq(final Node node) {
for (;;) {
// 拿到tail
Node t = tail;
// 如果tail为null,说明当前没有Node在队列中
if (t == null) {
// 创建一个新的Node作为head,并且将tail和head指向一个Node
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 和上述代码一致!
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法会查看当前排队的Node是否是head的next,如果是,尝试获取锁资源,如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起(unsafe.park())
在挂起线程前,需要确认当前节点的上一个节点的状态必须是小于等于0,
java// acquireQueued方法
// 查看当前排队的Node是否是head的next,
// 如果是,尝试获取锁资源,
// 如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起(unsafe.park())
final boolean acquireQueued(final Node node, int arg) {
// 标识。
boolean failed = true;
try {
// 循环走起
for (;;) {
// 拿到上一个节点
final Node p = node.predecessor();
if (p == head && // 说明当前节点是head的next
tryAcquire(arg)) { // 竞争锁资源,成功:true,失败:false
// 进来说明拿到锁资源成功
// 将当前节点置位head,thread和prev属性置位null
setHead(node);
// 帮助快速GC
p.next = null;
// 设置获取锁资源成功
failed = false;
// 不管线程中断。
return interrupted;
}
// 如果不是或者获取锁资源失败,尝试将线程挂起
// 第一个事情,当前节点的上一个节点的状态正常!
// 第二个事情,挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
// 通过LockSupport将当前线程挂起
parkAndCheckInterrupt())
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 确保上一个节点状态是正确的
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 拿到上一个节点的状态
int ws = pred.waitStatus;
// 如果上一个节点为 -1
if (ws == Node.SIGNAL)
// 返回true,挂起线程
return true;
// 如果上一个节点是取消状态
if (ws > 0) {
// 循环往前找,找到一个状态小于等于0的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将小于等于0的节点状态该为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
释放锁资源:
释放锁不分公平和非公平,就一个方法。
java// 真正释放锁资源的方法
public final boolean release(int arg) {
// 核心的释放锁资源方法
if (tryRelease(arg)) {
// 释放锁资源释放干净了。 (state == 0)
Node h = head;
// 如果头节点不为null,并且头节点的状态不为0,唤醒排队的线程
if (h != null && h.waitStatus != 0)、
// 唤醒线程
unparkSuccessor(h);
return true;
}
// 释放锁成功,但是state != 0
return false;
}
// 核心的释放锁资源方法
protected final boolean tryRelease(int releases) {
// 获取state - 1
int c = getState() - releases;
// 如果释放锁的线程不是占用锁的线程,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否成功的将锁资源释放利索 (state == 0)
boolean free = false;
if (c == 0) {
// 锁资源释放干净。
free = true;
// 将占用锁资源的属性设置为null
setExclusiveOwnerThread(null);
}
// 将state赋值
setState(c);
// 返回true,代表释放干净了
return free;
}
// 唤醒节点
private void unparkSuccessor(Node node) {
// 拿到头节点状态
int ws = node.waitStatus;
// 如果头节点状态小于0,换为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 拿到当前节点的next
Node s = node.next;
// 如果s == null ,或者s的状态为1
if (s == null || s.waitStatus > 0) {
// next节点不需要唤醒,需要唤醒next的next
s = null;
// 从尾部往前找,找到状态正常的节点。(小于等于0代表正常状态)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 经过循环的获取,如果拿到状态正常的节点,并且不为null
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
为什么唤醒线程时,为啥从尾部往前找,而不是从前往后找?
因为在addWaiter操作时,是先将当前Node的prev指针指向前面的节点,然后是将tail赋值给当前Node,最后才是能上一个节点的next指针,指向当前Node。
如果从前往后,通过next去找,可能会丢失某个节点,导致这个节点不会被唤醒~ 如果从后往前找,肯定可以找到全部的节点。
锁释放:必须在finally中调用unlock(),否则可能导致死锁。
避免嵌套:过度嵌套锁会增加死锁风险,尽量降低锁粒度。
性能考量:非公平锁在竞争激烈时吞吐量更高,但可能引发 线程饥饿。
监控工具:使用getHoldCount()、isHeldByCurrentThread()等辅助调试。
什么是线程饥饿?
线程饥饿(Thread Starvation)是指某些线程因长时间无法获得所需的资源或执行机会,导致它们长时间处于等待状态的现象。这种现象通常是由于系统资源分配不公平或不合理造成的
线程饥饿的原因
1)不公平的资源分配策略:例如,在优先级调度中,如果高优先级的线程持续占用资源,低优先级的线程可能永远得不到执行机会
2)锁的非公平实现:如果使用非公平锁(如默认的std::mutex),某些线程可能长期被排在队列后面
3)资源占用时间过长:某些线程长时间持有资源,导致其他线程无法获取该资源
4)死循环或不释放资源:如果某个线程获取资源后进入死循环或迟迟不释放资源,其他线程会一直等待
特性 | ReentrantLock | synchronized |
---|---|---|
锁获取方式 | 显式调用lock()/unlock() | 隐式(代码块/方法) |
公平性 | 支持公平/非公平模式 | 非公平 |
锁中断 | ✅ lockInterruptibly() | ❌ |
超时尝试 | ✅ tryLock(timeout) | ❌ |
条件变量 | ✅ 多Condition支持 | 单一wait()/notify() |
锁释放 | 必须手动unlock()(防死锁) | 自动释放 |
ReentrantLock通过更灵活的API弥补了synchronized的局限性,尤其适合复杂并发场景。但其手动管理特性也要求开发者更谨慎地处理锁的获取与释放。
与 synchronized 进行选择
ReentrantLock 的危险性比 synchronized 要高,如果忘记 finally 块中 unlock 释放锁,会导致锁一直无法释放,阻塞后续线程
优先考虑synchronized,只有当 synchronized 无法满足需求,需要高级功能(如公平性、条件变量)时才考虑使用 ReentrantLock,否则,还是应该优先使用 synchronized
本文作者:柳始恭
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!