Java AQS 原理与 ReentrantLock 实现方法
目录
- 一、AQS 简介
- 二、AQS 核心设计
- 2.1 核心组成部分
- 2.2 AQS 的工作原理
- 2.3 AQS 的关键方法
- 三、ReentrantLock 与 AQS 的关系
- 3.1 ReentrantLock 的结构
- 3.2 ReentrantLock 如何使用 AQS 的 state
- 四、AQS 关键流程分析
- 4.1 独占锁的获取流程
- 4.2 独占锁的释放流程
- 五、公平锁与非公平锁
- 5.1 非公平锁(默认)
- 5.2 公平锁
- 六、自定义实现:简化版 ReentrantLock
- 七、Condition 实现原理
- 八、AQS 的应用场景
- 九、总结
一、AQS 简介
AbstractQueuedSynchronizer(简称 AQS)是 Java 并发包(java.util.concurrent)中最核心的基础组件之一,它为 Java 中的大多数同步类(如 ReentrantLock、Semaphore、CountDownLatch 等)提供了一个通用的框架。理解 AQS 的工作原理对于深入掌握 Java 并发编程至关重要。
AQS 的作用是解决同步器的实现问题,它将复杂的同步器实现分解为简单的框架方法,开发者只需要实现少量特定的方法就能快速构建出可靠的同步器。
二、AQS 核心设计
2.1 核心组成部分
AQS 主要由以下部分组成:
- 同步状态(state):使用 volatile int 类型的变量表示资源的可用状态
- FIFO 等待队列:使用双向链表实现的队列,用于管理等待获取资源的线程
- 独占/共享模式:支持独占锁(如 ReentrantLock)和共享锁(如 CountDownLatch)两种模式
- 条件变量:通过 ConditionObject 类提供条件等待/通知机制,类似于 Object.wait()/notify()
2.2 AQS 的工作原理
AQS 通过模板方法模式,将一些通用的同步操作封装在框架内部,而将特定同步器的特性(如资源是否可获取的判断)交给子类去实现。AQS 提供以下基本操作:
- 资源获取:线程尝试获取资源,如果获取不到,将被包装成 Node 加入等待队列并被阻塞
- 资源释放:持有资源的线程释放资源后,会唤醒等待队列中的下一个线程
- 线程阻塞与唤醒:通过 LockSupport 的 park/unpark 机制实现
2.3 AQS 的关键方法
AQS 定义了一组需要子类实现的方法:
- tryAcquire(int):尝试以独占模式获取资源
- tryRelease(int):尝试以独占模式释放资源
- tryAcquireShared(int):尝试以共享模式获取资源
- tryReleaseShared(int):尝试以共享模式释放资源
- isHeldExclusively():判断资源是否被当前线程独占
三、ReentrantLock 与 AQS 的关系
ReentrantLock 是基于 AQS 实现的可重入锁,它通过内部类 Sync(继承自 AQS)来实现锁的基本功能,并通过 FairSync 和 NonfairSync 两个子类分别实现公平锁和非公平锁。
3.1 ReentrantLock 的结构
public class ReentrantLock implements Lock { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { // 实现锁的基本操作 } // 公平锁实现 static final class FairSync extends Sync { ... } // 非公平锁实现 static final class NonfairSync extends Sync { ... } }
3.2 ReentrantLock 如何使用 AQS 的 state
ReentrantLock 使用 AQS 的 state 字段来表示锁的持有次数:
- state = 0:表示锁FgvEBV未被持有
- state > 0:表示锁被持有,值表示重入次数
四、AQS 关键流程分析
4.1 独占锁的获取流程
当线程调用 ReentrantLock.lock()方法时,实际上会执行以下流程:
- 首先调用 AQS 的 acquire(1)方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addwaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire 尝试获取锁,这是由 ReentrantLock 的 Sync 子类实现的:
- 如果 state=0,尝试使用 CAS 将 state 设为 1,并设置当前线程为持有锁的线程
- 如果当前线程已经持有锁,则增加 state 值,实现可重入
- 其他情况下返回 false
如果 tryAcquire 失败,则调用 addWaiter 将当前线程封装成 Node 添加到等待队列末尾:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 尝试快速添加到队列尾部 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 快速添加失败,进入完整的入队方法 enq(node); return node; }
- 然后执行 acquireQueued 方法,让该节点在队列中不断尝试获取锁,直到成功或被中断:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // 如果前驱是头节点,说明轮到当前节点尝试获取锁 if (p == head && tryAcquire(arg)) { // 获取成功,把当前节点设为头节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 判断是否应该阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
4.2 独占锁的释放流程
当线程调用 ReentrantLock.unlock()方法时,会执行以下流程:
- 首先调用 AQS 的 release(1)方法:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease 尝试释放锁,这是由 ReentrantLock 的 Sync 类实现的:
- 检查当前线程是否是持有锁的线程
- 减少 state 值
- 如果 state 变为 0,清空持有锁的线程,并返回 true
如果 tryRelease 返回 true,表示已完全释放锁,则调用 unparkSuccessor 唤醒等待队列中的下一个线程:
private void unparkSuccessor(Node node) { // 获取当前节点的等待状态 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 找到下一个需要唤醒的节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 从尾部向前查找需要唤醒的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 唤醒找到的节点 if (s != null) LockSupport.unpark(s.thread); }
五、公平锁与非公平锁
ReentrantLock 支持公平锁和非公平锁两种模式:
5.1 非公平锁(默认)
非公平锁的 tryAcquire 实现:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 非公平锁直接尝试CAS获取锁,不检查队列 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
5.2 公平锁
公平锁的 tryAcquire 实现:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 公平锁会先调用hasQueuedPredecessors检查是否有前驱节点在等待 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平锁与非公平锁的主要区别在于获取锁时是否考虑等待队列。公平锁会检查是否有线程在等待队列中排队,而非公平锁则直接尝试获取,不考虑等待顺序。
六、自定义实现:简化版编程 ReentrantLock
为了更深入理解 AQS 原理,我们可以实现一个简化版的 ReentrantLock:
public class SimpleReentrantLock implements Lock { private final Sync sync; /** * 默认创建非公平锁 */ public SimpleReentrantLock() { sync = new NonfairSync(); } /** * 根据参数创建公平锁或非公平锁 */ public SimpleReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } /** * 继承AQS的同步器实现 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 非公平的方式获取锁 */ final boolean unfairTryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取当前state状态 int c = getState(); // state为0表示锁未被持有 if (c == 0) { // 使用CAS尝试将state从0设置为1 if (compareAndSetState(0, acquires)) { // 成功获取锁,设置当前持有锁的线程为当前线程 setExclusiveOwnerThread(current); return true; } } // 如果当前线程就是持有锁的线程,实现可重入 else if (current == getExclusiveOwnerThread()) { // 增加state值实现重入计数 int nextC = c + acquires; // 检查溢出 if (nextC < 0) { throw new Error("Maximum lock count exceeded"); } // 设置新的state值,这里不需要CAS因为当前线程已经持有锁 setState(nextC); return true; } // 获取锁失败 return false; } /** * 释放锁 */ @Override protected final boolean tryRelease(int releases) { // 检查当前线程是否是持有锁的线程 if (Thread.currentThread() != getExclusiveOwnerThread()) { throw new IllegalMonitorStateException(); } // 减少state值 int c = getState() - releases; // 判断是否完全释放锁 boolean free = (c == 0); if (free) { // 完全释放锁,清空持有锁的线程 setExclusiveOwnerThread(null); } // 更新state值 setState(c); return free; } /** * 判断当前线程是否持有锁 */ @Override protected boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } /** * 创建条件变量 */ Condition newCondition() { return new ConditionObject(); } /** * 获取锁的持有计数 */ public int getHoldCount() { return isHeldExclusively() ? getState() : 0; } } /** * 公平锁的实现 */ static final class FairSync extends Sync { private static fiandroidnal long serialVersionUID = -3000897897090466540L; @Override protected boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 公平性体现:先检查队列中是否有前驱节点在等待 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextC = c + acquires; if (nextC < 0) { throw new Error("Maximum lock count exceeded"); } setState(nextC); return true; } return false; } } /** * 非公平锁的实现 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 非公平锁的获取实现 */ @Override protected boolean tryAcquire(int acquires) { return unfairTryAcquire(acquires); } } // 实现Lock接口的方法 @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.unfairTryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } /** * 查询当前锁是否被某个线程持有 */ public boolean isLocked() { return sync.isHeldExclusively(); } /** * 查询当前线程是否持有锁 */ public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } /** * 获取当前锁的持有计数 */ public int getHoldCount() { return sync.getHoldCount(); } }
七、Condition 实现原理
AQS 提供了 ConditionObject 内部类,用于实现 Condition 接口,支持类似 wait/notify 的条件等待/通知机制:
- 条件队列:每个 Condition 维护一个单独的条件队列,与 AQS 同步队列相互独立
- await 操作:将当前线程加入条件队列,并释放持有的锁
- signal 操作:将条件队列中的线程转移到同步队列,等待重新获取锁
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 添加到条件队列 Node node = addConditionWaiter(); // 完全释放锁 int savedState = fullyRelease(node); int interruptMode = 0; // 循环检查节点是否已经转移到同步队列 while (!isOnSyncQueue(node)) { // 阻塞当前线程 LockSupport.park(this); // 检查中断 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } 编程客栈 // 重新竞争锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
八、AQS 的应用场景
AQS 广泛应用于 Java 并发包中的各种同步器:
- ReentrantLock:可重入独占锁
- Semaphore:信号量,控js制同时访问特定资源的线程数量
- CountDownLatch:闭锁,允许一个或多个线程等待一组操作完成
- ReentrantReadWriteLock:读写锁,允许多个线程同时读,但只允许一个线程写
- CyclicBarrier:循环栅栏,允许一组线程相互等待达到一个共同点
九、总结
AQS 是 Java 并发框架中最核心的基础组件,它通过以下机制实现了高效的线程同步:
- 状态管理:使用 volatile 变量和 CAS 操作保证线程安全
- 队列管理:使用 CLH 队列高效管理等待线程
- 阻塞原语:使用 LockSupport 实现线程的阻塞和唤醒
- 模板方法模式:将通用逻辑和特定逻辑分离,提高可扩展性
理解 AQS 的工作原理,不仅有助于更好地使用 Java 并发包中的同步器,也能帮助我们在必要时实现自己的高效同步器。AQS 通过简洁的设计将复杂的同步器问题分解为少量的基本方法,使得开发者能够快速实现各种同步器。ReentrantLock 相比 synchronized 提供了更多的功能,如可中断、超时等待、公平性选择等。
到此这篇关于深入理解 Java AQS 原理与 ReentrantLock 实现的文章就介绍到这了,更多相关Java AQS 原理内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!
精彩评论