Java 并发编程技术深度精讲
本文从底层原理出发,结合源码分析与类比思维,深入剖析 Java 并发编程的核心知识体系。 每个知识点配有 ASCII 图解、设计动机分析和一句话总结。
一、Java 内存模型(JMM)
1.1 为什么需要 JMM
类比:咖啡店模型
想象一家大型咖啡店,有一个总仓库(主内存)和多个咖啡师工作台(工作内存)。
+---------------------+
| 主内存(仓库) |
| sugar = 100 |
| milk = 200 |
+-----+--------+------+
| |
read/load read/load
| |
+----------+--+ +--+----------+
| 工作内存 A | | 工作内存 B |
| (咖啡师小张) | | (咖啡师小李) |
| sugar = 100 | | sugar = 100 |
+--------------+ +--------------+
问题一:可见性问题
小张在自己的工作台上把 sugar 减少到了 80(assign + store + write 回仓库),但小李工作台上的 sugar 还是 100——他根本不知道仓库里的 sugar 已经变了。这就是可见性问题:一个线程对共享变量的修改,另一个线程无法立即看到。
问题二:有序性问题
经理写了一张操作清单:“先加糖,再加奶,最后搅拌”。但咖啡师为了效率,可能自己调整顺序——“先加奶,再加糖,最后搅拌”。在单人操作时结果不变,但如果两个咖啡师需要协作(比如一个加糖后另一个才能加奶),这种重排序就会出问题。这就是有序性问题。
问题三:原子性问题
小张正在称 sugar(读取重量 → 舀一勺 → 放回去),这个操作不是一瞬间完成的。如果在他称到一半时小李也来取 sugar,就可能出现数据混乱。这就是原子性问题。
JMM 就是 Java 定义的一套规范,它规定了:
- 线程如何与主内存交互
- 什么条件下一个线程的修改对另一个线程可见
- 哪些操作可以重排序,哪些不可以
JMM 屏蔽了底层硬件和操作系统的差异,让 Java 程序在不同平台上有一致的并发行为。
1.2 JMM 的核心规则
JMM 定义了 8 种内存交互操作,精确描述了变量如何从主内存到工作内存、再从工作内存回到主内存的全过程:
主内存 工作内存
+----------------+ +----------------+
| | ① read | |
| 变量 X = 10 | -------------> | (传输中) |
| | ② load | |
| | -------------> | 变量副本 X=10 |
| | | |
| | ③ use | |
| | <------------- | --> 执行引擎 |
| | | |
| | ④ assign | |
| | -------------> | X = 20 |
| | | (执行引擎写回) |
| | ⑤ store | |
| | <------------- | (传输中) |
| | ⑥ write | |
| 变量 X = 20 | <------------- | |
| | | |
| ⑦ lock(加锁) | | |
| ⑧ unlock(解锁)| | |
+----------------+ +----------------+
8 种操作详解:
| 操作 | 作用域 | 说明 |
|---|---|---|
| read(读取) | 主内存 | 将变量值从主内存传输到工作内存 |
| load(载入) | 工作内存 | 将 read 得到的值放入工作内存的变量副本 |
| use(使用) | 工作内存 | 将变量副本的值传递给执行引擎 |
| assign(赋值) | 工作内存 | 将执行引擎的值赋给工作内存的变量副本 |
| store(存储) | 工作内存 | 将变量副本的值传输到主内存 |
| write(写入) | 主内存 | 将 store 得到的值写入主内存的变量 |
| lock(锁定) | 主内存 | 将变量标识为线程独占 |
| unlock(解锁) | 主内存 | 释放变量的线程独占标识 |
关键约束规则:
- read 和 load、store 和 write 必须成对出现,不允许单独使用
- 不允许线程丢弃最近的 assign 操作(修改后必须同步回主内存)
- 不允许线程无原因地把工作内存的值同步回主内存(没有 assign 就不能 store + write)
- 新变量只能在主内存中诞生
- 一个变量同一时刻只允许一条线程对其 lock,但可以被同一线程多次 lock(可重入)
- lock 操作会清空工作内存中该变量的值,使用前需重新 load 或 assign
- unlock 之前必须先把变量同步回主内存(store + write)
1.3 happens-before 原则
happens-before 是 JMM 的核心概念。如果操作 A happens-before 操作 B,那么 A 的结果对 B 一定可见。注意:happens-before 不是说 A 一定在 B 之前执行,而是说 A 的执行结果对 B 可见。
8 条 happens-before 规则:
规则 1:程序顺序规则(Program Order Rule)
在一个线程中,按照程序代码顺序,前面的操作 happens-before 后面的操作。
// 在同一个线程中:
int a = 1; // 操作 A
int b = a + 1; // 操作 B
// A happens-before B,所以 b 一定等于 2
// 注意:编译器可以重排序,但必须保证结果一致(as-if-serial 语义)
规则 2:监视器锁规则(Monitor Lock Rule)
对一个锁的 unlock 操作 happens-before 后续对同一个锁的 lock 操作。
synchronized (lock) {
// 操作 A:x = 10
}
// unlock(lock) happens-before 下一次 lock(lock)
synchronized (lock) {
// 操作 B:读取 x,一定能看到 10
}
规则 3:volatile 变量规则(Volatile Variable Rule)
对一个 volatile 变量的写操作 happens-before 后续对同一 volatile 变量的读操作。
volatile boolean flag = false;
int data = 0;
// 线程 A
data = 42; // 普通写
flag = true; // volatile 写(happens-before 线程 B 的 volatile 读)
// 线程 B
if (flag) { // volatile 读
System.out.println(data); // 一定输出 42
}
规则 4:线程启动规则(Thread Start Rule)
Thread 对象的 start() 方法 happens-before 此线程的每一个动作。
int x = 10;
Thread t = new Thread(() -> {
// 这里一定能看到 x = 10
System.out.println(x); // 一定输出 10
});
t.start(); // start() happens-before 线程 t 中的所有操作
规则 5:线程终止规则(Thread Termination Rule)
线程中的所有操作 happens-before 对此线程的终止检测(Thread.join() 或 Thread.isAlive())。
Thread t = new Thread(() -> {
data = 42; // 线程 t 中的操作
});
t.start();
t.join(); // join() 返回意味着线程 t 中的所有操作 happens-before 这一行之后的代码
System.out.println(data); // 一定能看到 42
规则 6:线程中断规则(Thread Interruption Rule)
对线程 interrupt() 的调用 happens-before 被中断线程检测到中断事件(Thread.interrupted() 或 isInterrupted())。
// 线程 A
sharedData = 99;
thread.interrupt(); // 中断操作 happens-before 线程 B 检测到中断
// 线程 B(被中断的线程)
if (Thread.interrupted()) {
// 一定能看到 sharedData = 99
}
规则 7:对象终结规则(Finalizer Rule)
一个对象的构造函数执行结束 happens-before 它的 finalize() 方法的开始。
public class MyObject {
private int value;
public MyObject() {
this.value = 42; // 构造函数中的赋值
}
@Override
protected void finalize() {
// 这里一定能看到 value = 42
System.out.println(value);
}
}
规则 8:传递性规则(Transitivity)
如果 A happens-before B,且 B happens-before C,那么 A happens-before C。
// 线程 A
x = 10; // (1) 普通写
flag = true; // (2) volatile 写
// 线程 B
if (flag) { // (3) volatile 读
// 由传递性:(1) hb (2)(程序顺序规则),(2) hb (3)(volatile 规则)
// 所以 (1) hb (3),x 一定等于 10
assert x == 10; // 成立
}
1.4 指令重排序
现代处理器和编译器为了提高性能,会对指令进行重排序。JMM 将重排序分为三类:
源代码
|
v
+------------------+
| ① 编译器优化重排序 | 编译器在不改变单线程语义的前提下重排语句
+------------------+
|
v
+------------------+
| ② 指令级并行重排序 | 处理器利用指令级并行技术重排指令
+------------------+
|
v
+------------------+
| ③ 内存系统重排序 | 由于缓存和读写缓冲区,内存操作看起来是乱序的
+------------------+
|
v
最终执行的指令序列
重排序导致的典型 Bug:
public class ReorderingExample {
private static int x = 0, y = 0;
private static int a = 0, b = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
a = 1; // (1)
x = b; // (2)
});
Thread t2 = new Thread(() -> {
b = 1; // (3)
y = a; // (4)
});
t1.start();
t2.start();
t1.join();
t2.join();
// 直觉上 x == 0 && y == 0 不可能发生
// 但由于指令重排序,(1)(2) 可能变成 (2)(1),(3)(4) 变成 (4)(3)
// 执行顺序:x = b(0) → y = a(0) → a = 1 → b = 1
// 结果:x == 0 && y == 0 —— 违反直觉!
System.out.println("x=" + x + ", y=" + y);
}
}
为什么要重排序? 因为 CPU 流水线需要尽量减少停顿(stall)。如果前一条指令在等待内存读取,CPU 不会干等,而是先执行后面不依赖该结果的指令。这对单线程是安全的(as-if-serial),但对多线程可能引发可见性和有序性问题。
一句话总结: JMM 是 Java 为了屏蔽硬件差异而定义的内存访问规范,通过 happens-before 原则和内存屏障来保证多线程环境下的可见性、有序性和原子性。
二、volatile 深度剖析
2.1 volatile 的两个语义
语义一:可见性
不加 volatile 时的问题:
public class VisibilityProblem {
private static boolean running = true; // 没有 volatile
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
int count = 0;
// 线程可能把 running 缓存在工作内存或寄存器中
// 永远看不到 main 线程的修改,导致死循环
while (running) {
count++;
}
System.out.println("停止,count=" + count);
});
t.start();
Thread.sleep(1000);
running = false; // main 线程修改了 running
// 但子线程可能永远不会停止!
System.out.println("main 已设置 running=false");
}
}
加了 volatile 后的修复:
public class VisibilityFixed {
// volatile 保证每次读都从主内存读取,每次写都立即刷回主内存
private static volatile boolean running = true;
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
int count = 0;
while (running) { // 每次循环都从主内存读取最新值
count++;
}
System.out.println("停止,count=" + count);
});
t.start();
Thread.sleep(1000);
running = false; // 写入后立即刷新到主内存
// 子线程一定会在下次循环检查时看到 false 并退出
}
}
语义二:有序性(禁止指令重排序)
volatile 通过插入内存屏障(Memory Barrier) 来禁止重排序。JMM 规定了四种屏障:
+-------------+--------------------------------------------------+
| 屏障类型 | 说明 |
+-------------+--------------------------------------------------+
| LoadLoad | Load1; LoadLoad; Load2 |
| | 确保 Load1 的数据装载先于 Load2 及其后所有装载指令 |
+-------------+--------------------------------------------------+
| StoreStore | Store1; StoreStore; Store2 |
| | 确保 Store1 的数据刷新到主内存先于 Store2 及其后 |
+-------------+--------------------------------------------------+
| LoadStore | Load1; LoadStore; Store2 |
| | 确保 Load1 装载数据先于 Store2 及其后刷新到主内存 |
+-------------+--------------------------------------------------+
| StoreLoad | Store1; StoreLoad; Load2 |
| | 确保 Store1 刷新到主内存先于 Load2 及其后装载指令 |
| | 开销最大,是全能屏障 |
+-------------+--------------------------------------------------+
volatile 写操作的屏障插入策略:
普通读/写操作
|
StoreStore 屏障 <-- 防止上面的普通写与下面的 volatile 写重排序
|
volatile 写操作
|
StoreLoad 屏障 <-- 防止 volatile 写与下面可能的 volatile 读/写重排序
|
后续操作
volatile 读操作的屏障插入策略:
volatile 读操作
|
LoadLoad 屏障 <-- 防止 volatile 读与下面的普通读重排序
|
LoadStore 屏障 <-- 防止 volatile 读与下面的普通写重排序
|
后续操作
2.2 volatile 为什么不保证原子性
以经典的 i++ 为例:
public class AtomicityProblem {
private static volatile int count = 0;
public static void main(String[] args) throws InterruptedException {
// 开 10 个线程,每个线程自增 10000 次
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10000; j++) {
count++; // 这不是原子操作!
}
});
threads[i].start();
}
for (Thread t : threads) t.join();
// 预期 100000,实际结果小于 100000
System.out.println("count = " + count);
}
}
count++ 的字节码分析:
// count++ 被编译为 4 条字节码指令:
getstatic count // ① 从主内存读取 count 的值到操作数栈(volatile 读,保证可见性)
iconst_1 // ② 将常数 1 压入操作数栈
iadd // ③ 栈顶两个值相加
putstatic count // ④ 将结果写回 count(volatile 写,保证可见性)
竞态条件的 ASCII 时间线:
时间轴 ----->
线程 A: getstatic(count=10) iconst_1 iadd(=11) putstatic(count=11)
| |
线程 B: | getstatic(count=10) iconst_1 iadd(=11) putstatic(count=11)
| |
+--- 两个线程都读到了 count=10
结果:两个线程各自增了一次,但 count 只从 10 变成了 11,丢失了一次自增!
本质原因: volatile 保证了每次读写都直接操作主内存(可见性),但 i++ 是一个**“读-改-写”** 的复合操作。volatile 只能保证每一步单独的可见性,无法保证这三步作为一个整体的原子性。在”读”和”写”之间,其他线程可以插入。
2.3 volatile 的典型使用场景
场景一:状态标志位
// 设计动机:一个线程写,多个线程读,不需要原子的读-改-写
// volatile 足够保证可见性,比 synchronized 开销小得多
public class GracefulShutdown {
private volatile boolean shutdownRequested = false;
public void shutdown() {
shutdownRequested = true; // 只有一个线程会调用
}
public void doWork() {
while (!shutdownRequested) { // 多个工作线程检查标志
// 执行任务...
}
// 清理资源...
}
}
场景二:双重检查锁定的单例模式
public class Singleton {
// 为什么必须加 volatile?
// 因为 new Singleton() 不是原子操作,分三步:
// ① 分配内存空间
// ② 初始化对象
// ③ 将引用指向分配的内存地址
// 如果不加 volatile,②③ 可能重排序,另一个线程可能拿到未初始化的对象
private static volatile Singleton instance;
private Singleton() {}
public static Singleton getInstance() {
if (instance == null) { // 第一次检查(无锁,快速路径)
synchronized (Singleton.class) {
if (instance == null) { // 第二次检查(有锁,防止重复创建)
instance = new Singleton();
}
}
}
return instance;
}
}
不加 volatile 时重排序导致的问题:
线程 A 执行 new Singleton():
① memory = allocate() // 分配内存
③ instance = memory // 引用指向内存(重排序后先执行)
② ctorInstance(memory) // 初始化对象(重排序后后执行)
|
线程 B 执行第一次 if 检查:
instance != null --> true(但对象还没初始化完成!)
return instance --> 返回了一个半初始化的对象!
场景三:低开销的读-写策略
// 设计动机:读操作用 volatile 保证可见性,写操作用 synchronized 保证原子性
// 适用于读多写少的场景,读不需要加锁,性能接近无锁
public class CheapReadWriteLock {
private volatile int value;
public int getValue() {
return value; // 无锁读,volatile 保证可见性
}
public synchronized void increment() {
value++; // 加锁写,保证原子性
}
}
2.4 volatile vs synchronized vs Atomic
+------------------+------------+------------+------------+
| 特性 | volatile | synchronized | Atomic |
+------------------+------------+------------+------------+
| 原子性 | 否 | 是 | 是 |
| 可见性 | 是 | 是 | 是 |
| 有序性 | 是 | 是 | 否 |
| 阻塞 | 否 | 是 | 否 |
| 性能(无竞争时) | 最快 | 较慢 | 快 |
| 性能(高竞争时) | 最快 | 一般 | CAS自旋 |
| 适用场景 | 状态标志 | 复合操作 | 计数器 |
| 实现机制 | 内存屏障 | Monitor锁 | CAS指令 |
+------------------+------------+------------+------------+
一句话总结: volatile 通过内存屏障保证可见性和有序性,但无法保证复合操作的原子性;它是轻量级的同步机制,适用于”一写多读”的简单场景。
三、synchronized 底层演进
3.1 对象头与 Mark Word
![[Gemini_Generated_Image_bbv0zobbv0zobbv0.png]]
3.2 偏向锁
为什么存在偏向锁?
经过 HotSpot 作者的研究发现,大多数情况下锁不仅不存在多线程竞争,而且总是由同一线程多次获得。偏向锁就是为了在没有竞争的情况下消除同步的全部开销。
类比:你每天去同一家咖啡店,店员认出你后就不再检查会员卡了——“偏向”你。直到另一个人也要来用你的会员卡,才需要重新验证。
偏向锁获取流程:
+---------------------------+
| 线程访问同步块 |
+---------------------------+
|
v
+---------------------------+
| 检查 Mark Word 的偏向标志 |
| biased_lock:1 且锁标志:01 |
+---------------------------+
|
+------+------+
| |
v v
线程 ID 是 线程 ID 不是
当前线程 当前线程
| |
v v
直接进入 +--------------------+
同步块 | 检查原持有线程是否存活 |
(无需CAS) +--------------------+
|
+------+------+
| |
v v
未存活 已存活
| |
v v
CAS 替换 偏向锁撤销
线程 ID 升级为轻量级锁
偏向锁撤销(Revocation):
偏向锁的撤销需要等到全局安全点(SafePoint)——此时所有线程都暂停。撤销的成本较高,所以 JVM 有一个优化:如果一个类的偏向锁撤销次数达到阈值(默认 20 次),JVM 会批量重偏向(Bulk Rebias);如果达到更高阈值(默认 40 次),会批量撤销(Bulk Revoke),之后该类的新实例不再使用偏向锁。
注意:JDK 15 开始,偏向锁被默认禁用(JEP 374),因为现代应用中多线程竞争更加普遍,偏向锁的撤销开销反而成了负担。
3.3 轻量级锁
为什么存在轻量级锁?
当偏向锁被撤销(有另一个线程来竞争了),但竞争不激烈时——线程交替执行同步块,不会同时争抢。此时用轻量级锁,通过 CAS 操作避免使用操作系统级的互斥量。
类比:两个人交替使用同一台打印机,每次用的时候在机器上贴个便签纸(Lock Record)标记”我在用”。因为从来不会同时使用,所以不需要去找管理员(操作系统)协调。
轻量级锁获取流程:
+-------------------------------------+
| 线程进入同步块 |
+-------------------------------------+
|
v
+-------------------------------------+
| 在当前线程的栈帧中创建 Lock Record |
| 将 Mark Word 复制到 Lock Record |
| (这个副本叫 Displaced Mark Word) |
+-------------------------------------+
|
v
+-------------------------------------+
| CAS 尝试将对象头的 Mark Word |
| 替换为指向 Lock Record 的指针 |
| 并将锁标志位改为 00 |
+-------------------------------------+
|
+------+------+
| |
CAS 成功 CAS 失败
| |
v v
成功获取 +--------------------+
轻量级锁 | 检查 Mark Word 是否 |
| 指向当前线程栈帧? |
+--------------------+
|
+------+------+
| |
是(重入) 否(竞争)
| |
v v
Lock Record 自旋等待
的 Displaced |
Mark Word 自旋达到阈值?
设为 null |
(记录重入次数) +------+------+
| |
否 是
| |
v v
继续自旋 膨胀为重量级锁
自旋优化:
自旋就是让线程执行一个空循环(busy-wait),避免切换到内核态。JDK 6 之后引入了自适应自旋:如果在同一个锁对象上,自旋等待刚刚成功获得过锁,那么 JVM 认为这次自旋也很有可能成功,就允许更长时间的自旋;反之,如果自旋很少成功,以后可能直接省略自旋过程。
3.4 重量级锁
为什么存在重量级锁?
当自旋超过阈值还没拿到锁,说明竞争确实很激烈——多个线程同时争抢同一把锁。此时继续自旋就是浪费 CPU,不如让线程阻塞,交给操作系统调度。
类比:打印机现在有很多人排队了,贴便签纸(CAS)已经不管用了,必须请管理员(操作系统内核)来维持秩序,登记排队名单。
ObjectMonitor 结构(C++ 实现):
+------------------------------------------+
| ObjectMonitor |
+------------------------------------------+
| _header : Mark Word 备份 |
| _count : 重入计数 |
| _owner : 持有锁的线程 |
| _WaitSet : wait() 的线程集合(双向链表)|
| _EntryList : 阻塞等待锁的线程集合 |
| _cxq : 最近到达的竞争者(栈结构) |
| _recursions : 重入次数 |
+------------------------------------------+
+------- synchronized(obj) -------+
| |
v |
+----------+ 竞争失败 +-----------+ 获取锁 +--------+
| _cxq | -------> | _EntryList | --------> | _owner |
| (竞争栈) | | (等待队列) | | (持有者)|
+----------+ +-----------+ +--------+
^ |
| | obj.wait()
| obj.notify() v
| +-----------+
+----------------- | _WaitSet |
| (等待集合) |
+-----------+
线程状态转换:
1. 线程竞争锁失败 --> 进入 _cxq 或 _EntryList --> BLOCKED 状态
2. 获取锁 --> 成为 _owner --> RUNNABLE 状态
3. 调用 wait() --> 释放锁,进入 _WaitSet --> WAITING 状态
4. 被 notify() --> 从 _WaitSet 移到 _EntryList --> BLOCKED 状态
5. 重新获取锁 --> 成为 _owner --> RUNNABLE 状态
为什么重量级锁开销大?
- 系统调用(syscall):线程阻塞和唤醒需要从用户态切换到内核态,每次切换约 1-10 微秒
- 上下文切换:保存和恢复线程的寄存器、程序计数器、栈指针等,还会导致 CPU 缓存失效
- 线程调度:操作系统需要决定唤醒哪个线程,涉及调度算法的开销
3.5 锁升级全流程
+--------+ 有另一线程 +--------+ 存在实际竞争 +----------+
| 无锁 | --竞争偏向锁------> | 偏向锁 | --CAS竞争失败-----> | 轻量级锁 |
| (01,0) | | (01,1) | | (00) |
+--------+ +--------+ +----------+
|
自旋超过阈值
或等待线程数>1
|
v
+----------+
| 重量级锁 |
| (10) |
+----------+
详细流程图:
线程访问同步块
|
v
+----是否启用偏向锁?----+
| |
是 否
| |
v v
尝试偏向 直接轻量级锁
|
v
+--对象是否已偏向?--+
| |
未偏向 已偏向
| |
v v
CAS设置 偏向的是
线程ID 当前线程?
| / \
成功 是 否
| | |
v v v
进入同步块 直接进入 到达安全点
(偏向当前 同步块 撤销偏向
线程) |
v
升级为轻量级锁
|
v
CAS 获取锁
/ \
成功 失败
| |
v v
进入同步块 自旋等待
|
超过阈值?
/ \
否 是
| |
v v
继续自旋 膨胀为重量级锁
|
v
线程阻塞(BLOCKED)
等待 _owner 释放
锁只能升级,不能降级(有一个例外):
这是因为锁降级的判断成本太高——你不知道什么时候从”高竞争”变回”低竞争”。唯一的例外是偏向锁的批量撤销(Bulk Revoke),这实际上是在安全点上统一操作,性质不同于运行时降级。
3.6 锁优化技术
锁消除(Lock Elimination)
JIT 编译器通过逃逸分析判断一个对象是否被多线程共享。如果对象不会逃逸出当前线程,那么对它的加锁操作可以直接消除。
// 设计动机:程序员可能不经意地在不需要同步的地方使用了同步
// 比如在方法内部创建的 StringBuffer
public String concat(String s1, String s2) {
// StringBuffer 的 append 方法是 synchronized 的
// 但 sb 不会逃逸出这个方法,不可能被其他线程访问
StringBuffer sb = new StringBuffer();
sb.append(s1); // JIT 会消除这里的 synchronized
sb.append(s2); // JIT 会消除这里的 synchronized
return sb.toString();
}
// JIT 优化后,等价于:
public String concat(String s1, String s2) {
StringBuffer sb = new StringBuffer();
sb.append(s1); // 无锁
sb.append(s2); // 无锁
return sb.toString();
}
锁粗化(Lock Coarsening)
如果 JVM 检测到一连串零碎的加锁解锁操作都是对同一个对象,就会把锁的范围扩大(粗化),只加一次锁。
// 优化前:反复加锁解锁
for (int i = 0; i < 10000; i++) {
synchronized (lock) { // 10000 次 lock/unlock
list.add(i);
}
}
// JIT 锁粗化后:
synchronized (lock) { // 只有 1 次 lock/unlock
for (int i = 0; i < 10000; i++) {
list.add(i);
}
}
自适应自旋(Adaptive Spinning)
自旋的次数不再固定,而是由 JVM 根据前一次在同一个锁上的自旋结果动态调整:
- 上次自旋成功了 → 这次多自旋几圈(乐观)
- 上次自旋失败了 → 这次少自旋或直接不自旋(悲观)
- 如果某个锁从来没有自旋成功过 → 以后直接跳过自旋阶段
一句话总结: synchronized 经历了从无锁到偏向锁、轻量级锁、重量级锁的升级演进,配合锁消除、锁粗化和自适应自旋等优化,已经不再是性能杀手。
四、AQS 框架精讲
4.1 什么是 AQS
AQS(AbstractQueuedSynchronizer)是 Java 并发包(java.util.concurrent)的基石框架。ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 等同步工具都是基于 AQS 实现的。
类比:银行排队系统
+---------------------------------------------------------------------+
| 银行排队系统 |
+---------------------------------------------------------------------+
| |
| state(柜台状态) |
| = 0 表示柜台空闲 |
| = 1 表示有人正在办理 |
| > 1 表示重入次数(同一个人办多笔业务) |
| |
| CLH 队列(排队队列) |
| +------+ +------+ +------+ +------+ |
| | head | -> | 客户A | -> | 客户B | -> | 客户C | (tail) |
| |(哨兵)| |(等待) | |(等待) | |(等待) | |
| +------+ +------+ +------+ +------+ |
| |
| 每个客户(Node)记录: |
| - 我的状态(等待中/取消/需要被唤醒) |
| - 前面是谁(prev) |
| - 后面是谁(next) |
| - 绑定的线程 |
+---------------------------------------------------------------------+
设计动机: Doug Lea 发现几乎所有同步器都遵循相同的模式——维护一个共享状态 + 管理等待线程队列。AQS 把这个通用逻辑抽取出来,子类只需要实现”如何判断能否获取/释放同步状态”的逻辑即可。
4.2 AQS 核心数据结构
AbstractQueuedSynchronizer
+---------------------------------------------------+
| volatile int state; // 同步状态 |
| |
| +--- CLH 双向队列 (FIFO) -----------------------+ |
| | | |
| | +------+ prev +------+ prev +------+ | |
| | | head | <-------- | Node | <------ | tail | | |
| | |(哨兵) | -------> | 节点 | ------> | | | |
| | +------+ next +------+ next +------+ | |
| | | |
| +------------------------------------------------+ |
+---------------------------------------------------+
Node 结构:
+---------------------------------------------------+
| static final class Node { |
| volatile int waitStatus; // 节点等待状态 |
| volatile Node prev; // 前驱节点 |
| volatile Node next; // 后继节点 |
| volatile Thread thread; // 绑定的线程 |
| Node nextWaiter; // Condition队列指针 |
| } |
+---------------------------------------------------+
waitStatus 取值:
+-------------+-------+------------------------------------------+
| 状态名 | 值 | 含义 |
+-------------+-------+------------------------------------------+
| CANCELLED | 1 | 线程已取消等待(超时或中断) |
| SIGNAL | -1 | 后继节点需要被唤醒(当前节点释放锁时要通知) |
| CONDITION | -2 | 节点在 Condition 队列中等待 |
| PROPAGATE | -3 | 共享模式下,释放应该传播到其他节点 |
| 0 | 0 | 初始状态 |
+-------------+-------+------------------------------------------+
4.3 独占模式源码分析
以 ReentrantLock 的 lock() 为例,完整调用链:
lock()
└── acquire(1)
├── tryAcquire(1) // 尝试获取锁(子类实现)
│ ├── 成功 → 返回
│ └── 失败 ↓
├── addWaiter(Node.EXCLUSIVE) // 创建节点加入队列
└── acquireQueued(node, 1) // 在队列中自旋/阻塞
├── shouldParkAfterFailedAcquire() // 判断是否需要阻塞
└── parkAndCheckInterrupt() // LockSupport.park() 阻塞
acquire() — 获取锁的入口:
// 设计动机:先快速尝试获取(无锁快速路径),失败再排队(慢速路径)
public final void acquire(int arg) {
if (!tryAcquire(arg) && // ① 尝试获取锁(由子类实现)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // ② 获取失败则排队
selfInterrupt(); // ③ 如果在排队过程中被中断过,补上中断标志
}
// 为什么要 selfInterrupt?因为 parkAndCheckInterrupt 会清除中断标志
// 但 acquire 的语义是不响应中断的,所以要在成功获取锁后恢复中断状态
addWaiter() — 将当前线程封装为 Node 加入队列尾部:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速尝试:直接 CAS 设置 tail(大多数情况下成功)
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // CAS 成功
pred.next = node;
return node;
}
}
// 快速尝试失败(队列为空或 CAS 失败),进入完整入队流程
enq(node);
return node;
}
// 设计动机:先用快速路径(一次 CAS),失败再走完整的自旋入队
// 这是典型的 fast-path / slow-path 设计模式
private Node enq(final Node node) {
for (;;) { // 自旋,直到入队成功
Node t = tail;
if (t == null) {
// 队列为空,初始化一个哨兵节点作为 head
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued() — 在队列中自旋获取锁:
// 设计动机:只有队列中第二个节点(head 的后继)才有资格尝试获取锁
// 其他节点老老实实阻塞,避免所有线程同时自旋浪费 CPU
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 自旋
final Node p = node.predecessor(); // 获取前驱节点
// 只有前驱是 head 时才尝试获取锁(公平性保证)
if (p == head && tryAcquire(arg)) {
setHead(node); // 获取成功,当前节点变成新的 head
p.next = null; // 帮助 GC 回收旧 head
failed = false;
return interrupted;
}
// 不是 head 的后继,或者 tryAcquire 失败
if (shouldParkAfterFailedAcquire(p, node) && // 检查是否应该阻塞
parkAndCheckInterrupt()) // 阻塞当前线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); // 异常情况下取消获取
}
}
shouldParkAfterFailedAcquire() — 判断是否应该阻塞:
// 设计动机:不是一失败就阻塞,而是先整理队列状态
// 确保前驱节点的 waitStatus 为 SIGNAL 后再阻塞
// 这样前驱释放锁时才知道要唤醒后继
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驱已经设为 SIGNAL,安全地阻塞
return true;
if (ws > 0) {
// 前驱被取消了(CANCELLED),跳过所有被取消的前驱
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前驱状态为 0 或 PROPAGATE,设为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false; // 返回 false,外层 acquireQueued 会再循环一次
}
4.4 共享模式源码分析
共享模式允许多个线程同时获取同步状态,典型的实现有 Semaphore 和 CountDownLatch。
acquireShared(arg)
└── tryAcquireShared(arg) // 返回值:< 0 失败,>= 0 成功
├── >= 0 → 获取成功,返回
└── < 0 → 失败 ↓
└── doAcquireShared(arg) // 排队 + 阻塞
└── 获取成功后 setHeadAndPropagate() // 关键:传播唤醒
// 设计动机:与独占模式的关键区别在于 tryAcquireShared 的返回值
// 返回正数表示还有剩余资源,可以继续唤醒后面等待的线程
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 注意:SHARED 模式
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取成功,设置头节点并传播
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted) selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) cancelAcquire(node);
}
}
// 传播机制:一个线程获取到共享锁后,如果还有剩余资源,继续唤醒后面的线程
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
// propagate > 0 说明还有剩余资源,继续唤醒
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // 唤醒后继共享节点
}
}
传播机制的直觉理解:
独占模式就像银行只有一个柜台,一次只服务一个人。共享模式就像银行大门,可以同时进 N 个人。当一个人进去后发现门还没满(propagate > 0),就喊后面的人也进来。
4.5 ReentrantLock 基于 AQS 的实现
公平锁 vs 非公平锁:
// ============ 非公平锁的 tryAcquire ============
// 设计动机:新来的线程直接尝试抢锁,不管队列里有没有人排队
// 优点:减少线程切换开销,吞吐量更高
// 缺点:可能导致队列中的线程饥饿
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 直接 CAS 抢锁,不检查队列!
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 重入:state + 1
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// ============ 公平锁的 tryAcquire ============
// 设计动机:严格按照先来后到,保证公平性
// 优点:不会饥饿
// 缺点:多了一次 hasQueuedPredecessors() 检查,性能略差
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 关键区别:先检查队列中是否有前驱节点在等待
if (!hasQueuedPredecessors() && // <--- 这一行是唯一的区别!
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
重入的实现原理:
第一次 lock(): state: 0 → 1, owner: null → ThreadA
第二次 lock(): state: 1 → 2, owner: ThreadA(不变,是同一个线程)
第三次 lock(): state: 2 → 3
第一次 unlock():state: 3 → 2
第二次 unlock():state: 2 → 1
第三次 unlock():state: 1 → 0, owner: ThreadA → null(完全释放)
4.6 Condition 机制
Condition 是 AQS 内部类 ConditionObject 实现的,它为每个 Condition 对象维护了一个单独的等待队列(Condition Queue),与 AQS 的同步队列(Sync Queue)是两个不同的队列。
AQS 同步队列(Sync Queue):
争抢锁的线程排队的地方
+------+ +------+ +------+
| head | -> | NodeA | -> | NodeB | (tail)
+------+ +------+ +------+
Condition 等待队列(Condition Queue):
调用 await() 的线程等待的地方(单向链表)
Condition1:
+----------+ +----------+
| firstWaiter| -> | lastWaiter |
| (NodeC) | | (NodeD) |
+----------+ +----------+
Condition2:
+----------+
| firstWaiter|
| (NodeE) |
+----------+
await() 和 signal() 的转移过程:
线程 A 调用 condition.await():
① 创建新 Node,加入 Condition Queue 尾部
② 完全释放锁(state 置为 0)
③ 线程阻塞(LockSupport.park)
Sync Queue: head -> [其他等待者...]
Cond Queue: ... -> [NodeA] (线程 A 在这里等待)
线程 B 调用 condition.signal():
① 取 Condition Queue 的 firstWaiter(NodeA)
② 将 NodeA 从 Condition Queue 移除
③ 将 NodeA 加入 Sync Queue 尾部
④ 将 NodeA 的 waitStatus 设为 SIGNAL
Sync Queue: head -> [其他等待者...] -> [NodeA] (线程 A 转移到这里)
Cond Queue: (空或剩余节点)
线程 A 被唤醒后:
⑤ 在 Sync Queue 中排队等待获取锁
⑥ 获取锁成功后从 await() 返回
// await() 核心逻辑
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
Node node = addConditionWaiter(); // ① 加入 Condition Queue
int savedState = fullyRelease(node); // ② 完全释放锁(重入锁也全部释放)
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 如果还没被转移到 Sync Queue
LockSupport.park(this); // ③ 阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被 signal 唤醒后,在 Sync Queue 中重新获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清理 Condition Queue 中已取消的节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
4.7 基于 AQS 自定义同步器
示例:实现一个简单的互斥锁
/**
* 设计动机:展示 AQS 的模板方法模式
* 子类只需要实现 tryAcquire/tryRelease(独占)
* 或 tryAcquireShared/tryReleaseShared(共享)
* 排队、阻塞、唤醒的逻辑全部由 AQS 负责
*/
public class SimpleMutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
// state == 0: 未锁定
// state == 1: 已锁定
@Override
protected boolean tryAcquire(int arg) {
// CAS 将 state 从 0 改为 1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0); // 不需要 CAS,因为是独占模式,只有持有锁的线程才能释放
return true;
}
@Override
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
Condition newCondition() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override public void lock() { sync.acquire(1); }
@Override public void unlock() { sync.release(1); }
@Override public boolean tryLock() { return sync.tryAcquire(1); }
@Override public Condition newCondition() { return sync.newCondition(); }
@Override public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
}
示例:实现一个共享锁(允许最多 N 个线程同时访问)
/**
* 设计动机:展示共享模式的使用方式
* 类似于 Semaphore,但更简单
*/
public class SharedLock {
private static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits); // 初始许可数
}
@Override
protected int tryAcquireShared(int arg) {
for (;;) {
int available = getState();
int remaining = available - arg;
// remaining < 0 表示资源不足,返回负数让调用者排队
// remaining >= 0 则 CAS 扣减
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for (;;) {
int current = getState();
int next = current + arg;
if (compareAndSetState(current, next))
return true;
}
}
}
private final Sync sync;
public SharedLock(int permits) {
sync = new Sync(permits);
}
public void acquire() { sync.acquireShared(1); }
public void release() { sync.releaseShared(1); }
}
一句话总结: AQS 通过”volatile state + CLH 双向队列 + CAS”实现了一套通用的同步框架,子类只需实现 tryAcquire/tryRelease 即可得到完整的锁语义。
五、CAS 与原子操作
5.1 CAS 原理
CAS(Compare-And-Swap)是一种乐观锁思想的实现,它有三个操作数:
- V(Value):内存中的当前值
- A(Expected):期望值(我认为现在应该是什么)
- B(New):新值(如果确实是 A,我要改成 B)
CAS 的逻辑(伪代码):
if (V == A) {
V = B;
return true; // 成功
} else {
return false; // 失败,别人已经改过了
}
// 关键:这整个 if + 赋值 是一个原子操作(由 CPU 指令保证)
CPU 级别的实现:
在 x86 架构上,CAS 对应 CMPXCHG 指令(Compare and Exchange)。在多核处理器上,这条指令会配合 LOCK 前缀使用,它会锁住总线或缓存行,确保在执行期间其他核心无法修改同一内存地址。
// x86 汇编(简化)
lock cmpxchg [address], new_value
// lock 前缀确保原子性
// cmpxchg: if (EAX == [address]) { [address] = new_value; ZF=1 }
// else { EAX = [address]; ZF=0 }
为什么 CAS 比加锁快?
加锁方式(悲观锁):
线程 A: [获取锁] ----[执行操作]---- [释放锁]
线程 B: [尝试获取锁] --[阻塞(OS调度)]-- [被唤醒] --[获取锁] --[执行]-- [释放]
↑ 用户态 → 内核态(开销大)
CAS 方式(乐观锁):
线程 A: [CAS成功] --[操作完成]
线程 B: [CAS失败] --[重试] --[CAS成功] --[操作完成]
↑ 始终在用户态,没有上下文切换
5.2 Unsafe 类
Unsafe 是 Java 中直接操作内存的后门类,位于 sun.misc.Unsafe(JDK 9+ 改为 jdk.internal.misc.Unsafe)。原子类的底层实现都依赖它。
// AtomicInteger 的核心实现
public class AtomicInteger extends Number {
// Unsafe 实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// value 字段的内存偏移量(用于直接操作内存)
private static final long valueOffset;
static {
try {
// 获取 value 字段在对象中的内存偏移量
valueOffset = unsafe.objectFieldOffset(
AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// volatile 保证可见性
private volatile int value;
// 设计动机:自旋 CAS,失败就重试,直到成功
// 这就是无锁编程的核心模式
public final int getAndAddInt(Object obj, long offset, int delta) {
int v;
do {
v = getIntVolatile(obj, offset); // 读取当前值
} while (!compareAndSwapInt(obj, offset, v, v + delta)); // CAS 尝试更新
return v;
}
// native 方法,直接调用 CPU 的 CMPXCHG 指令
public final native boolean compareAndSwapInt(
Object obj, long offset, int expected, int update);
}
5.3 ABA 问题
什么是 ABA 问题?
CAS 判断的是”值有没有变”,但无法判断”值是不是被改过又改回来了”。
时间线:
线程 A:读取值 = A
|
|(线程 A 被挂起)
|
线程 B:将 A 改为 B
线程 C:将 B 改回 A
|
线程 A:CAS(expected=A, new=X) → 成功!
|
但此时的 A 已经不是当初那个 A 了!
具体例子:链表操作
初始状态:head -> A -> B -> C
线程 1 想做:CAS 把 head 从 A 改为 B(删除 A)
线程 1 读到 head = A,记下 next = B
线程 2 执行了一系列操作:
删除 A 和 B:head -> C
重新插入一个新的 A(不同对象但值相同):head -> A -> C
线程 1 恢复执行:
CAS(head, A, B) 成功!
但 B 已经被删除了,B.next 可能是野指针!
结果:head -> B -> ???(内存错误)
解决方案:AtomicStampedReference
// 设计动机:给每次修改加一个"版本号"
// 不仅比较值是否相同,还比较版本号是否相同
// 即使值从 A 变成 B 再变回 A,版本号也从 1 变成 2 再变成 3,不会相同
public class ABADemo {
// AtomicStampedReference 内部同时维护引用和版本号(stamp)
private static AtomicStampedReference<Integer> ref =
new AtomicStampedReference<>(100, 0); // 初始值 100,版本号 0
public static void main(String[] args) {
int stamp = ref.getStamp(); // 获取当前版本号
Integer reference = ref.getReference(); // 获取当前值
// CAS 时同时检查值和版本号
boolean success = ref.compareAndSet(
reference, // 期望值
200, // 新值
stamp, // 期望版本号
stamp + 1 // 新版本号
);
}
}
AtomicMarkableReference: 如果不关心被修改了几次,只关心”有没有被修改过”,可以用 AtomicMarkableReference,它用一个 boolean 标记代替版本号。
5.4 原子类家族
java.util.concurrent.atomic 包下的原子类:
+------------------------------------------+
| 基本类型原子类 |
| AtomicInteger |
| AtomicLong |
| AtomicBoolean |
+------------------------------------------+
| 引用类型原子类 |
| AtomicReference<V> |
| AtomicStampedReference<V> (解决ABA) |
| AtomicMarkableReference<V> (标记引用) |
+------------------------------------------+
| 数组原子类 |
| AtomicIntegerArray |
| AtomicLongArray |
| AtomicReferenceArray<E> |
+------------------------------------------+
| 字段更新器(对已有类的 volatile 字段做原子更新)|
| AtomicIntegerFieldUpdater<T> |
| AtomicLongFieldUpdater<T> |
| AtomicReferenceFieldUpdater<T,V> |
+------------------------------------------+
| 累加器(JDK 8+,高并发下性能更好) |
| LongAdder |
| LongAccumulator |
| DoubleAdder |
| DoubleAccumulator |
+------------------------------------------+
5.5 LongAdder 为什么比 AtomicLong 快
AtomicLong 的瓶颈:
所有线程争抢同一个 value:
线程1 ---+
线程2 ---+--> CAS(value) --> 成功只有1个,其他全部重试
线程3 ---+
线程4 ---+
高并发下,大量 CAS 失败 → 大量自旋 → 浪费 CPU
LongAdder 的分散策略:
LongAdder 内部结构:
+-------+ +---------+---------+---------+---------+
| base | | Cell[0] | Cell[1] | Cell[2] | Cell[3] |
| (基础值)| | value | value | value | value |
+-------+ +---------+---------+---------+---------+
线程1 -------> Cell[0] (CAS)
线程2 -------> Cell[1] (CAS)
线程3 -------> Cell[2] (CAS) 每个线程分散到不同的 Cell
线程4 -------> Cell[3] (CAS)
最终结果 = base + Cell[0] + Cell[1] + Cell[2] + Cell[3]
// 设计动机:"化整为零"——把一个热点变量拆分成多个,减少竞争
// 类似于银行多开几个柜台,每个柜台独立服务,最后汇总
// 这就是 Striped64 的核心思想
// LongAdder 的 add 方法(简化)
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 首先尝试 CAS 更新 base(快速路径,低竞争时走这里)
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
// base CAS 失败,说明有竞争,使用 Cell 数组
if (as == null || (m = as.length - 1) < 0 ||
// 通过线程的 probe 值(类似哈希)定位到某个 Cell
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// Cell CAS 也失败了,进入终极方法,可能扩容
longAccumulate(x, null, uncontended);
}
}
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value; // 把所有 Cell 的值加起来
}
}
return sum;
// 注意:sum() 不是原子操作!在统计过程中其他线程可能还在修改
// 所以 LongAdder 适合统计场景,不适合需要精确实时值的场景
}
伪共享(False Sharing)问题与 @Contended:
CPU 缓存行(Cache Line)通常是 64 字节:
+-------- 一个缓存行 (64 bytes) --------+
| Cell[0].value | Cell[1].value | ... |
+---------------------------------------+
问题:Cell[0] 和 Cell[1] 在同一个缓存行中。
当 CPU 核心 1 修改 Cell[0] 时,会导致 CPU 核心 2 缓存行失效,
即使核心 2 只使用 Cell[1]。这就是伪共享——不同的数据因为物理上相邻而互相干扰。
解决方案:用填充(Padding)让每个 Cell 独占一个缓存行
+------ 缓存行 1 ------+ +------ 缓存行 2 ------+
| [padding] Cell[0] | | [padding] Cell[1] |
| [padding] | | [padding] |
+-----------------------+ +-----------------------+
// JDK 8 中 Cell 类用 @Contended 注解防止伪共享
// JVM 会自动在对象前后加填充字节
@sun.misc.Contended // 告诉 JVM:给这个对象前后加 128 字节的 padding
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
}
// 使用 @Contended 需要 JVM 参数:-XX:-RestrictContended
一句话总结: CAS 是无锁编程的基石,通过 CPU 原子指令实现;LongAdder 通过”分散热点 + Cell 数组”策略在高并发场景下大幅超越 AtomicLong 的性能。
六、线程池源码级理解
6.1 线程池状态机
线程池用一个 AtomicInteger 类型的 ctl 变量同时存储两个信息:
ctl(32 位 int):
+---+-----------------------------+
| 3 | 29 |
+---+-----------------------------+
↑ ↑
状态位 工作线程数量
(高3位) (低29位,最大约5亿)
5 种状态及其转换:
shutdown()
RUNNING (-1) ─────────────────> SHUTDOWN (0)
│ │
│ shutdownNow() │ 队列为空 && 工作线程数为 0
│ v
└──────────────────────> STOP (1)
│
│ 工作线程数为 0
v
TIDYING (2)
│
│ terminated() 钩子执行完毕
v
TERMINATED (3)
状态值说明:
+-------------+-------+--------------------------------------------+
| 状态 | 值 | 说明 |
+-------------+-------+--------------------------------------------+
| RUNNING | -1 | 接受新任务,处理队列中的任务 |
| SHUTDOWN | 0 | 不接受新任务,但处理队列中的任务 |
| STOP | 1 | 不接受新任务,不处理队列,中断正在执行的任务 |
| TIDYING | 2 | 所有任务已结束,workerCount=0,即将执行钩子 |
| TERMINATED | 3 | terminated() 钩子方法执行完毕 |
+-------------+-------+--------------------------------------------+
// 设计动机:用一个变量存两个信息,一次 CAS 就能同时更新状态和线程数
// 避免了两个变量需要两次 CAS 的不一致问题
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 0x1FFFFFFF
// 高 3 位:状态
private static final int RUNNING = -1 << COUNT_BITS; // 111_00...0
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000_00...0
private static final int STOP = 1 << COUNT_BITS; // 001_00...0
private static final int TIDYING = 2 << COUNT_BITS; // 010_00...0
private static final int TERMINATED = 3 << COUNT_BITS; // 011_00...0
// 拆解方法
private static int runStateOf(int c) { return c & ~CAPACITY; } // 取高 3 位
private static int workerCountOf(int c) { return c & CAPACITY; } // 取低 29 位
private static int ctlOf(int rs, int wc) { return rs | wc; } // 合并
6.2 execute() 核心流程
// 这是理解线程池最重要的方法
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
// 步骤一:工作线程数 < 核心线程数 → 创建核心线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // true 表示核心线程
return;
c = ctl.get(); // addWorker 失败,重新获取 ctl
}
// 步骤二:核心线程满了 → 尝试把任务加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// Double Check:再次检查线程池状态
// 因为在入队的过程中,线程池可能被 shutdown 了
if (!isRunning(recheck) && remove(command))
reject(command); // 线程池已关闭,拒绝任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 防止工作线程全部死亡,队列中的任务没人处理
}
// 步骤三:队列也满了 → 创建非核心线程
else if (!addWorker(command, false)) // false 表示非核心线程
reject(command); // 创建失败(达到 maximumPoolSize)→ 拒绝
}
execute() 的决策流程图:
execute(task)
|
v
+---------------------+
| workerCount | 是
| < corePoolSize ? ---------> addWorker(task, core=true) --> 成功则返回
+---------------------+ |
| 否 失败
v |
+---------------------+ v
| 线程池 RUNNING | 是 重新获取 ctl
| && queue.offer(task)?--------+ |
+---------------------+ | |
| 否 v |
| +----Double Check----+ |
| | 线程池还在 RUNNING? | |
| +---+----------+-----+ |
| | | |
| 否 是 |
| | | |
| v v |
| remove workerCount==0? |
| +reject | |
| 是: addWorker(null,false) |
| 否: do nothing |
v |
+----------------------+ |
| addWorker(task, | <-----------------------------------+
| core=false) |
+---+------------------+
|
成功则执行
失败则 reject(task)
6.3 Worker 线程的生命周期
// Worker 的双重身份:
// 1. 继承 AQS:用于实现不可重入的互斥锁(判断线程是否空闲)
// 2. 实现 Runnable:作为线程的执行体
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; // Worker 绑定的线程
Runnable firstTask; // 创建时的第一个任务(可能为 null)
volatile long completedTasks; // 完成的任务数
Worker(Runnable firstTask) {
setState(-1); // 初始 state=-1,阻止中断直到 runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
// runWorker — Worker 线程的核心执行循环
// 设计动机:通过 getTask() 不断从队列中取任务
// 核心线程用 take()(无限等待),非核心线程用 poll(timeout)(超时退出)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断(setState 从 -1 变为 0)
boolean completedAbruptly = true;
try {
// 关键循环:先执行 firstTask,然后不断 getTask() 取新任务
while (task != null || (task = getTask()) != null) {
w.lock(); // 获取 Worker 自身的锁(标记为"忙碌")
// 如果线程池 >= STOP,确保线程被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 钩子方法(可覆写)
try {
task.run(); // 实际执行任务
} catch (Throwable x) {
afterExecute(task, x); // 钩子方法
throw x;
}
afterExecute(task, null);
} finally {
task = null;
w.completedTasks++;
w.unlock(); // 标记为"空闲"
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 线程退出清理
}
}
// getTask — 从队列中获取任务(核心线程 vs 非核心线程的区别在这里)
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池已关闭且队列为空,返回 null(线程退出)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 关键判断:是否需要超时控制?
// allowCoreThreadTimeOut=true 或 当前线程数 > 核心线程数 → 需要超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 线程数超标或超时 → 减少线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null; // 返回 null,调用者的 while 循环结束,线程退出
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时等待
workQueue.take(); // 无限等待(核心线程在这里阻塞,不会退出)
if (r != null) return r;
timedOut = true; // poll 超时返回 null
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
核心线程如何”存活”?
核心线程的生命周期:
runWorker() 循环
|
v
task = getTask()
|
v
workQueue.take() <--- 队列为空时,线程在这里阻塞等待
| 这就是核心线程不会退出的原因:
| take() 会一直等,直到队列有新任务
v
拿到任务 → task.run() → 回到循环开头
非核心线程的生命周期:
runWorker() 循环
|
v
task = getTask()
|
v
workQueue.poll(keepAliveTime) <--- 等待 keepAliveTime 后超时
|
v
返回 null → getTask 返回 null → while 循环结束 → 线程退出
6.4 优雅关闭
// shutdown() — 温和关闭:不接受新任务,但会执行完队列中的任务
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 状态改为 SHUTDOWN
interruptIdleWorkers(); // 中断空闲线程(正在 take/poll 的线程)
onShutdown(); // 钩子方法(ScheduledThreadPoolExecutor 用)
} finally {
mainLock.unlock();
}
tryTerminate();
}
// shutdownNow() — 暴力关闭:不接受新任务,清空队列,中断所有线程
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); // 状态改为 STOP
interruptWorkers(); // 中断所有线程(包括正在执行任务的)
tasks = drainQueue(); // 清空队列,返回未执行的任务
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
推荐的优雅关闭序列:
// 设计动机:先温和关闭,等一段时间;如果还没关完,再暴力关闭
// 这是 Java 官方文档推荐的最佳实践
public void gracefulShutdown(ExecutorService executor) {
executor.shutdown(); // 第一步:拒绝新任务,等待已提交任务完成
try {
// 第二步:等待已提交的任务执行完毕,最多等 60 秒
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 第三步:超时后强制关闭
// 第四步:再等一次,确认所有线程都已退出
if (!executor.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("线程池未能完全关闭");
}
} catch (InterruptedException ie) {
// 当前线程被中断,立即强制关闭
executor.shutdownNow();
Thread.currentThread().interrupt(); // 保留中断状态
}
}
一句话总结: 线程池通过 ctl 变量管理状态和线程数,execute() 的三级策略(核心线程 → 队列 → 非核心线程 → 拒绝)和 Worker 的 getTask() 循环是理解线程池的关键。
七、CompletableFuture 编排原理
7.1 设计思想
Future 的局限性:
// 传统 Future 的问题:get() 是阻塞的
Future<String> future = executor.submit(() -> queryDatabase());
String result = future.get(); // 阻塞!调用线程什么都干不了,只能等
process(result);
// 如果有多个异步任务要编排,代码变成了"回调地狱"或"阻塞串行"
Future<A> fa = executor.submit(() -> taskA());
A a = fa.get(); // 阻塞
Future<B> fb = executor.submit(() -> taskB(a));
B b = fb.get(); // 阻塞
Future<C> fc = executor.submit(() -> taskC(b));
C c = fc.get(); // 阻塞
CompletableFuture 的解决方案:
// CompletableFuture = Future + 回调 + 组合 + 异常处理
// 设计动机:让异步编程像写同步代码一样流畅,无需阻塞等待
CompletableFuture
.supplyAsync(() -> queryDatabase()) // 异步执行
.thenApply(data -> transform(data)) // 上一步完成后自动执行(无需阻塞)
.thenAccept(result -> sendResponse(result)) // 继续链式处理
.exceptionally(ex -> handleError(ex)); // 统一异常处理
// 组合多个异步任务:
CompletableFuture<String> userFuture = supplyAsync(() -> getUser());
CompletableFuture<String> orderFuture = supplyAsync(() -> getOrder());
// 两个任务并行执行,都完成后合并结果
userFuture.thenCombine(orderFuture, (user, order) ->
merge(user, order));
7.2 内部结构
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
// 结果字段:持有正常结果或异常
// null = 未完成
// 正常值 = 完成
// AltResult(exception) = 异常完成
// AltResult(null) = null 结果(区分 null 值和未完成)
volatile Object result;
// 回调栈:Treiber Stack(无锁并发栈)
// 每个注册的回调(thenApply, thenAccept 等)都是一个 Completion 节点
// 新注册的回调通过 CAS 压入栈顶
volatile Completion stack;
}
CompletableFuture 内部的 Completion 栈(Treiber Stack):
+-------------------+
| CompletableFuture |
| result: null | (未完成)
| stack: --------+ |
+----------------+--+
|
v
+-----------+ +-----------+ +-----------+
| Completion|---->| Completion|---->| Completion|----> null
| (thenApply) | (thenAccept) | (exceptionally)
+-----------+ +-----------+ +-----------+
(最后注册的) (第二个注册的) (最先注册的)
注意:这是一个栈结构(后进先出),但实际执行时会遍历整个链
当 complete(value) 被调用时:
1. 设置 result = value
2. 弹出 stack 中的所有 Completion 节点
3. 依次触发每个 Completion 的回调逻辑
7.3 异步回调链执行机制
thenApply 是如何注册回调的:
// 简化后的核心逻辑
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
return uniApplyStage(null, fn); // null 表示同步执行(用调用线程或完成线程)
}
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
return uniApplyStage(asyncPool, fn); // asyncPool 通常是 ForkJoinPool.commonPool()
}
private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T, ? extends V> f) {
CompletableFuture<V> d = new CompletableFuture<V>(); // 创建下游 CF
// 如果 this 已经完成了,直接执行 fn
// 否则创建 UniApply 节点,CAS 压入 this.stack
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c); // CAS 入栈
c.tryFire(SYNC); // 再次尝试(防止在入栈过程中 this 已经完成)
}
return d; // 返回下游 CF,支持链式调用
}
complete() 是如何触发回调链的:
public boolean complete(T value) {
boolean triggered = completeValue(value); // CAS 设置 result
postComplete(); // 触发所有回调
return triggered;
}
// postComplete 的核心逻辑
final void postComplete() {
CompletableFuture<?> f = this; Completion h;
// 遍历 stack 链表,依次触发每个 Completion
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
// CAS 弹出栈顶
if (f.casStack(h, t = h.next)) {
if (t != null) {
// 还有更多 Completion,继续处理
if (f != this) {
pushStack(h);
continue;
}
h.next = null;
}
// 触发回调:f 是上游 CF 的结果,h 是注册的回调
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
线程切换规则:
+---------------------+----------------------------------------+
| 方法 | 执行线程 |
+---------------------+----------------------------------------+
| thenApply(fn) | 如果上游已完成:调用 thenApply 的线程 |
| | 如果上游未完成:完成上游的那个线程 |
+---------------------+----------------------------------------+
| thenApplyAsync(fn) | ForkJoinPool.commonPool() 中的线程 |
+---------------------+----------------------------------------+
| thenApplyAsync( | 指定的 Executor 中的线程 |
| fn, executor) | |
+---------------------+----------------------------------------+
7.4 异常传播机制
// 异常传播:上游的异常会沿着回调链向下传播
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了");
return "ok";
})
.thenApply(s -> s + " world") // 不会执行(上游异常了)
.thenApply(s -> s.toUpperCase()); // 不会执行(上游异常了)
// cf.get() 会抛出 ExecutionException,cause 是 RuntimeException("出错了")
三种异常处理方式的区别:
// === exceptionally:只处理异常,正常值透传 ===
// 设计动机:类似 try-catch,只在出错时执行
CompletableFuture<String> result = cf
.exceptionally(ex -> {
// 只有上游异常时才执行
return "默认值"; // 返回兜底值,让下游继续正常执行
});
// === handle:无论成功失败都执行,可以转换结果 ===
// 设计动机:类似 try-catch-finally + return,总是执行,总是返回新值
CompletableFuture<String> result = cf
.handle((value, ex) -> {
if (ex != null) {
return "异常处理结果"; // 异常情况
}
return value + " 处理后"; // 正常情况
});
// === whenComplete:无论成功失败都执行,但不改变结果 ===
// 设计动机:类似 try-finally,用于记录日志、释放资源等副作用操作
CompletableFuture<String> result = cf
.whenComplete((value, ex) -> {
// 无论成功失败都执行
// 但不能改变 result 的值(返回值是 void)
if (ex != null) {
log.error("执行失败", ex);
} else {
log.info("执行成功: " + value);
}
});
三种异常处理方式对比:
+------------------+---------+----------+----------+
| 方法 | 正常执行 | 异常执行 | 能否改值 |
+------------------+---------+----------+----------+
| exceptionally | 否 | 是 | 是 |
| handle | 是 | 是 | 是 |
| whenComplete | 是 | 是 | 否 |
+------------------+---------+----------+----------+
一句话总结: CompletableFuture 通过内部的 Treiber Stack 管理回调链,complete() 触发链式执行,实现了无阻塞的异步编排能力。
八、ThreadLocal 原理与内存泄漏
8.1 ThreadLocal 结构
Thread 对象
+-------------------------------+
| Thread |
| |
| threadLocals: ThreadLocalMap |--+
+-------------------------------+ |
|
+-------------------------+
|
v
+------ ThreadLocalMap ------+
| Entry[] table |
| |
| [0] [1] [2] [3] ... |
| | | | | |
| v v v v |
| Entry Entry null Entry |
+----+------------------------+
|
v
+--- Entry extends WeakReference<ThreadLocal<?>> ---+
| key = WeakReference<ThreadLocal<?>> |
| value = Object(实际存储的值) |
+---------------------------------------------------+
完整的引用关系图:
栈内存 堆内存
+--------+
| Thread | --强引用--> ThreadLocalMap --强引用--> Entry[]
+--------+ |
v
+----------+ Entry
| ThreadLocal | <--弱引用-- key (WeakReference) |
| 变量 tl | --强引用--> value --强引用--> 实际值对象
+----------+
一个线程可以有多个 ThreadLocal,每个 ThreadLocal 对应 Entry[] 中的一个槽位
// ThreadLocal 的核心方法
// 设计动机:每个线程有自己独立的变量副本,天然线程安全,无需同步
public class ThreadLocal<T> {
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t); // 获取当前线程的 ThreadLocalMap
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this); // this 就是 ThreadLocal 对象
if (e != null) {
T result = (T) e.value;
return result;
}
}
return setInitialValue();
}
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value); // key = this(ThreadLocal对象), value = 用户的值
else
createMap(t, value);
}
// 重要!使用完毕必须调用 remove()
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals; // 每个线程自己的 map
}
}
8.2 哈希冲突解决
ThreadLocalMap 使用开放定址法(线性探测) 而不是 HashMap 的链地址法。
为什么用开放定址法?
1. ThreadLocal 数量通常很少(一个线程不会有几百个 ThreadLocal)
2. 开放定址法对 CPU 缓存更友好(数据连续存储在数组中)
3. 避免了链表节点的额外内存开销
线性探测过程:
假设 table 长度为 16,ThreadLocal 的 hashCode 映射到 index=5
设置 tl1(hash→5): table[5] = Entry(tl1, value1)
设置 tl2(hash→5,冲突!): table[5] 被占 → 探测 table[6] = Entry(tl2, value2)
设置 tl3(hash→5,冲突!): table[5] 被占 → table[6] 被占 → table[7] = Entry(tl3, value3)
table: [0] [1] [2] [3] [4] [5] [6] [7] [8] ...
tl1 tl2 tl3
魔数 0x61c88647(斐波那契散列):
// 设计动机:让 ThreadLocal 在 table 中尽量均匀分布,减少冲突
// 0x61c88647 是黄金分割比 * 2^32 的近似值
private static final int HASH_INCREMENT = 0x61c88647;
private static AtomicInteger nextHashCode = new AtomicInteger();
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
// 连续的 ThreadLocal 对象的 hashCode 差值恒为 0x61c88647
// 对 2 的幂次方取模后,这些值会均匀分布在数组中
// 例如 table 大小为 16 时:
// tl0: 0x61c88647 & 15 = 7
// tl1: 0xc3910c8e & 15 = 14
// tl2: 0x255992d5 & 15 = 5
// tl3: 0x8722191c & 15 = 12
// 分布非常均匀!
8.3 内存泄漏深度分析
引用链分析:
正常使用时的引用关系:
ThreadLocal对象 tl
^ ^
| 强引用(栈变量) | 弱引用(Entry.key)
| |
栈帧 Entry
|
| 强引用
v
Value 对象
当 tl = null(ThreadLocal 变量被回收)后:
(已GC) 弱引用断裂
|
Entry
key = null <--- key 被 GC 了
|
| 强引用 <--- value 还在!没人引用 Entry 本身
v
Value 对象 <--- 无法被 GC,内存泄漏!
引用链:Thread → ThreadLocalMap → Entry[] → Entry → Value
只要 Thread 活着,这条链就断不了
为什么在线程池中特别危险?
正常线程:
线程执行完毕 → 线程对象被 GC → ThreadLocalMap 被 GC → 所有 Entry 和 Value 被 GC
不会泄漏!
线程池中的线程:
线程执行完任务 → 线程不会销毁,回到池中等待下一个任务
→ ThreadLocalMap 一直存在
→ Entry(key=null, value=大对象) 一直存在
→ 内存泄漏!
而且如果同一个线程不断接收新任务,每次任务都使用新的 ThreadLocal
→ ThreadLocalMap 中积累越来越多的 stale Entry
→ 内存泄漏越来越严重
泄漏示例:
// 典型的内存泄漏场景
public class ThreadLocalLeakDemo {
// 线程池,线程长期存活
private static final ExecutorService pool = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
for (int i = 0; i < 100000; i++) {
pool.execute(() -> {
// 每次任务创建一个新的 ThreadLocal
ThreadLocal<byte[]> tl = new ThreadLocal<>();
tl.set(new byte[1024 * 1024]); // 1MB 数据
// 使用 tl ...
// 忘记调用 tl.remove()!
// 任务结束后 tl 局部变量出栈,ThreadLocal 对象只剩弱引用
// 下次 GC 时 key 变成 null,但 1MB 的 value 还在!
});
}
}
}
正确用法:
// 最佳实践:始终在 finally 中调用 remove()
private static final ThreadLocal<UserContext> userContext = new ThreadLocal<>();
public void handleRequest(Request request) {
try {
userContext.set(new UserContext(request.getUserId()));
// 业务逻辑...
processRequest();
} finally {
userContext.remove(); // 必须在 finally 中清理!
}
}
ThreadLocalMap 的自我清理机制:
虽然 ThreadLocalMap 在 get()、set()、remove() 时会顺带清理遇到的 stale Entry(key == null 的 Entry),但这只是”碰巧清理”,不能完全避免泄漏。因为如果那些 stale Entry 所在的槽位一直没有被访问到,它们就永远不会被清理。
8.4 InheritableThreadLocal
问题: 父线程的 ThreadLocal 值默认无法传递给子线程。
// InheritableThreadLocal 的设计动机:
// 某些场景下需要在子线程中访问父线程的上下文信息(如 traceId、userId)
public class InheritableThreadLocalDemo {
private static final InheritableThreadLocal<String> traceId =
new InheritableThreadLocal<>();
public static void main(String[] args) {
traceId.set("trace-001");
// 创建子线程时,会复制父线程的 inheritableThreadLocals
new Thread(() -> {
System.out.println(traceId.get()); // 输出:trace-001
}).start();
}
}
原理: Thread 的构造函数中,如果父线程有 inheritableThreadLocals,会把父线程的 map 复制一份给子线程(浅拷贝)。
InheritableThreadLocal 在线程池中的问题:
线程池中的线程是预创建的,不是每次提交任务时才创建。
所以子线程的 inheritableThreadLocals 是在线程创建时从父线程复制的,
而不是在提交任务时。
父线程 A(traceId=001)提交任务 → 线程池线程 T1(创建时父线程是 main)
T1 的 inheritableThreadLocals 是 main 线程的,不是线程 A 的!
解决方案:阿里巴巴的 TransmittableThreadLocal (TTL)
原理:在任务提交时捕获当前线程的 ThreadLocal 值,
在任务执行时恢复到工作线程中,任务结束后恢复原值。
// TransmittableThreadLocal 的使用方式(需要额外依赖 transmittable-thread-local)
// 设计动机:解决线程池场景下线程本地变量传递的问题
// 方式一:修饰线程池
ExecutorService ttlExecutor = TtlExecutors.getTtlExecutorService(
Executors.newFixedThreadPool(10));
// 方式二:修饰 Runnable
Runnable ttlRunnable = TtlRunnable.get(() -> {
// 这里可以拿到提交任务时的 TransmittableThreadLocal 值
});
executor.submit(ttlRunnable);
一句话总结: ThreadLocal 通过为每个线程维护独立的 ThreadLocalMap 实现线程隔离,但在线程池中使用时必须在 finally 中调用 remove(),否则 Entry 的弱引用 key 被 GC 后 value 无法回收,导致内存泄漏。
九、并发设计模式
9.1 生产者-消费者模式
核心思想: 通过一个共享队列解耦生产者和消费者,两者不直接通信。
+----------+ put() +----------------+ take() +----------+
| 生产者 1 | ---------> | | ----------> | 消费者 1 |
+----------+ | BlockingQueue | +----------+
+----------+ put() | (缓冲区) | take() +----------+
| 生产者 2 | ---------> | | ----------> | 消费者 2 |
+----------+ +----------------+ +----------+
设计优势:
1. 解耦:生产者和消费者不需要知道对方的存在
2. 削峰:队列缓冲突发流量
3. 异步:生产者不需要等消费者处理完
// 实际场景:异步日志写入
// 设计动机:日志写磁盘很慢(IO 操作),不应该阻塞业务线程
// 业务线程把日志消息放入队列,专门的日志线程从队列取出并写入文件
public class AsyncLogger {
private final BlockingQueue<String> logQueue = new LinkedBlockingQueue<>(10000);
private volatile boolean running = true;
// 生产者:业务线程调用
public void log(String message) {
if (!logQueue.offer(message)) {
// 队列满了,丢弃日志(降级策略)
System.err.println("日志队列满,丢弃: " + message);
}
}
// 消费者:单独的日志线程
public void start() {
Thread logThread = new Thread(() -> {
while (running || !logQueue.isEmpty()) {
try {
String msg = logQueue.poll(100, TimeUnit.MILLISECONDS);
if (msg != null) {
writeToFile(msg); // 慢速 IO 操作
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "async-logger");
logThread.setDaemon(true);
logThread.start();
}
public void shutdown() {
running = false; // 停止接收,处理完队列中剩余的日志后退出
}
private void writeToFile(String message) {
// 实际的文件写入操作...
}
}
一句话总结: 生产者-消费者模式通过 BlockingQueue 解耦、缓冲和异步化,是并发编程中最基础也最实用的设计模式。
9.2 读写分离模式
核心思想: 读操作之间不互斥,只有写操作需要独占。适合读多写少的场景。
ReadWriteLock 的访问规则:
读者 写者
+-----+-----+-----+ +-----+-----+-----+
| R | R | R | | W | | |
| 同 | 时 | 读 | | 独 | 阻塞 | 阻塞 |
+-----+-----+-----+ +-----+-----+-----+
读-读:共享(并发读)
读-写:互斥(有人读时不能写,有人写时不能读)
写-写:互斥(同时只能有一个写者)
// 设计动机:对于缓存这类读远多于写的数据结构
// 如果用 synchronized,读操作也会互斥,严重浪费并发度
// ReadWriteLock 让读操作完全并行,只在写时才加锁
public class ReadWriteCache<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
// 读操作:多个线程可以同时执行
public V get(K key) {
readLock.lock();
try {
return cache.get(key);
} finally {
readLock.unlock();
}
}
// 写操作:独占执行
public void put(K key, V value) {
writeLock.lock();
try {
cache.put(key, value);
} finally {
writeLock.unlock();
}
}
// 锁降级:写锁 → 读锁(安全的)
// 设计动机:写完数据后,不释放写锁直接获取读锁,
// 然后释放写锁。这样可以在写操作后立即以读模式继续使用数据,
// 同时允许其他读者进入。
public V putAndGet(K key, V value) {
writeLock.lock();
try {
cache.put(key, value);
readLock.lock(); // 在持有写锁的情况下获取读锁(锁降级)
} finally {
writeLock.unlock(); // 释放写锁,此时仍持有读锁
}
try {
return cache.get(key); // 以读模式访问
} finally {
readLock.unlock();
}
}
// 注意:读锁不能升级为写锁(会死锁),只能写锁降级为读锁
}
一句话总结: ReadWriteLock 通过读共享/写独占的策略,在读多写少场景下大幅提升并发性能,是 synchronized 的精细化替代方案。
9.3 分而治之 --- ForkJoinPool
核心思想: 把大任务拆分成小任务(Fork),分别执行后合并结果(Join)。
ForkJoin 的分治过程:
[计算 1~1000000 的和]
/ \
Fork Fork
/ \
[计算 1~500000] [计算 500001~1000000]
/ \ / \
Fork Fork Fork Fork
/ \ / \
[1~250000] [250001~500000] [500001~750000] [750001~1000000]
\ / \ /
\--Join---/ \----Join------/
\ /
\---------Join--------------/
|
最终结果:500000500000
工作窃取(Work-Stealing)算法:
每个工作线程有自己的双端队列(Deque):
线程 1 的队列(自己从头部取):
HEAD --> [task1] [task2] [task3] [task4] <-- TAIL
线程 2 的队列(自己从头部取):
HEAD --> [task5] <-- TAIL
线程 3 的队列(空闲!):
HEAD --> <empty> <-- TAIL
工作窃取过程:
线程 3 空闲了!从线程 1 的队列尾部偷一个任务:
线程 1 的队列:
HEAD --> [task1] [task2] [task3] <-- TAIL (task4 被偷走了)
线程 3 执行偷来的 task4
设计动机:
1. 自己的任务从头部取(LIFO),偷别人的从尾部取(FIFO),减少竞争
2. LIFO 取自己的任务可以利用 CPU 缓存局部性(最近 Fork 出的子任务数据还在缓存中)
3. FIFO 偷大任务(早期 Fork 的任务通常更大),偷一个大任务比偷多个小任务效率高
// RecursiveTask(有返回值) vs RecursiveAction(无返回值)
// 设计动机:展示 ForkJoin 的使用方式和阈值控制
public class ParallelSum extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000; // 阈值:小于此值直接计算
private final long[] array;
private final int start, end;
public ParallelSum(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
// 任务足够小,直接计算(不再分割)
if (length <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 任务太大,分成两半
int mid = start + length / 2;
ParallelSum leftTask = new ParallelSum(array, start, mid);
ParallelSum rightTask = new ParallelSum(array, mid, end);
leftTask.fork(); // 异步执行左半部分
Long rightResult = rightTask.compute(); // 当前线程直接计算右半部分
Long leftResult = leftTask.join(); // 等待左半部分完成
return leftResult + rightResult; // 合并结果
}
public static void main(String[] args) {
long[] array = new long[10_000_000];
// 填充数组...
ForkJoinPool pool = new ForkJoinPool(); // 默认线程数 = CPU 核心数
long result = pool.invoke(new ParallelSum(array, 0, array.length));
System.out.println("Sum = " + result);
}
}
ForkJoinPool 与 Java 8 并行流的关系:
// Java 8 的并行流底层就是用 ForkJoinPool.commonPool() 实现的
long sum = LongStream.rangeClosed(1, 1_000_000)
.parallel() // 使用 ForkJoinPool.commonPool()
.sum();
// 注意:共享的 commonPool 可能导致问题
// 如果一个并行流任务很慢(比如有 IO),会影响其他使用 commonPool 的任务
// 解决方案:使用自定义的 ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(4);
long sum = customPool.submit(() ->
LongStream.rangeClosed(1, 1_000_000).parallel().sum()
).get();
一句话总结: ForkJoinPool 通过分治策略拆分任务 + 工作窃取算法平衡负载,是计算密集型并行任务的理想执行框架。
9.4 Future 模式
核心思想: 提交任务时立即返回一个”凭证”(Future),任务在后台异步执行,需要结果时再通过凭证获取。
传统同步模式(串行等待):
主线程:[提交任务] --等待30秒--> [拿到结果] --> [继续工作]
Future 模式(异步获取):
主线程:[提交任务] --> [拿到 Future 凭证] --> [做其他事情...] --> [future.get() 拿结果]
工作线程: [后台执行任务 30秒]
类比:在餐厅点餐后拿到取餐号(Future),不需要站在窗口等,
可以先去找座位、倒水,等号码被叫到再去取餐。
// 设计动机:解耦任务的提交和结果的获取,允许调用方在等待期间做其他事情
public class FuturePatternDemo {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
// 同时提交多个耗时任务
Future<String> userFuture = executor.submit(() -> queryUserService()); // 2秒
Future<String> orderFuture = executor.submit(() -> queryOrderService()); // 3秒
Future<String> stockFuture = executor.submit(() -> queryStockService()); // 1秒
// 三个任务并行执行,总耗时 = max(2, 3, 1) = 3秒
// 而非串行的 2 + 3 + 1 = 6秒
String user = userFuture.get(); // 可能已经完成,无需等待
String order = orderFuture.get();
String stock = stockFuture.get();
return merge(user, order, stock);
}
}
一句话总结: Future 模式通过异步执行 + 延迟获取结果,将任务的提交与结果的消费解耦,是实现并行化的基础模式。
9.5 Thread-Per-Message vs Worker Thread
Thread-Per-Message(每请求一线程):
传统模式:
请求1 --> new Thread() --> 处理 --> 线程销毁
请求2 --> new Thread() --> 处理 --> 线程销毁
请求3 --> new Thread() --> 处理 --> 线程销毁
问题:线程创建和销毁开销大,高并发时可能 OOM
Worker Thread(线程池模式):
线程池模式:
线程池 [Thread1, Thread2, Thread3]
请求1 --> Thread1 执行 --> 归还
请求2 --> Thread2 执行 --> 归还
请求3 --> Thread3 执行 --> 归还
请求4 --> 等待...直到有线程空闲
优势:线程复用,资源可控
Java 21 虚拟线程对 Thread-Per-Message 的影响:
// Java 21 之前:Thread-Per-Message 模式不可行(平台线程太重)
// 10000 个并发请求 = 10000 个平台线程 = ~10GB 内存(每个线程约 1MB 栈空间)
// Java 21 之后:虚拟线程让 Thread-Per-Message 重新可行
// 虚拟线程极其轻量(~几KB),百万级并发不是问题
// 设计动机:简化并发编程模型,回归简单直观的"一个请求一个线程"
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// 每个任务一个虚拟线程,但虚拟线程几乎没有创建开销
for (int i = 0; i < 100_000; i++) {
executor.submit(() -> {
// IO 密集型操作(网络请求、数据库查询)
// 虚拟线程在 IO 时会自动让出底层平台线程
String result = httpClient.send(request, bodyHandler);
processResult(result);
});
}
}
// 虚拟线程的原理:
// 多个虚拟线程共享少量平台线程(载体线程)
// 当虚拟线程执行 IO 操作时,会被"挂载"到载体线程上
// IO 阻塞时,虚拟线程被"卸载",载体线程去执行其他虚拟线程
// IO 完成后,虚拟线程被重新"挂载"到某个载体线程上继续执行
// 平台线程 虚拟线程
// [载体线程1] <---- VT1(运行中)
// [载体线程2] <---- VT2(运行中)
// VT3(等待IO,已卸载)
// VT4(等待IO,已卸载)
// VT5(等待调度)
// VT3 的 IO 完成后,挂载到空闲的载体线程上继续执行
一句话总结: Worker Thread(线程池)是传统 Java 并发的标准模式;而 Java 21 的虚拟线程让 Thread-Per-Message 模式重获新生,以极低的开销实现”一个请求一个线程”的简洁编程模型。
总结
+----------------------------------------------------------+
| Java 并发编程知识体系全景图 |
+----------------------------------------------------------+
| |
| 底层基石 |
| ├── JMM(happens-before + 内存屏障) |
| ├── CAS(CPU 级原子指令) |
| └── volatile(可见性 + 有序性) |
| |
| 锁机制 |
| ├── synchronized(偏向锁→轻量级锁→重量级锁) |
| └── AQS(CLH队列 + state + CAS) |
| ├── ReentrantLock(独占可重入) |
| ├── Semaphore(共享许可) |
| ├── CountDownLatch(共享倒计时) |
| └── ReentrantReadWriteLock(读写分离) |
| |
| 无锁方案 |
| ├── Atomic 系列(CAS 自旋) |
| └── LongAdder(Cell 数组分散热点) |
| |
| 线程管理 |
| ├── ThreadLocal(线程隔离 + 注意内存泄漏) |
| ├── 线程池(ctl状态机 + Worker生命周期) |
| └── ForkJoinPool(分治 + 工作窃取) |
| |
| 异步编排 |
| ├── Future(异步凭证) |
| └── CompletableFuture(回调链 + Treiber Stack) |
| |
| 设计模式 |
| ├── 生产者-消费者(BlockingQueue) |
| ├── 读写分离(ReadWriteLock) |
| └── Thread-Per-Message → 虚拟线程(Java 21) |
| |
+----------------------------------------------------------+
学习路径建议:
- 先理解 JMM 和 volatile,这是所有并发机制的理论基础
- 再学 synchronized 的锁升级,理解 JVM 层面的优化思路
- 然后学 CAS 和 AQS,这是 JUC 包的实现基础
- 最后学线程池、CompletableFuture 等高层抽象,会用且知道为什么这样设计
- ThreadLocal 要特别注意内存泄漏问题,面试高频考点
核心思想贯穿全文:
- 空间换时间:ThreadLocal 用每线程一份副本换取无锁;LongAdder 用 Cell 数组换取低竞争
- 分而治之:ForkJoinPool 把大任务拆小;LongAdder 把热点变量分散
- 乐观 vs 悲观:CAS 是乐观策略(假设不冲突,冲突了重试);synchronized 是悲观策略(假设会冲突,先加锁)
- 升级而非降级:synchronized 的锁只升级不降级,因为降级判断的成本太高
- 延迟初始化:偏向锁、线程池的核心线程都是”需要时才创建”的思路