Java 并发编程技术深度精讲

本文从底层原理出发,结合源码分析与类比思维,深入剖析 Java 并发编程的核心知识体系。 每个知识点配有 ASCII 图解、设计动机分析和一句话总结。


一、Java 内存模型(JMM)

1.1 为什么需要 JMM

类比:咖啡店模型

想象一家大型咖啡店,有一个总仓库(主内存)和多个咖啡师工作台(工作内存)。

               +---------------------+
               |      主内存(仓库)    |
               |  sugar = 100        |
               |  milk  = 200        |
               +-----+--------+------+
                     |        |
              read/load    read/load
                     |        |
          +----------+--+  +--+----------+
          | 工作内存 A    |  | 工作内存 B    |
          | (咖啡师小张)  |  | (咖啡师小李)  |
          | sugar = 100  |  | sugar = 100  |
          +--------------+  +--------------+

问题一:可见性问题

小张在自己的工作台上把 sugar 减少到了 80(assign + store + write 回仓库),但小李工作台上的 sugar 还是 100——他根本不知道仓库里的 sugar 已经变了。这就是可见性问题:一个线程对共享变量的修改,另一个线程无法立即看到。

问题二:有序性问题

经理写了一张操作清单:“先加糖,再加奶,最后搅拌”。但咖啡师为了效率,可能自己调整顺序——“先加奶,再加糖,最后搅拌”。在单人操作时结果不变,但如果两个咖啡师需要协作(比如一个加糖后另一个才能加奶),这种重排序就会出问题。这就是有序性问题

问题三:原子性问题

小张正在称 sugar(读取重量 → 舀一勺 → 放回去),这个操作不是一瞬间完成的。如果在他称到一半时小李也来取 sugar,就可能出现数据混乱。这就是原子性问题

JMM 就是 Java 定义的一套规范,它规定了:

  • 线程如何与主内存交互
  • 什么条件下一个线程的修改对另一个线程可见
  • 哪些操作可以重排序,哪些不可以

JMM 屏蔽了底层硬件和操作系统的差异,让 Java 程序在不同平台上有一致的并发行为。

1.2 JMM 的核心规则

JMM 定义了 8 种内存交互操作,精确描述了变量如何从主内存到工作内存、再从工作内存回到主内存的全过程:

        主内存                              工作内存
  +----------------+                  +----------------+
  |                |   ① read        |                |
  |   变量 X = 10  | ------------->  |   (传输中)     |
  |                |   ② load        |                |
  |                | ------------->  |   变量副本 X=10  |
  |                |                  |                |
  |                |   ③ use         |                |
  |                | <-------------  |  --> 执行引擎    |
  |                |                  |                |
  |                |   ④ assign      |                |
  |                | ------------->  |   X = 20       |
  |                |                  |  (执行引擎写回)  |
  |                |   ⑤ store       |                |
  |                | <-------------  |   (传输中)     |
  |                |   ⑥ write       |                |
  |   变量 X = 20  | <-------------  |                |
  |                |                  |                |
  |  ⑦ lock(加锁) |                  |                |
  |  ⑧ unlock(解锁)|                  |                |
  +----------------+                  +----------------+

8 种操作详解:

操作作用域说明
read(读取)主内存将变量值从主内存传输到工作内存
load(载入)工作内存将 read 得到的值放入工作内存的变量副本
use(使用)工作内存将变量副本的值传递给执行引擎
assign(赋值)工作内存将执行引擎的值赋给工作内存的变量副本
store(存储)工作内存将变量副本的值传输到主内存
write(写入)主内存将 store 得到的值写入主内存的变量
lock(锁定)主内存将变量标识为线程独占
unlock(解锁)主内存释放变量的线程独占标识

关键约束规则:

  1. read 和 load、store 和 write 必须成对出现,不允许单独使用
  2. 不允许线程丢弃最近的 assign 操作(修改后必须同步回主内存)
  3. 不允许线程无原因地把工作内存的值同步回主内存(没有 assign 就不能 store + write)
  4. 新变量只能在主内存中诞生
  5. 一个变量同一时刻只允许一条线程对其 lock,但可以被同一线程多次 lock(可重入)
  6. lock 操作会清空工作内存中该变量的值,使用前需重新 load 或 assign
  7. unlock 之前必须先把变量同步回主内存(store + write)

1.3 happens-before 原则

happens-before 是 JMM 的核心概念。如果操作 A happens-before 操作 B,那么 A 的结果对 B 一定可见。注意:happens-before 不是说 A 一定在 B 之前执行,而是说 A 的执行结果对 B 可见。

8 条 happens-before 规则:

规则 1:程序顺序规则(Program Order Rule)

在一个线程中,按照程序代码顺序,前面的操作 happens-before 后面的操作。

// 在同一个线程中:
int a = 1;      // 操作 A
int b = a + 1;  // 操作 B
// A happens-before B,所以 b 一定等于 2
// 注意:编译器可以重排序,但必须保证结果一致(as-if-serial 语义)

规则 2:监视器锁规则(Monitor Lock Rule)

对一个锁的 unlock 操作 happens-before 后续对同一个锁的 lock 操作。

synchronized (lock) {
    // 操作 A:x = 10
}
// unlock(lock) happens-before 下一次 lock(lock)
synchronized (lock) {
    // 操作 B:读取 x,一定能看到 10
}

规则 3:volatile 变量规则(Volatile Variable Rule)

对一个 volatile 变量的写操作 happens-before 后续对同一 volatile 变量的读操作。

volatile boolean flag = false;
int data = 0;

// 线程 A
data = 42;          // 普通写
flag = true;        // volatile 写(happens-before 线程 B 的 volatile 读)

// 线程 B
if (flag) {         // volatile 读
    System.out.println(data); // 一定输出 42
}

规则 4:线程启动规则(Thread Start Rule)

Thread 对象的 start() 方法 happens-before 此线程的每一个动作。

int x = 10;
Thread t = new Thread(() -> {
    // 这里一定能看到 x = 10
    System.out.println(x); // 一定输出 10
});
t.start(); // start() happens-before 线程 t 中的所有操作

规则 5:线程终止规则(Thread Termination Rule)

线程中的所有操作 happens-before 对此线程的终止检测(Thread.join() 或 Thread.isAlive())。

Thread t = new Thread(() -> {
    data = 42; // 线程 t 中的操作
});
t.start();
t.join(); // join() 返回意味着线程 t 中的所有操作 happens-before 这一行之后的代码
System.out.println(data); // 一定能看到 42

规则 6:线程中断规则(Thread Interruption Rule)

对线程 interrupt() 的调用 happens-before 被中断线程检测到中断事件(Thread.interrupted() 或 isInterrupted())。

// 线程 A
sharedData = 99;
thread.interrupt(); // 中断操作 happens-before 线程 B 检测到中断

// 线程 B(被中断的线程)
if (Thread.interrupted()) {
    // 一定能看到 sharedData = 99
}

规则 7:对象终结规则(Finalizer Rule)

一个对象的构造函数执行结束 happens-before 它的 finalize() 方法的开始。

public class MyObject {
    private int value;

    public MyObject() {
        this.value = 42; // 构造函数中的赋值
    }

    @Override
    protected void finalize() {
        // 这里一定能看到 value = 42
        System.out.println(value);
    }
}

规则 8:传递性规则(Transitivity)

如果 A happens-before B,且 B happens-before C,那么 A happens-before C。

// 线程 A
x = 10;            // (1) 普通写
flag = true;       // (2) volatile 写

// 线程 B
if (flag) {        // (3) volatile 读
    // 由传递性:(1) hb (2)(程序顺序规则),(2) hb (3)(volatile 规则)
    // 所以 (1) hb (3),x 一定等于 10
    assert x == 10; // 成立
}

1.4 指令重排序

现代处理器和编译器为了提高性能,会对指令进行重排序。JMM 将重排序分为三类:

源代码
  |
  v
+------------------+
| ① 编译器优化重排序 |  编译器在不改变单线程语义的前提下重排语句
+------------------+
  |
  v
+------------------+
| ② 指令级并行重排序 |  处理器利用指令级并行技术重排指令
+------------------+
  |
  v
+------------------+
| ③ 内存系统重排序   |  由于缓存和读写缓冲区,内存操作看起来是乱序的
+------------------+
  |
  v
最终执行的指令序列

重排序导致的典型 Bug:

public class ReorderingExample {
    private static int x = 0, y = 0;
    private static int a = 0, b = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            a = 1;    // (1)
            x = b;    // (2)
        });
        Thread t2 = new Thread(() -> {
            b = 1;    // (3)
            y = a;    // (4)
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // 直觉上 x == 0 && y == 0 不可能发生
        // 但由于指令重排序,(1)(2) 可能变成 (2)(1),(3)(4) 变成 (4)(3)
        // 执行顺序:x = b(0) → y = a(0) → a = 1 → b = 1
        // 结果:x == 0 && y == 0  —— 违反直觉!
        System.out.println("x=" + x + ", y=" + y);
    }
}

为什么要重排序? 因为 CPU 流水线需要尽量减少停顿(stall)。如果前一条指令在等待内存读取,CPU 不会干等,而是先执行后面不依赖该结果的指令。这对单线程是安全的(as-if-serial),但对多线程可能引发可见性和有序性问题。

一句话总结: JMM 是 Java 为了屏蔽硬件差异而定义的内存访问规范,通过 happens-before 原则和内存屏障来保证多线程环境下的可见性、有序性和原子性。


二、volatile 深度剖析

2.1 volatile 的两个语义

语义一:可见性

不加 volatile 时的问题:

public class VisibilityProblem {
    private static boolean running = true; // 没有 volatile

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            int count = 0;
            // 线程可能把 running 缓存在工作内存或寄存器中
            // 永远看不到 main 线程的修改,导致死循环
            while (running) {
                count++;
            }
            System.out.println("停止,count=" + count);
        });
        t.start();
        Thread.sleep(1000);
        running = false; // main 线程修改了 running
        // 但子线程可能永远不会停止!
        System.out.println("main 已设置 running=false");
    }
}

加了 volatile 后的修复:

public class VisibilityFixed {
    // volatile 保证每次读都从主内存读取,每次写都立即刷回主内存
    private static volatile boolean running = true;

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            int count = 0;
            while (running) { // 每次循环都从主内存读取最新值
                count++;
            }
            System.out.println("停止,count=" + count);
        });
        t.start();
        Thread.sleep(1000);
        running = false; // 写入后立即刷新到主内存
        // 子线程一定会在下次循环检查时看到 false 并退出
    }
}

语义二:有序性(禁止指令重排序)

volatile 通过插入内存屏障(Memory Barrier) 来禁止重排序。JMM 规定了四种屏障:

+-------------+--------------------------------------------------+
|  屏障类型     |  说明                                            |
+-------------+--------------------------------------------------+
| LoadLoad    |  Load1; LoadLoad; Load2                          |
|             |  确保 Load1 的数据装载先于 Load2 及其后所有装载指令  |
+-------------+--------------------------------------------------+
| StoreStore  |  Store1; StoreStore; Store2                      |
|             |  确保 Store1 的数据刷新到主内存先于 Store2 及其后   |
+-------------+--------------------------------------------------+
| LoadStore   |  Load1; LoadStore; Store2                        |
|             |  确保 Load1 装载数据先于 Store2 及其后刷新到主内存   |
+-------------+--------------------------------------------------+
| StoreLoad   |  Store1; StoreLoad; Load2                        |
|             |  确保 Store1 刷新到主内存先于 Load2 及其后装载指令   |
|             |  开销最大,是全能屏障                               |
+-------------+--------------------------------------------------+

volatile 写操作的屏障插入策略:

普通读/写操作
    |
 StoreStore 屏障   <-- 防止上面的普通写与下面的 volatile 写重排序
    |
volatile 写操作
    |
 StoreLoad 屏障    <-- 防止 volatile 写与下面可能的 volatile 读/写重排序
    |
后续操作

volatile 读操作的屏障插入策略:

volatile 读操作
    |
 LoadLoad 屏障     <-- 防止 volatile 读与下面的普通读重排序
    |
 LoadStore 屏障    <-- 防止 volatile 读与下面的普通写重排序
    |
后续操作

2.2 volatile 为什么不保证原子性

以经典的 i++ 为例:

public class AtomicityProblem {
    private static volatile int count = 0;

    public static void main(String[] args) throws InterruptedException {
        // 开 10 个线程,每个线程自增 10000 次
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    count++; // 这不是原子操作!
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();

        // 预期 100000,实际结果小于 100000
        System.out.println("count = " + count);
    }
}

count++ 的字节码分析:

// count++ 被编译为 4 条字节码指令:
getstatic   count   // ① 从主内存读取 count 的值到操作数栈(volatile 读,保证可见性)
iconst_1            // ② 将常数 1 压入操作数栈
iadd                // ③ 栈顶两个值相加
putstatic   count   // ④ 将结果写回 count(volatile 写,保证可见性)

竞态条件的 ASCII 时间线:

时间轴 ----->

线程 A:  getstatic(count=10)  iconst_1  iadd(=11)  putstatic(count=11)
              |                                          |
线程 B:       |    getstatic(count=10)  iconst_1  iadd(=11)  putstatic(count=11)
              |         |
              +--- 两个线程都读到了 count=10

结果:两个线程各自增了一次,但 count 只从 10 变成了 11,丢失了一次自增!

本质原因: volatile 保证了每次读写都直接操作主内存(可见性),但 i++ 是一个**“读-改-写”** 的复合操作。volatile 只能保证每一步单独的可见性,无法保证这三步作为一个整体的原子性。在”读”和”写”之间,其他线程可以插入。

2.3 volatile 的典型使用场景

场景一:状态标志位

// 设计动机:一个线程写,多个线程读,不需要原子的读-改-写
// volatile 足够保证可见性,比 synchronized 开销小得多
public class GracefulShutdown {
    private volatile boolean shutdownRequested = false;

    public void shutdown() {
        shutdownRequested = true; // 只有一个线程会调用
    }

    public void doWork() {
        while (!shutdownRequested) { // 多个工作线程检查标志
            // 执行任务...
        }
        // 清理资源...
    }
}

场景二:双重检查锁定的单例模式

public class Singleton {
    // 为什么必须加 volatile?
    // 因为 new Singleton() 不是原子操作,分三步:
    // ① 分配内存空间
    // ② 初始化对象
    // ③ 将引用指向分配的内存地址
    // 如果不加 volatile,②③ 可能重排序,另一个线程可能拿到未初始化的对象
    private static volatile Singleton instance;

    private Singleton() {}

    public static Singleton getInstance() {
        if (instance == null) {               // 第一次检查(无锁,快速路径)
            synchronized (Singleton.class) {
                if (instance == null) {       // 第二次检查(有锁,防止重复创建)
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

不加 volatile 时重排序导致的问题:

线程 A 执行 new Singleton():
  ① memory = allocate()       // 分配内存
  ③ instance = memory         // 引用指向内存(重排序后先执行)
  ② ctorInstance(memory)      // 初始化对象(重排序后后执行)
        |
线程 B 执行第一次 if 检查:
  instance != null  --> true(但对象还没初始化完成!)
  return instance   --> 返回了一个半初始化的对象!

场景三:低开销的读-写策略

// 设计动机:读操作用 volatile 保证可见性,写操作用 synchronized 保证原子性
// 适用于读多写少的场景,读不需要加锁,性能接近无锁
public class CheapReadWriteLock {
    private volatile int value;

    public int getValue() {
        return value; // 无锁读,volatile 保证可见性
    }

    public synchronized void increment() {
        value++; // 加锁写,保证原子性
    }
}

2.4 volatile vs synchronized vs Atomic

+------------------+------------+------------+------------+
|  特性             | volatile   | synchronized | Atomic   |
+------------------+------------+------------+------------+
|  原子性           |    否      |     是       |    是     |
|  可见性           |    是      |     是       |    是     |
|  有序性           |    是      |     是       |    否     |
|  阻塞             |    否      |     是       |    否     |
|  性能(无竞争时)  |    最快    |     较慢     |    快     |
|  性能(高竞争时)  |    最快    |     一般     |  CAS自旋  |
|  适用场景         | 状态标志   | 复合操作     | 计数器    |
|  实现机制         | 内存屏障   | Monitor锁   | CAS指令   |
+------------------+------------+------------+------------+

一句话总结: volatile 通过内存屏障保证可见性和有序性,但无法保证复合操作的原子性;它是轻量级的同步机制,适用于”一写多读”的简单场景。


三、synchronized 底层演进

3.1 对象头与 Mark Word

![[Gemini_Generated_Image_bbv0zobbv0zobbv0.png]]

3.2 偏向锁

为什么存在偏向锁?

经过 HotSpot 作者的研究发现,大多数情况下锁不仅不存在多线程竞争,而且总是由同一线程多次获得。偏向锁就是为了在没有竞争的情况下消除同步的全部开销。

类比:你每天去同一家咖啡店,店员认出你后就不再检查会员卡了——“偏向”你。直到另一个人也要来用你的会员卡,才需要重新验证。

偏向锁获取流程:

+---------------------------+
|  线程访问同步块             |
+---------------------------+
            |
            v
+---------------------------+
|  检查 Mark Word 的偏向标志  |
|  biased_lock:1 且锁标志:01 |
+---------------------------+
            |
     +------+------+
     |             |
     v             v
 线程 ID 是       线程 ID 不是
 当前线程         当前线程
     |             |
     v             v
 直接进入        +--------------------+
 同步块          | 检查原持有线程是否存活 |
 (无需CAS)       +--------------------+
                       |
                +------+------+
                |             |
                v             v
            未存活          已存活
                |             |
                v             v
           CAS 替换       偏向锁撤销
           线程 ID        升级为轻量级锁

偏向锁撤销(Revocation):

偏向锁的撤销需要等到全局安全点(SafePoint)——此时所有线程都暂停。撤销的成本较高,所以 JVM 有一个优化:如果一个类的偏向锁撤销次数达到阈值(默认 20 次),JVM 会批量重偏向(Bulk Rebias);如果达到更高阈值(默认 40 次),会批量撤销(Bulk Revoke),之后该类的新实例不再使用偏向锁。

注意:JDK 15 开始,偏向锁被默认禁用(JEP 374),因为现代应用中多线程竞争更加普遍,偏向锁的撤销开销反而成了负担。

3.3 轻量级锁

为什么存在轻量级锁?

当偏向锁被撤销(有另一个线程来竞争了),但竞争不激烈时——线程交替执行同步块,不会同时争抢。此时用轻量级锁,通过 CAS 操作避免使用操作系统级的互斥量。

类比:两个人交替使用同一台打印机,每次用的时候在机器上贴个便签纸(Lock Record)标记”我在用”。因为从来不会同时使用,所以不需要去找管理员(操作系统)协调。

轻量级锁获取流程:

+-------------------------------------+
|  线程进入同步块                       |
+-------------------------------------+
            |
            v
+-------------------------------------+
|  在当前线程的栈帧中创建 Lock Record   |
|  将 Mark Word 复制到 Lock Record     |
|  (这个副本叫 Displaced Mark Word)    |
+-------------------------------------+
            |
            v
+-------------------------------------+
|  CAS 尝试将对象头的 Mark Word        |
|  替换为指向 Lock Record 的指针       |
|  并将锁标志位改为 00                  |
+-------------------------------------+
            |
     +------+------+
     |             |
  CAS 成功      CAS 失败
     |             |
     v             v
 成功获取       +--------------------+
 轻量级锁      | 检查 Mark Word 是否 |
               | 指向当前线程栈帧?   |
               +--------------------+
                    |
             +------+------+
             |             |
           是(重入)       否(竞争)
             |             |
             v             v
         Lock Record     自旋等待
         的 Displaced     |
         Mark Word       自旋达到阈值?
         设为 null        |
         (记录重入次数)    +------+------+
                          |             |
                         否            是
                          |             |
                          v             v
                       继续自旋      膨胀为重量级锁

自旋优化:

自旋就是让线程执行一个空循环(busy-wait),避免切换到内核态。JDK 6 之后引入了自适应自旋:如果在同一个锁对象上,自旋等待刚刚成功获得过锁,那么 JVM 认为这次自旋也很有可能成功,就允许更长时间的自旋;反之,如果自旋很少成功,以后可能直接省略自旋过程。

3.4 重量级锁

为什么存在重量级锁?

当自旋超过阈值还没拿到锁,说明竞争确实很激烈——多个线程同时争抢同一把锁。此时继续自旋就是浪费 CPU,不如让线程阻塞,交给操作系统调度。

类比:打印机现在有很多人排队了,贴便签纸(CAS)已经不管用了,必须请管理员(操作系统内核)来维持秩序,登记排队名单。

ObjectMonitor 结构(C++ 实现):

+------------------------------------------+
|             ObjectMonitor                 |
+------------------------------------------+
|  _header      : Mark Word 备份           |
|  _count       : 重入计数                  |
|  _owner       : 持有锁的线程              |
|  _WaitSet     : wait() 的线程集合(双向链表)|
|  _EntryList   : 阻塞等待锁的线程集合       |
|  _cxq         : 最近到达的竞争者(栈结构)  |
|  _recursions  : 重入次数                  |
+------------------------------------------+

                   +------- synchronized(obj) -------+
                   |                                  |
                   v                                  |
+----------+  竞争失败  +-----------+   获取锁    +--------+
| _cxq     | -------> | _EntryList | -------->  | _owner |
| (竞争栈)  |          | (等待队列)  |            | (持有者)|
+----------+          +-----------+            +--------+
                           ^                       |
                           |                       | obj.wait()
                           |  obj.notify()         v
                           |                  +-----------+
                           +----------------- | _WaitSet  |
                                              | (等待集合) |
                                              +-----------+

线程状态转换:
1. 线程竞争锁失败 --> 进入 _cxq 或 _EntryList --> BLOCKED 状态
2. 获取锁 --> 成为 _owner --> RUNNABLE 状态
3. 调用 wait() --> 释放锁,进入 _WaitSet --> WAITING 状态
4. 被 notify() --> 从 _WaitSet 移到 _EntryList --> BLOCKED 状态
5. 重新获取锁 --> 成为 _owner --> RUNNABLE 状态

为什么重量级锁开销大?

  1. 系统调用(syscall):线程阻塞和唤醒需要从用户态切换到内核态,每次切换约 1-10 微秒
  2. 上下文切换:保存和恢复线程的寄存器、程序计数器、栈指针等,还会导致 CPU 缓存失效
  3. 线程调度:操作系统需要决定唤醒哪个线程,涉及调度算法的开销

3.5 锁升级全流程

+--------+      有另一线程       +--------+     存在实际竞争     +----------+
|  无锁   | --竞争偏向锁------> | 偏向锁  | --CAS竞争失败-----> | 轻量级锁  |
| (01,0) |                     | (01,1) |                     |   (00)   |
+--------+                     +--------+                     +----------+
                                                                    |
                                                            自旋超过阈值
                                                            或等待线程数>1
                                                                    |
                                                                    v
                                                              +----------+
                                                              | 重量级锁  |
                                                              |   (10)   |
                                                              +----------+

详细流程图:

        线程访问同步块
              |
              v
    +----是否启用偏向锁?----+
    |                       |
   是                      否
    |                       |
    v                       v
 尝试偏向             直接轻量级锁
    |
    v
 +--对象是否已偏向?--+
 |                   |
未偏向             已偏向
 |                   |
 v                   v
CAS设置           偏向的是
线程ID             当前线程?
 |                /      \
成功             是       否
 |               |        |
 v               v        v
进入同步块    直接进入    到达安全点
(偏向当前     同步块     撤销偏向
 线程)                    |
                          v
                    升级为轻量级锁
                          |
                          v
                    CAS 获取锁
                   /          \
                 成功          失败
                  |             |
                  v             v
               进入同步块     自旋等待
                              |
                           超过阈值?
                           /       \
                         否         是
                          |          |
                          v          v
                       继续自旋   膨胀为重量级锁
                                    |
                                    v
                              线程阻塞(BLOCKED)
                              等待 _owner 释放

锁只能升级,不能降级(有一个例外):

这是因为锁降级的判断成本太高——你不知道什么时候从”高竞争”变回”低竞争”。唯一的例外是偏向锁的批量撤销(Bulk Revoke),这实际上是在安全点上统一操作,性质不同于运行时降级。

3.6 锁优化技术

锁消除(Lock Elimination)

JIT 编译器通过逃逸分析判断一个对象是否被多线程共享。如果对象不会逃逸出当前线程,那么对它的加锁操作可以直接消除。

// 设计动机:程序员可能不经意地在不需要同步的地方使用了同步
// 比如在方法内部创建的 StringBuffer
public String concat(String s1, String s2) {
    // StringBuffer 的 append 方法是 synchronized 的
    // 但 sb 不会逃逸出这个方法,不可能被其他线程访问
    StringBuffer sb = new StringBuffer();
    sb.append(s1);   // JIT 会消除这里的 synchronized
    sb.append(s2);   // JIT 会消除这里的 synchronized
    return sb.toString();
}

// JIT 优化后,等价于:
public String concat(String s1, String s2) {
    StringBuffer sb = new StringBuffer();
    sb.append(s1);   // 无锁
    sb.append(s2);   // 无锁
    return sb.toString();
}

锁粗化(Lock Coarsening)

如果 JVM 检测到一连串零碎的加锁解锁操作都是对同一个对象,就会把锁的范围扩大(粗化),只加一次锁。

// 优化前:反复加锁解锁
for (int i = 0; i < 10000; i++) {
    synchronized (lock) {    // 10000 次 lock/unlock
        list.add(i);
    }
}

// JIT 锁粗化后:
synchronized (lock) {        // 只有 1 次 lock/unlock
    for (int i = 0; i < 10000; i++) {
        list.add(i);
    }
}

自适应自旋(Adaptive Spinning)

自旋的次数不再固定,而是由 JVM 根据前一次在同一个锁上的自旋结果动态调整:

  • 上次自旋成功了 → 这次多自旋几圈(乐观)
  • 上次自旋失败了 → 这次少自旋或直接不自旋(悲观)
  • 如果某个锁从来没有自旋成功过 → 以后直接跳过自旋阶段

一句话总结: synchronized 经历了从无锁到偏向锁、轻量级锁、重量级锁的升级演进,配合锁消除、锁粗化和自适应自旋等优化,已经不再是性能杀手。


四、AQS 框架精讲

4.1 什么是 AQS

AQS(AbstractQueuedSynchronizer)是 Java 并发包(java.util.concurrent)的基石框架。ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock 等同步工具都是基于 AQS 实现的。

类比:银行排队系统

+---------------------------------------------------------------------+
|                           银行排队系统                                |
+---------------------------------------------------------------------+
|                                                                     |
|  state(柜台状态)                                                   |
|  = 0 表示柜台空闲                                                    |
|  = 1 表示有人正在办理                                                |
|  > 1 表示重入次数(同一个人办多笔业务)                                |
|                                                                     |
|  CLH 队列(排队队列)                                                |
|  +------+    +------+    +------+    +------+                       |
|  | head | -> | 客户A | -> | 客户B | -> | 客户C |  (tail)             |
|  |(哨兵)|    |(等待) |    |(等待) |    |(等待) |                      |
|  +------+    +------+    +------+    +------+                       |
|                                                                     |
|  每个客户(Node)记录:                                               |
|  - 我的状态(等待中/取消/需要被唤醒)                                  |
|  - 前面是谁(prev)                                                  |
|  - 后面是谁(next)                                                  |
|  - 绑定的线程                                                       |
+---------------------------------------------------------------------+

设计动机: Doug Lea 发现几乎所有同步器都遵循相同的模式——维护一个共享状态 + 管理等待线程队列。AQS 把这个通用逻辑抽取出来,子类只需要实现”如何判断能否获取/释放同步状态”的逻辑即可。

4.2 AQS 核心数据结构

AbstractQueuedSynchronizer
+---------------------------------------------------+
|  volatile int state;     // 同步状态                |
|                                                    |
|  +--- CLH 双向队列 (FIFO) -----------------------+ |
|  |                                                | |
|  |  +------+    prev   +------+   prev  +------+ | |
|  |  | head | <-------- | Node | <------ | tail | | |
|  |  |(哨兵) | -------> | 节点  | ------> |      | | |
|  |  +------+    next   +------+   next  +------+ | |
|  |                                                | |
|  +------------------------------------------------+ |
+---------------------------------------------------+

Node 结构:
+---------------------------------------------------+
|  static final class Node {                         |
|      volatile int waitStatus;  // 节点等待状态      |
|      volatile Node prev;       // 前驱节点          |
|      volatile Node next;       // 后继节点          |
|      volatile Thread thread;   // 绑定的线程        |
|      Node nextWaiter;          // Condition队列指针 |
|  }                                                 |
+---------------------------------------------------+

waitStatus 取值:
+-------------+-------+------------------------------------------+
|  状态名      | 值    | 含义                                      |
+-------------+-------+------------------------------------------+
| CANCELLED   |   1   | 线程已取消等待(超时或中断)                 |
| SIGNAL      |  -1   | 后继节点需要被唤醒(当前节点释放锁时要通知)   |
| CONDITION   |  -2   | 节点在 Condition 队列中等待                 |
| PROPAGATE   |  -3   | 共享模式下,释放应该传播到其他节点             |
| 0           |   0   | 初始状态                                    |
+-------------+-------+------------------------------------------+

4.3 独占模式源码分析

以 ReentrantLock 的 lock() 为例,完整调用链:

lock()
  └── acquire(1)
        ├── tryAcquire(1)         // 尝试获取锁(子类实现)
        │     ├── 成功 → 返回
        │     └── 失败 ↓
        ├── addWaiter(Node.EXCLUSIVE) // 创建节点加入队列
        └── acquireQueued(node, 1)    // 在队列中自旋/阻塞
              ├── shouldParkAfterFailedAcquire()  // 判断是否需要阻塞
              └── parkAndCheckInterrupt()          // LockSupport.park() 阻塞

acquire() — 获取锁的入口:

// 设计动机:先快速尝试获取(无锁快速路径),失败再排队(慢速路径)
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&                    // ① 尝试获取锁(由子类实现)
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // ② 获取失败则排队
        selfInterrupt();                        // ③ 如果在排队过程中被中断过,补上中断标志
}
// 为什么要 selfInterrupt?因为 parkAndCheckInterrupt 会清除中断标志
// 但 acquire 的语义是不响应中断的,所以要在成功获取锁后恢复中断状态

addWaiter() — 将当前线程封装为 Node 加入队列尾部:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 快速尝试:直接 CAS 设置 tail(大多数情况下成功)
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {  // CAS 成功
            pred.next = node;
            return node;
        }
    }
    // 快速尝试失败(队列为空或 CAS 失败),进入完整入队流程
    enq(node);
    return node;
}

// 设计动机:先用快速路径(一次 CAS),失败再走完整的自旋入队
// 这是典型的 fast-path / slow-path 设计模式

private Node enq(final Node node) {
    for (;;) { // 自旋,直到入队成功
        Node t = tail;
        if (t == null) {
            // 队列为空,初始化一个哨兵节点作为 head
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued() — 在队列中自旋获取锁:

// 设计动机:只有队列中第二个节点(head 的后继)才有资格尝试获取锁
// 其他节点老老实实阻塞,避免所有线程同时自旋浪费 CPU
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { // 自旋
            final Node p = node.predecessor();  // 获取前驱节点
            // 只有前驱是 head 时才尝试获取锁(公平性保证)
            if (p == head && tryAcquire(arg)) {
                setHead(node);   // 获取成功,当前节点变成新的 head
                p.next = null;   // 帮助 GC 回收旧 head
                failed = false;
                return interrupted;
            }
            // 不是 head 的后继,或者 tryAcquire 失败
            if (shouldParkAfterFailedAcquire(p, node) && // 检查是否应该阻塞
                parkAndCheckInterrupt())                   // 阻塞当前线程
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node); // 异常情况下取消获取
    }
}

shouldParkAfterFailedAcquire() — 判断是否应该阻塞:

// 设计动机:不是一失败就阻塞,而是先整理队列状态
// 确保前驱节点的 waitStatus 为 SIGNAL 后再阻塞
// 这样前驱释放锁时才知道要唤醒后继
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前驱已经设为 SIGNAL,安全地阻塞
        return true;
    if (ws > 0) {
        // 前驱被取消了(CANCELLED),跳过所有被取消的前驱
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 前驱状态为 0 或 PROPAGATE,设为 SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false; // 返回 false,外层 acquireQueued 会再循环一次
}

4.4 共享模式源码分析

共享模式允许多个线程同时获取同步状态,典型的实现有 Semaphore 和 CountDownLatch。

acquireShared(arg)
  └── tryAcquireShared(arg)       // 返回值:< 0 失败,>= 0 成功
        ├── >= 0 → 获取成功,返回
        └── < 0 → 失败 ↓
              └── doAcquireShared(arg)  // 排队 + 阻塞
                    └── 获取成功后 setHeadAndPropagate()  // 关键:传播唤醒
// 设计动机:与独占模式的关键区别在于 tryAcquireShared 的返回值
// 返回正数表示还有剩余资源,可以继续唤醒后面等待的线程
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED); // 注意:SHARED 模式
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 获取成功,设置头节点并传播
                    setHeadAndPropagate(node, r);
                    p.next = null;
                    if (interrupted) selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

// 传播机制:一个线程获取到共享锁后,如果还有剩余资源,继续唤醒后面的线程
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    // propagate > 0 说明还有剩余资源,继续唤醒
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared(); // 唤醒后继共享节点
    }
}

传播机制的直觉理解:

独占模式就像银行只有一个柜台,一次只服务一个人。共享模式就像银行大门,可以同时进 N 个人。当一个人进去后发现门还没满(propagate > 0),就喊后面的人也进来。

4.5 ReentrantLock 基于 AQS 的实现

公平锁 vs 非公平锁:

// ============ 非公平锁的 tryAcquire ============
// 设计动机:新来的线程直接尝试抢锁,不管队列里有没有人排队
// 优点:减少线程切换开销,吞吐量更高
// 缺点:可能导致队列中的线程饥饿
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 直接 CAS 抢锁,不检查队列!
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 重入:state + 1
        int nextc = c + acquires;
        if (nextc < 0) throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

// ============ 公平锁的 tryAcquire ============
// 设计动机:严格按照先来后到,保证公平性
// 优点:不会饥饿
// 缺点:多了一次 hasQueuedPredecessors() 检查,性能略差
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 关键区别:先检查队列中是否有前驱节点在等待
        if (!hasQueuedPredecessors() &&   // <--- 这一行是唯一的区别!
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

重入的实现原理:

第一次 lock():  state: 0 → 1,  owner: null → ThreadA
第二次 lock():  state: 1 → 2,  owner: ThreadA(不变,是同一个线程)
第三次 lock():  state: 2 → 3

第一次 unlock():state: 3 → 2
第二次 unlock():state: 2 → 1
第三次 unlock():state: 1 → 0,  owner: ThreadA → null(完全释放)

4.6 Condition 机制

Condition 是 AQS 内部类 ConditionObject 实现的,它为每个 Condition 对象维护了一个单独的等待队列(Condition Queue),与 AQS 的同步队列(Sync Queue)是两个不同的队列。

AQS 同步队列(Sync Queue):
  争抢锁的线程排队的地方

  +------+    +------+    +------+
  | head | -> | NodeA | -> | NodeB |  (tail)
  +------+    +------+    +------+

Condition 等待队列(Condition Queue):
  调用 await() 的线程等待的地方(单向链表)

  Condition1:
  +----------+    +----------+
  | firstWaiter| -> | lastWaiter |
  | (NodeC)  |    | (NodeD)   |
  +----------+    +----------+

  Condition2:
  +----------+
  | firstWaiter|
  | (NodeE)  |
  +----------+

await() 和 signal() 的转移过程:

线程 A 调用 condition.await():
  ① 创建新 Node,加入 Condition Queue 尾部
  ② 完全释放锁(state 置为 0)
  ③ 线程阻塞(LockSupport.park)

  Sync Queue:   head -> [其他等待者...]
  Cond Queue:   ... -> [NodeA]  (线程 A 在这里等待)


线程 B 调用 condition.signal():
  ① 取 Condition Queue 的 firstWaiter(NodeA)
  ② 将 NodeA 从 Condition Queue 移除
  ③ 将 NodeA 加入 Sync Queue 尾部
  ④ 将 NodeA 的 waitStatus 设为 SIGNAL

  Sync Queue:   head -> [其他等待者...] -> [NodeA]  (线程 A 转移到这里)
  Cond Queue:   (空或剩余节点)

线程 A 被唤醒后:
  ⑤ 在 Sync Queue 中排队等待获取锁
  ⑥ 获取锁成功后从 await() 返回
// await() 核心逻辑
public final void await() throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    Node node = addConditionWaiter();      // ① 加入 Condition Queue
    int savedState = fullyRelease(node);   // ② 完全释放锁(重入锁也全部释放)
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {         // 如果还没被转移到 Sync Queue
        LockSupport.park(this);            // ③ 阻塞
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被 signal 唤醒后,在 Sync Queue 中重新获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清理 Condition Queue 中已取消的节点
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

4.7 基于 AQS 自定义同步器

示例:实现一个简单的互斥锁

/**
 * 设计动机:展示 AQS 的模板方法模式
 * 子类只需要实现 tryAcquire/tryRelease(独占)
 * 或 tryAcquireShared/tryReleaseShared(共享)
 * 排队、阻塞、唤醒的逻辑全部由 AQS 负责
 */
public class SimpleMutex implements Lock {

    private static class Sync extends AbstractQueuedSynchronizer {

        // state == 0: 未锁定
        // state == 1: 已锁定

        @Override
        protected boolean tryAcquire(int arg) {
            // CAS 将 state 从 0 改为 1
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // 不需要 CAS,因为是独占模式,只有持有锁的线程才能释放
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override public void lock()              { sync.acquire(1); }
    @Override public void unlock()            { sync.release(1); }
    @Override public boolean tryLock()         { return sync.tryAcquire(1); }
    @Override public Condition newCondition()  { return sync.newCondition(); }
    @Override public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }
}

示例:实现一个共享锁(允许最多 N 个线程同时访问)

/**
 * 设计动机:展示共享模式的使用方式
 * 类似于 Semaphore,但更简单
 */
public class SharedLock {

    private static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // 初始许可数
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for (;;) {
                int available = getState();
                int remaining = available - arg;
                // remaining < 0 表示资源不足,返回负数让调用者排队
                // remaining >= 0 则 CAS 扣减
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;) {
                int current = getState();
                int next = current + arg;
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    }

    private final Sync sync;

    public SharedLock(int permits) {
        sync = new Sync(permits);
    }

    public void acquire() { sync.acquireShared(1); }
    public void release() { sync.releaseShared(1); }
}

一句话总结: AQS 通过”volatile state + CLH 双向队列 + CAS”实现了一套通用的同步框架,子类只需实现 tryAcquire/tryRelease 即可得到完整的锁语义。


五、CAS 与原子操作

5.1 CAS 原理

CAS(Compare-And-Swap)是一种乐观锁思想的实现,它有三个操作数:

  • V(Value):内存中的当前值
  • A(Expected):期望值(我认为现在应该是什么)
  • B(New):新值(如果确实是 A,我要改成 B)
CAS 的逻辑(伪代码):

if (V == A) {
    V = B;
    return true;  // 成功
} else {
    return false; // 失败,别人已经改过了
}

// 关键:这整个 if + 赋值 是一个原子操作(由 CPU 指令保证)

CPU 级别的实现:

在 x86 架构上,CAS 对应 CMPXCHG 指令(Compare and Exchange)。在多核处理器上,这条指令会配合 LOCK 前缀使用,它会锁住总线或缓存行,确保在执行期间其他核心无法修改同一内存地址。

// x86 汇编(简化)
lock cmpxchg [address], new_value
// lock 前缀确保原子性
// cmpxchg: if (EAX == [address]) { [address] = new_value; ZF=1 }
//          else { EAX = [address]; ZF=0 }

为什么 CAS 比加锁快?

加锁方式(悲观锁):
  线程 A: [获取锁] ----[执行操作]---- [释放锁]
  线程 B: [尝试获取锁] --[阻塞(OS调度)]-- [被唤醒] --[获取锁] --[执行]-- [释放]
                          ↑ 用户态 → 内核态(开销大)

CAS 方式(乐观锁):
  线程 A: [CAS成功] --[操作完成]
  线程 B: [CAS失败] --[重试] --[CAS成功] --[操作完成]
                      ↑ 始终在用户态,没有上下文切换

5.2 Unsafe 类

Unsafe 是 Java 中直接操作内存的后门类,位于 sun.misc.Unsafe(JDK 9+ 改为 jdk.internal.misc.Unsafe)。原子类的底层实现都依赖它。

// AtomicInteger 的核心实现
public class AtomicInteger extends Number {
    // Unsafe 实例
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // value 字段的内存偏移量(用于直接操作内存)
    private static final long valueOffset;

    static {
        try {
            // 获取 value 字段在对象中的内存偏移量
            valueOffset = unsafe.objectFieldOffset(
                AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    // volatile 保证可见性
    private volatile int value;

    // 设计动机:自旋 CAS,失败就重试,直到成功
    // 这就是无锁编程的核心模式
    public final int getAndAddInt(Object obj, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(obj, offset);  // 读取当前值
        } while (!compareAndSwapInt(obj, offset, v, v + delta)); // CAS 尝试更新
        return v;
    }

    // native 方法,直接调用 CPU 的 CMPXCHG 指令
    public final native boolean compareAndSwapInt(
        Object obj, long offset, int expected, int update);
}

5.3 ABA 问题

什么是 ABA 问题?

CAS 判断的是”值有没有变”,但无法判断”值是不是被改过又改回来了”。

时间线:
  线程 A:读取值 = A
          |
          |(线程 A 被挂起)
          |
  线程 B:将 A 改为 B
  线程 C:将 B 改回 A
          |
  线程 A:CAS(expected=A, new=X) → 成功!
          |
          但此时的 A 已经不是当初那个 A 了!

具体例子:链表操作

初始状态:head -> A -> B -> C

线程 1 想做:CAS 把 head 从 A 改为 B(删除 A)
线程 1 读到 head = A,记下 next = B

线程 2 执行了一系列操作:
  删除 A 和 B:head -> C
  重新插入一个新的 A(不同对象但值相同):head -> A -> C

线程 1 恢复执行:
  CAS(head, A, B) 成功!
  但 B 已经被删除了,B.next 可能是野指针!
  结果:head -> B -> ???(内存错误)

解决方案:AtomicStampedReference

// 设计动机:给每次修改加一个"版本号"
// 不仅比较值是否相同,还比较版本号是否相同
// 即使值从 A 变成 B 再变回 A,版本号也从 1 变成 2 再变成 3,不会相同
public class ABADemo {
    // AtomicStampedReference 内部同时维护引用和版本号(stamp)
    private static AtomicStampedReference<Integer> ref =
        new AtomicStampedReference<>(100, 0); // 初始值 100,版本号 0

    public static void main(String[] args) {
        int stamp = ref.getStamp(); // 获取当前版本号
        Integer reference = ref.getReference(); // 获取当前值

        // CAS 时同时检查值和版本号
        boolean success = ref.compareAndSet(
            reference,    // 期望值
            200,          // 新值
            stamp,        // 期望版本号
            stamp + 1     // 新版本号
        );
    }
}

AtomicMarkableReference: 如果不关心被修改了几次,只关心”有没有被修改过”,可以用 AtomicMarkableReference,它用一个 boolean 标记代替版本号。

5.4 原子类家族

java.util.concurrent.atomic 包下的原子类:

+------------------------------------------+
| 基本类型原子类                              |
|  AtomicInteger                            |
|  AtomicLong                               |
|  AtomicBoolean                            |
+------------------------------------------+
| 引用类型原子类                              |
|  AtomicReference<V>                       |
|  AtomicStampedReference<V>  (解决ABA)     |
|  AtomicMarkableReference<V> (标记引用)     |
+------------------------------------------+
| 数组原子类                                  |
|  AtomicIntegerArray                       |
|  AtomicLongArray                          |
|  AtomicReferenceArray<E>                  |
+------------------------------------------+
| 字段更新器(对已有类的 volatile 字段做原子更新)|
|  AtomicIntegerFieldUpdater<T>             |
|  AtomicLongFieldUpdater<T>                |
|  AtomicReferenceFieldUpdater<T,V>         |
+------------------------------------------+
| 累加器(JDK 8+,高并发下性能更好)            |
|  LongAdder                                |
|  LongAccumulator                          |
|  DoubleAdder                              |
|  DoubleAccumulator                        |
+------------------------------------------+

5.5 LongAdder 为什么比 AtomicLong 快

AtomicLong 的瓶颈:

所有线程争抢同一个 value:

线程1 ---+
线程2 ---+--> CAS(value) --> 成功只有1个,其他全部重试
线程3 ---+
线程4 ---+

高并发下,大量 CAS 失败 → 大量自旋 → 浪费 CPU

LongAdder 的分散策略:

LongAdder 内部结构:

+-------+     +---------+---------+---------+---------+
| base  |     | Cell[0] | Cell[1] | Cell[2] | Cell[3] |
| (基础值)|     |  value  |  value  |  value  |  value  |
+-------+     +---------+---------+---------+---------+

线程1 -------> Cell[0] (CAS)
线程2 -------> Cell[1] (CAS)
线程3 -------> Cell[2] (CAS)    每个线程分散到不同的 Cell
线程4 -------> Cell[3] (CAS)

最终结果 = base + Cell[0] + Cell[1] + Cell[2] + Cell[3]
// 设计动机:"化整为零"——把一个热点变量拆分成多个,减少竞争
// 类似于银行多开几个柜台,每个柜台独立服务,最后汇总
// 这就是 Striped64 的核心思想

// LongAdder 的 add 方法(简化)
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // 首先尝试 CAS 更新 base(快速路径,低竞争时走这里)
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        // base CAS 失败,说明有竞争,使用 Cell 数组
        if (as == null || (m = as.length - 1) < 0 ||
            // 通过线程的 probe 值(类似哈希)定位到某个 Cell
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            // Cell CAS 也失败了,进入终极方法,可能扩容
            longAccumulate(x, null, uncontended);
    }
}

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value; // 把所有 Cell 的值加起来
        }
    }
    return sum;
    // 注意:sum() 不是原子操作!在统计过程中其他线程可能还在修改
    // 所以 LongAdder 适合统计场景,不适合需要精确实时值的场景
}

伪共享(False Sharing)问题与 @Contended:

CPU 缓存行(Cache Line)通常是 64 字节:

+-------- 一个缓存行 (64 bytes) --------+
| Cell[0].value | Cell[1].value | ...   |
+---------------------------------------+

问题:Cell[0] 和 Cell[1] 在同一个缓存行中。
当 CPU 核心 1 修改 Cell[0] 时,会导致 CPU 核心 2 缓存行失效,
即使核心 2 只使用 Cell[1]。这就是伪共享——不同的数据因为物理上相邻而互相干扰。

解决方案:用填充(Padding)让每个 Cell 独占一个缓存行

+------ 缓存行 1 ------+  +------ 缓存行 2 ------+
| [padding] Cell[0]     |  | [padding] Cell[1]     |
| [padding]             |  | [padding]             |
+-----------------------+  +-----------------------+
// JDK 8 中 Cell 类用 @Contended 注解防止伪共享
// JVM 会自动在对象前后加填充字节
@sun.misc.Contended   // 告诉 JVM:给这个对象前后加 128 字节的 padding
static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
}
// 使用 @Contended 需要 JVM 参数:-XX:-RestrictContended

一句话总结: CAS 是无锁编程的基石,通过 CPU 原子指令实现;LongAdder 通过”分散热点 + Cell 数组”策略在高并发场景下大幅超越 AtomicLong 的性能。


六、线程池源码级理解

6.1 线程池状态机

线程池用一个 AtomicInteger 类型的 ctl 变量同时存储两个信息:

ctl(32 位 int):
+---+-----------------------------+
| 3 |            29               |
+---+-----------------------------+
 ↑               ↑
 状态位         工作线程数量
(高3位)        (低29位,最大约5亿)

5 种状态及其转换:

                 shutdown()
RUNNING (-1) ─────────────────> SHUTDOWN (0)
   │                               │
   │ shutdownNow()                 │ 队列为空 && 工作线程数为 0
   │                               v
   └──────────────────────> STOP (1)

                               │ 工作线程数为 0
                               v
                          TIDYING (2)

                               │ terminated() 钩子执行完毕
                               v
                         TERMINATED (3)

状态值说明:
+-------------+-------+--------------------------------------------+
|  状态        | 值    | 说明                                        |
+-------------+-------+--------------------------------------------+
| RUNNING     | -1    | 接受新任务,处理队列中的任务                   |
| SHUTDOWN    |  0    | 不接受新任务,但处理队列中的任务               |
| STOP        |  1    | 不接受新任务,不处理队列,中断正在执行的任务    |
| TIDYING     |  2    | 所有任务已结束,workerCount=0,即将执行钩子    |
| TERMINATED  |  3    | terminated() 钩子方法执行完毕                 |
+-------------+-------+--------------------------------------------+
// 设计动机:用一个变量存两个信息,一次 CAS 就能同时更新状态和线程数
// 避免了两个变量需要两次 CAS 的不一致问题
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 0x1FFFFFFF

// 高 3 位:状态
private static final int RUNNING    = -1 << COUNT_BITS; // 111_00...0
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000_00...0
private static final int STOP       =  1 << COUNT_BITS; // 001_00...0
private static final int TIDYING    =  2 << COUNT_BITS; // 010_00...0
private static final int TERMINATED =  3 << COUNT_BITS; // 011_00...0

// 拆解方法
private static int runStateOf(int c)     { return c & ~CAPACITY; } // 取高 3 位
private static int workerCountOf(int c)  { return c & CAPACITY; }  // 取低 29 位
private static int ctlOf(int rs, int wc) { return rs | wc; }      // 合并

6.2 execute() 核心流程

// 这是理解线程池最重要的方法
public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();
    int c = ctl.get();

    // 步骤一:工作线程数 < 核心线程数 → 创建核心线程执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))  // true 表示核心线程
            return;
        c = ctl.get(); // addWorker 失败,重新获取 ctl
    }

    // 步骤二:核心线程满了 → 尝试把任务加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Double Check:再次检查线程池状态
        // 因为在入队的过程中,线程池可能被 shutdown 了
        if (!isRunning(recheck) && remove(command))
            reject(command); // 线程池已关闭,拒绝任务
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // 防止工作线程全部死亡,队列中的任务没人处理
    }

    // 步骤三:队列也满了 → 创建非核心线程
    else if (!addWorker(command, false)) // false 表示非核心线程
        reject(command); // 创建失败(达到 maximumPoolSize)→ 拒绝
}

execute() 的决策流程图:

           execute(task)
               |
               v
    +---------------------+
    | workerCount          |     是
    | < corePoolSize ?  ---------> addWorker(task, core=true) --> 成功则返回
    +---------------------+                                       |
               | 否                                             失败
               v                                                  |
    +---------------------+                                       v
    | 线程池 RUNNING       |     是                          重新获取 ctl
    | && queue.offer(task)?--------+                              |
    +---------------------+        |                              |
               | 否                v                              |
               |        +----Double Check----+                    |
               |        | 线程池还在 RUNNING? |                    |
               |        +---+----------+-----+                    |
               |            |          |                          |
               |           否         是                          |
               |            |          |                          |
               |            v          v                          |
               |         remove     workerCount==0?               |
               |         +reject       |                          |
               |                     是: addWorker(null,false)    |
               |                     否: do nothing               |
               v                                                  |
    +----------------------+                                      |
    | addWorker(task,       | <-----------------------------------+
    |   core=false)         |
    +---+------------------+
        |
     成功则执行
     失败则 reject(task)

6.3 Worker 线程的生命周期

// Worker 的双重身份:
// 1. 继承 AQS:用于实现不可重入的互斥锁(判断线程是否空闲)
// 2. 实现 Runnable:作为线程的执行体
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;     // Worker 绑定的线程
    Runnable firstTask;      // 创建时的第一个任务(可能为 null)
    volatile long completedTasks; // 完成的任务数

    Worker(Runnable firstTask) {
        setState(-1); // 初始 state=-1,阻止中断直到 runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
}
// runWorker — Worker 线程的核心执行循环
// 设计动机:通过 getTask() 不断从队列中取任务
// 核心线程用 take()(无限等待),非核心线程用 poll(timeout)(超时退出)
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断(setState 从 -1 变为 0)
    boolean completedAbruptly = true;
    try {
        // 关键循环:先执行 firstTask,然后不断 getTask() 取新任务
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 获取 Worker 自身的锁(标记为"忙碌")
            // 如果线程池 >= STOP,确保线程被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);  // 钩子方法(可覆写)
                try {
                    task.run();           // 实际执行任务
                } catch (Throwable x) {
                    afterExecute(task, x); // 钩子方法
                    throw x;
                }
                afterExecute(task, null);
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock(); // 标记为"空闲"
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly); // 线程退出清理
    }
}
// getTask — 从队列中获取任务(核心线程 vs 非核心线程的区别在这里)
private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池已关闭且队列为空,返回 null(线程退出)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        // 关键判断:是否需要超时控制?
        // allowCoreThreadTimeOut=true 或 当前线程数 > 核心线程数 → 需要超时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 线程数超标或超时 → 减少线程
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null; // 返回 null,调用者的 while 循环结束,线程退出
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时等待
                workQueue.take(); // 无限等待(核心线程在这里阻塞,不会退出)
            if (r != null) return r;
            timedOut = true; // poll 超时返回 null
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

核心线程如何”存活”?

核心线程的生命周期:

  runWorker() 循环
       |
       v
  task = getTask()
       |
       v
  workQueue.take()  <--- 队列为空时,线程在这里阻塞等待
       |                  这就是核心线程不会退出的原因:
       |                  take() 会一直等,直到队列有新任务
       v
  拿到任务 → task.run() → 回到循环开头

非核心线程的生命周期:

  runWorker() 循环
       |
       v
  task = getTask()
       |
       v
  workQueue.poll(keepAliveTime)  <--- 等待 keepAliveTime 后超时
       |
       v
  返回 null → getTask 返回 null → while 循环结束 → 线程退出

6.4 优雅关闭

// shutdown() — 温和关闭:不接受新任务,但会执行完队列中的任务
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);     // 状态改为 SHUTDOWN
        interruptIdleWorkers();         // 中断空闲线程(正在 take/poll 的线程)
        onShutdown();                   // 钩子方法(ScheduledThreadPoolExecutor 用)
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

// shutdownNow() — 暴力关闭:不接受新任务,清空队列,中断所有线程
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);          // 状态改为 STOP
        interruptWorkers();              // 中断所有线程(包括正在执行任务的)
        tasks = drainQueue();            // 清空队列,返回未执行的任务
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

推荐的优雅关闭序列:

// 设计动机:先温和关闭,等一段时间;如果还没关完,再暴力关闭
// 这是 Java 官方文档推荐的最佳实践
public void gracefulShutdown(ExecutorService executor) {
    executor.shutdown(); // 第一步:拒绝新任务,等待已提交任务完成
    try {
        // 第二步:等待已提交的任务执行完毕,最多等 60 秒
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            executor.shutdownNow(); // 第三步:超时后强制关闭
            // 第四步:再等一次,确认所有线程都已退出
            if (!executor.awaitTermination(60, TimeUnit.SECONDS))
                System.err.println("线程池未能完全关闭");
        }
    } catch (InterruptedException ie) {
        // 当前线程被中断,立即强制关闭
        executor.shutdownNow();
        Thread.currentThread().interrupt(); // 保留中断状态
    }
}

一句话总结: 线程池通过 ctl 变量管理状态和线程数,execute() 的三级策略(核心线程 → 队列 → 非核心线程 → 拒绝)和 Worker 的 getTask() 循环是理解线程池的关键。


七、CompletableFuture 编排原理

7.1 设计思想

Future 的局限性:

// 传统 Future 的问题:get() 是阻塞的
Future<String> future = executor.submit(() -> queryDatabase());
String result = future.get(); // 阻塞!调用线程什么都干不了,只能等
process(result);

// 如果有多个异步任务要编排,代码变成了"回调地狱"或"阻塞串行"
Future<A> fa = executor.submit(() -> taskA());
A a = fa.get(); // 阻塞
Future<B> fb = executor.submit(() -> taskB(a));
B b = fb.get(); // 阻塞
Future<C> fc = executor.submit(() -> taskC(b));
C c = fc.get(); // 阻塞

CompletableFuture 的解决方案:

// CompletableFuture = Future + 回调 + 组合 + 异常处理
// 设计动机:让异步编程像写同步代码一样流畅,无需阻塞等待
CompletableFuture
    .supplyAsync(() -> queryDatabase())        // 异步执行
    .thenApply(data -> transform(data))         // 上一步完成后自动执行(无需阻塞)
    .thenAccept(result -> sendResponse(result)) // 继续链式处理
    .exceptionally(ex -> handleError(ex));      // 统一异常处理

// 组合多个异步任务:
CompletableFuture<String> userFuture = supplyAsync(() -> getUser());
CompletableFuture<String> orderFuture = supplyAsync(() -> getOrder());

// 两个任务并行执行,都完成后合并结果
userFuture.thenCombine(orderFuture, (user, order) ->
    merge(user, order));

7.2 内部结构

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    // 结果字段:持有正常结果或异常
    // null = 未完成
    // 正常值 = 完成
    // AltResult(exception) = 异常完成
    // AltResult(null) = null 结果(区分 null 值和未完成)
    volatile Object result;

    // 回调栈:Treiber Stack(无锁并发栈)
    // 每个注册的回调(thenApply, thenAccept 等)都是一个 Completion 节点
    // 新注册的回调通过 CAS 压入栈顶
    volatile Completion stack;
}
CompletableFuture 内部的 Completion 栈(Treiber Stack):

  +-------------------+
  | CompletableFuture |
  | result: null      |  (未完成)
  | stack: --------+  |
  +----------------+--+
                   |
                   v
             +-----------+     +-----------+     +-----------+
             | Completion|---->| Completion|---->| Completion|----> null
             | (thenApply)     | (thenAccept)    | (exceptionally)
             +-----------+     +-----------+     +-----------+
             (最后注册的)       (第二个注册的)     (最先注册的)

注意:这是一个栈结构(后进先出),但实际执行时会遍历整个链

当 complete(value) 被调用时:
  1. 设置 result = value
  2. 弹出 stack 中的所有 Completion 节点
  3. 依次触发每个 Completion 的回调逻辑

7.3 异步回调链执行机制

thenApply 是如何注册回调的:

// 简化后的核心逻辑
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
    return uniApplyStage(null, fn); // null 表示同步执行(用调用线程或完成线程)
}

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
    return uniApplyStage(asyncPool, fn); // asyncPool 通常是 ForkJoinPool.commonPool()
}

private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T, ? extends V> f) {
    CompletableFuture<V> d = new CompletableFuture<V>(); // 创建下游 CF
    // 如果 this 已经完成了,直接执行 fn
    // 否则创建 UniApply 节点,CAS 压入 this.stack
    if (e != null || !d.uniApply(this, f, null)) {
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        push(c);         // CAS 入栈
        c.tryFire(SYNC); // 再次尝试(防止在入栈过程中 this 已经完成)
    }
    return d; // 返回下游 CF,支持链式调用
}

complete() 是如何触发回调链的:

public boolean complete(T value) {
    boolean triggered = completeValue(value); // CAS 设置 result
    postComplete(); // 触发所有回调
    return triggered;
}

// postComplete 的核心逻辑
final void postComplete() {
    CompletableFuture<?> f = this; Completion h;
    // 遍历 stack 链表,依次触发每个 Completion
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        // CAS 弹出栈顶
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                // 还有更多 Completion,继续处理
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;
            }
            // 触发回调:f 是上游 CF 的结果,h 是注册的回调
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

线程切换规则:

+---------------------+----------------------------------------+
|  方法                |  执行线程                               |
+---------------------+----------------------------------------+
| thenApply(fn)       | 如果上游已完成:调用 thenApply 的线程     |
|                     | 如果上游未完成:完成上游的那个线程         |
+---------------------+----------------------------------------+
| thenApplyAsync(fn)  | ForkJoinPool.commonPool() 中的线程      |
+---------------------+----------------------------------------+
| thenApplyAsync(     | 指定的 Executor 中的线程                 |
|   fn, executor)     |                                        |
+---------------------+----------------------------------------+

7.4 异常传播机制

// 异常传播:上游的异常会沿着回调链向下传播
CompletableFuture<String> cf = CompletableFuture
    .supplyAsync(() -> {
        if (true) throw new RuntimeException("出错了");
        return "ok";
    })
    .thenApply(s -> s + " world")       // 不会执行(上游异常了)
    .thenApply(s -> s.toUpperCase());   // 不会执行(上游异常了)

// cf.get() 会抛出 ExecutionException,cause 是 RuntimeException("出错了")

三种异常处理方式的区别:

// === exceptionally:只处理异常,正常值透传 ===
// 设计动机:类似 try-catch,只在出错时执行
CompletableFuture<String> result = cf
    .exceptionally(ex -> {
        // 只有上游异常时才执行
        return "默认值"; // 返回兜底值,让下游继续正常执行
    });

// === handle:无论成功失败都执行,可以转换结果 ===
// 设计动机:类似 try-catch-finally + return,总是执行,总是返回新值
CompletableFuture<String> result = cf
    .handle((value, ex) -> {
        if (ex != null) {
            return "异常处理结果"; // 异常情况
        }
        return value + " 处理后";  // 正常情况
    });

// === whenComplete:无论成功失败都执行,但不改变结果 ===
// 设计动机:类似 try-finally,用于记录日志、释放资源等副作用操作
CompletableFuture<String> result = cf
    .whenComplete((value, ex) -> {
        // 无论成功失败都执行
        // 但不能改变 result 的值(返回值是 void)
        if (ex != null) {
            log.error("执行失败", ex);
        } else {
            log.info("执行成功: " + value);
        }
    });
三种异常处理方式对比:

+------------------+---------+----------+----------+
|  方法             | 正常执行 | 异常执行 | 能否改值 |
+------------------+---------+----------+----------+
| exceptionally    |   否    |    是    |    是    |
| handle           |   是    |    是    |    是    |
| whenComplete     |   是    |    是    |    否    |
+------------------+---------+----------+----------+

一句话总结: CompletableFuture 通过内部的 Treiber Stack 管理回调链,complete() 触发链式执行,实现了无阻塞的异步编排能力。


八、ThreadLocal 原理与内存泄漏

8.1 ThreadLocal 结构

Thread 对象
+-------------------------------+
|  Thread                       |
|                               |
|  threadLocals: ThreadLocalMap |--+
+-------------------------------+  |
                                   |
         +-------------------------+
         |
         v
+------ ThreadLocalMap ------+
|  Entry[] table              |
|                             |
|  [0]  [1]  [2]  [3]  ...  |
|   |    |    |    |         |
|   v    v    v    v         |
| Entry Entry null Entry     |
+----+------------------------+
     |
     v
+--- Entry extends WeakReference<ThreadLocal<?>> ---+
|  key   = WeakReference<ThreadLocal<?>>            |
|  value = Object(实际存储的值)                     |
+---------------------------------------------------+

完整的引用关系图:

栈内存                    堆内存
+--------+
| Thread | --强引用--> ThreadLocalMap --强引用--> Entry[]
+--------+                                         |
                                                    v
+----------+                                   Entry
| ThreadLocal | <--弱引用-- key (WeakReference)   |
| 变量 tl    | --强引用-->                     value --强引用--> 实际值对象
+----------+

一个线程可以有多个 ThreadLocal,每个 ThreadLocal 对应 Entry[] 中的一个槽位
// ThreadLocal 的核心方法
// 设计动机:每个线程有自己独立的变量副本,天然线程安全,无需同步
public class ThreadLocal<T> {

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);   // 获取当前线程的 ThreadLocalMap
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this); // this 就是 ThreadLocal 对象
            if (e != null) {
                T result = (T) e.value;
                return result;
            }
        }
        return setInitialValue();
    }

    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);  // key = this(ThreadLocal对象), value = 用户的值
        else
            createMap(t, value);
    }

    // 重要!使用完毕必须调用 remove()
    public void remove() {
        ThreadLocalMap m = getMap(Thread.currentThread());
        if (m != null)
            m.remove(this);
    }

    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals; // 每个线程自己的 map
    }
}

8.2 哈希冲突解决

ThreadLocalMap 使用开放定址法(线性探测) 而不是 HashMap 的链地址法。

为什么用开放定址法?
  1. ThreadLocal 数量通常很少(一个线程不会有几百个 ThreadLocal)
  2. 开放定址法对 CPU 缓存更友好(数据连续存储在数组中)
  3. 避免了链表节点的额外内存开销

线性探测过程:
  假设 table 长度为 16,ThreadLocal 的 hashCode 映射到 index=5

  设置 tl1(hash→5): table[5] = Entry(tl1, value1)
  设置 tl2(hash→5,冲突!): table[5] 被占 → 探测 table[6] = Entry(tl2, value2)
  设置 tl3(hash→5,冲突!): table[5] 被占 → table[6] 被占 → table[7] = Entry(tl3, value3)

  table:  [0] [1] [2] [3] [4] [5]    [6]    [7]    [8] ...
                                 tl1    tl2    tl3

魔数 0x61c88647(斐波那契散列):

// 设计动机:让 ThreadLocal 在 table 中尽量均匀分布,减少冲突
// 0x61c88647 是黄金分割比 * 2^32 的近似值
private static final int HASH_INCREMENT = 0x61c88647;

private static AtomicInteger nextHashCode = new AtomicInteger();

private static int nextHashCode() {
    return nextHashCode.getAndAdd(HASH_INCREMENT);
}

// 连续的 ThreadLocal 对象的 hashCode 差值恒为 0x61c88647
// 对 2 的幂次方取模后,这些值会均匀分布在数组中
// 例如 table 大小为 16 时:
// tl0: 0x61c88647 & 15 = 7
// tl1: 0xc3910c8e & 15 = 14
// tl2: 0x255992d5 & 15 = 5
// tl3: 0x8722191c & 15 = 12
// 分布非常均匀!

8.3 内存泄漏深度分析

引用链分析:

正常使用时的引用关系:

ThreadLocal对象 tl
  ^                  ^
  | 强引用(栈变量)    | 弱引用(Entry.key)
  |                  |
栈帧                Entry
                     |
                     | 强引用
                     v
                   Value 对象

当 tl = null(ThreadLocal 变量被回收)后:

(已GC)               弱引用断裂
                     |
                   Entry
                   key = null  <--- key 被 GC 了
                     |
                     | 强引用     <--- value 还在!没人引用 Entry 本身
                     v
                   Value 对象     <--- 无法被 GC,内存泄漏!

引用链:Thread → ThreadLocalMap → Entry[] → Entry → Value
只要 Thread 活着,这条链就断不了

为什么在线程池中特别危险?

正常线程:
  线程执行完毕 → 线程对象被 GC → ThreadLocalMap 被 GC → 所有 Entry 和 Value 被 GC
  不会泄漏!

线程池中的线程:
  线程执行完任务 → 线程不会销毁,回到池中等待下一个任务
  → ThreadLocalMap 一直存在
  → Entry(key=null, value=大对象) 一直存在
  → 内存泄漏!

  而且如果同一个线程不断接收新任务,每次任务都使用新的 ThreadLocal
  → ThreadLocalMap 中积累越来越多的 stale Entry
  → 内存泄漏越来越严重

泄漏示例:

// 典型的内存泄漏场景
public class ThreadLocalLeakDemo {

    // 线程池,线程长期存活
    private static final ExecutorService pool = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        for (int i = 0; i < 100000; i++) {
            pool.execute(() -> {
                // 每次任务创建一个新的 ThreadLocal
                ThreadLocal<byte[]> tl = new ThreadLocal<>();
                tl.set(new byte[1024 * 1024]); // 1MB 数据

                // 使用 tl ...

                // 忘记调用 tl.remove()!
                // 任务结束后 tl 局部变量出栈,ThreadLocal 对象只剩弱引用
                // 下次 GC 时 key 变成 null,但 1MB 的 value 还在!
            });
        }
    }
}

正确用法:

// 最佳实践:始终在 finally 中调用 remove()
private static final ThreadLocal<UserContext> userContext = new ThreadLocal<>();

public void handleRequest(Request request) {
    try {
        userContext.set(new UserContext(request.getUserId()));
        // 业务逻辑...
        processRequest();
    } finally {
        userContext.remove(); // 必须在 finally 中清理!
    }
}

ThreadLocalMap 的自我清理机制:

虽然 ThreadLocalMap 在 get()set()remove() 时会顺带清理遇到的 stale Entry(key == null 的 Entry),但这只是”碰巧清理”,不能完全避免泄漏。因为如果那些 stale Entry 所在的槽位一直没有被访问到,它们就永远不会被清理。

8.4 InheritableThreadLocal

问题: 父线程的 ThreadLocal 值默认无法传递给子线程。

// InheritableThreadLocal 的设计动机:
// 某些场景下需要在子线程中访问父线程的上下文信息(如 traceId、userId)
public class InheritableThreadLocalDemo {

    private static final InheritableThreadLocal<String> traceId =
        new InheritableThreadLocal<>();

    public static void main(String[] args) {
        traceId.set("trace-001");

        // 创建子线程时,会复制父线程的 inheritableThreadLocals
        new Thread(() -> {
            System.out.println(traceId.get()); // 输出:trace-001
        }).start();
    }
}

原理: Thread 的构造函数中,如果父线程有 inheritableThreadLocals,会把父线程的 map 复制一份给子线程(浅拷贝)。

InheritableThreadLocal 在线程池中的问题:

线程池中的线程是预创建的,不是每次提交任务时才创建。
所以子线程的 inheritableThreadLocals 是在线程创建时从父线程复制的,
而不是在提交任务时。

父线程 A(traceId=001)提交任务 → 线程池线程 T1(创建时父线程是 main)
  T1 的 inheritableThreadLocals 是 main 线程的,不是线程 A 的!

解决方案:阿里巴巴的 TransmittableThreadLocal (TTL)
  原理:在任务提交时捕获当前线程的 ThreadLocal 值,
       在任务执行时恢复到工作线程中,任务结束后恢复原值。
// TransmittableThreadLocal 的使用方式(需要额外依赖 transmittable-thread-local)
// 设计动机:解决线程池场景下线程本地变量传递的问题

// 方式一:修饰线程池
ExecutorService ttlExecutor = TtlExecutors.getTtlExecutorService(
    Executors.newFixedThreadPool(10));

// 方式二:修饰 Runnable
Runnable ttlRunnable = TtlRunnable.get(() -> {
    // 这里可以拿到提交任务时的 TransmittableThreadLocal 值
});
executor.submit(ttlRunnable);

一句话总结: ThreadLocal 通过为每个线程维护独立的 ThreadLocalMap 实现线程隔离,但在线程池中使用时必须在 finally 中调用 remove(),否则 Entry 的弱引用 key 被 GC 后 value 无法回收,导致内存泄漏。


九、并发设计模式

9.1 生产者-消费者模式

核心思想: 通过一个共享队列解耦生产者和消费者,两者不直接通信。

+----------+    put()    +----------------+    take()    +----------+
| 生产者 1  | ---------> |                | ----------> | 消费者 1  |
+----------+             |  BlockingQueue |             +----------+
+----------+    put()    |  (缓冲区)      |    take()    +----------+
| 生产者 2  | ---------> |                | ----------> | 消费者 2  |
+----------+             +----------------+             +----------+

设计优势:
1. 解耦:生产者和消费者不需要知道对方的存在
2. 削峰:队列缓冲突发流量
3. 异步:生产者不需要等消费者处理完
// 实际场景:异步日志写入
// 设计动机:日志写磁盘很慢(IO 操作),不应该阻塞业务线程
// 业务线程把日志消息放入队列,专门的日志线程从队列取出并写入文件
public class AsyncLogger {

    private final BlockingQueue<String> logQueue = new LinkedBlockingQueue<>(10000);
    private volatile boolean running = true;

    // 生产者:业务线程调用
    public void log(String message) {
        if (!logQueue.offer(message)) {
            // 队列满了,丢弃日志(降级策略)
            System.err.println("日志队列满,丢弃: " + message);
        }
    }

    // 消费者:单独的日志线程
    public void start() {
        Thread logThread = new Thread(() -> {
            while (running || !logQueue.isEmpty()) {
                try {
                    String msg = logQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        writeToFile(msg); // 慢速 IO 操作
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "async-logger");
        logThread.setDaemon(true);
        logThread.start();
    }

    public void shutdown() {
        running = false; // 停止接收,处理完队列中剩余的日志后退出
    }

    private void writeToFile(String message) {
        // 实际的文件写入操作...
    }
}

一句话总结: 生产者-消费者模式通过 BlockingQueue 解耦、缓冲和异步化,是并发编程中最基础也最实用的设计模式。

9.2 读写分离模式

核心思想: 读操作之间不互斥,只有写操作需要独占。适合读多写少的场景。

ReadWriteLock 的访问规则:

          读者                    写者
+-----+-----+-----+     +-----+-----+-----+
|  R  |  R  |  R  |     |  W  |     |     |
|  同  |  时  |  读  |     |  独  | 阻塞 | 阻塞 |
+-----+-----+-----+     +-----+-----+-----+

读-读:共享(并发读)
读-写:互斥(有人读时不能写,有人写时不能读)
写-写:互斥(同时只能有一个写者)
// 设计动机:对于缓存这类读远多于写的数据结构
// 如果用 synchronized,读操作也会互斥,严重浪费并发度
// ReadWriteLock 让读操作完全并行,只在写时才加锁
public class ReadWriteCache<K, V> {

    private final Map<K, V> cache = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();

    // 读操作:多个线程可以同时执行
    public V get(K key) {
        readLock.lock();
        try {
            return cache.get(key);
        } finally {
            readLock.unlock();
        }
    }

    // 写操作:独占执行
    public void put(K key, V value) {
        writeLock.lock();
        try {
            cache.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    // 锁降级:写锁 → 读锁(安全的)
    // 设计动机:写完数据后,不释放写锁直接获取读锁,
    // 然后释放写锁。这样可以在写操作后立即以读模式继续使用数据,
    // 同时允许其他读者进入。
    public V putAndGet(K key, V value) {
        writeLock.lock();
        try {
            cache.put(key, value);
            readLock.lock(); // 在持有写锁的情况下获取读锁(锁降级)
        } finally {
            writeLock.unlock(); // 释放写锁,此时仍持有读锁
        }
        try {
            return cache.get(key); // 以读模式访问
        } finally {
            readLock.unlock();
        }
    }
    // 注意:读锁不能升级为写锁(会死锁),只能写锁降级为读锁
}

一句话总结: ReadWriteLock 通过读共享/写独占的策略,在读多写少场景下大幅提升并发性能,是 synchronized 的精细化替代方案。

9.3 分而治之 --- ForkJoinPool

核心思想: 把大任务拆分成小任务(Fork),分别执行后合并结果(Join)。

ForkJoin 的分治过程:

                    [计算 1~1000000 的和]
                     /                \
                  Fork               Fork
                   /                    \
     [计算 1~500000]                [计算 500001~1000000]
       /        \                     /          \
    Fork       Fork               Fork          Fork
     /            \                /                \
[1~250000]   [250001~500000]  [500001~750000]  [750001~1000000]
     \            /                \                /
      \--Join---/                   \----Join------/
          \                             /
           \---------Join--------------/
                      |
               最终结果:500000500000

工作窃取(Work-Stealing)算法:

每个工作线程有自己的双端队列(Deque):

线程 1 的队列(自己从头部取):
  HEAD --> [task1] [task2] [task3] [task4] <-- TAIL

线程 2 的队列(自己从头部取):
  HEAD --> [task5] <-- TAIL

线程 3 的队列(空闲!):
  HEAD --> <empty> <-- TAIL

工作窃取过程:
  线程 3 空闲了!从线程 1 的队列尾部偷一个任务:

线程 1 的队列:
  HEAD --> [task1] [task2] [task3] <-- TAIL   (task4 被偷走了)

线程 3 执行偷来的 task4

设计动机:
1. 自己的任务从头部取(LIFO),偷别人的从尾部取(FIFO),减少竞争
2. LIFO 取自己的任务可以利用 CPU 缓存局部性(最近 Fork 出的子任务数据还在缓存中)
3. FIFO 偷大任务(早期 Fork 的任务通常更大),偷一个大任务比偷多个小任务效率高
// RecursiveTask(有返回值) vs RecursiveAction(无返回值)
// 设计动机:展示 ForkJoin 的使用方式和阈值控制

public class ParallelSum extends RecursiveTask<Long> {

    private static final int THRESHOLD = 10000; // 阈值:小于此值直接计算
    private final long[] array;
    private final int start, end;

    public ParallelSum(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;

        // 任务足够小,直接计算(不再分割)
        if (length <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        }

        // 任务太大,分成两半
        int mid = start + length / 2;
        ParallelSum leftTask = new ParallelSum(array, start, mid);
        ParallelSum rightTask = new ParallelSum(array, mid, end);

        leftTask.fork();  // 异步执行左半部分
        Long rightResult = rightTask.compute(); // 当前线程直接计算右半部分
        Long leftResult = leftTask.join();      // 等待左半部分完成

        return leftResult + rightResult; // 合并结果
    }

    public static void main(String[] args) {
        long[] array = new long[10_000_000];
        // 填充数组...

        ForkJoinPool pool = new ForkJoinPool(); // 默认线程数 = CPU 核心数
        long result = pool.invoke(new ParallelSum(array, 0, array.length));
        System.out.println("Sum = " + result);
    }
}

ForkJoinPool 与 Java 8 并行流的关系:

// Java 8 的并行流底层就是用 ForkJoinPool.commonPool() 实现的
long sum = LongStream.rangeClosed(1, 1_000_000)
    .parallel()  // 使用 ForkJoinPool.commonPool()
    .sum();

// 注意:共享的 commonPool 可能导致问题
// 如果一个并行流任务很慢(比如有 IO),会影响其他使用 commonPool 的任务
// 解决方案:使用自定义的 ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(4);
long sum = customPool.submit(() ->
    LongStream.rangeClosed(1, 1_000_000).parallel().sum()
).get();

一句话总结: ForkJoinPool 通过分治策略拆分任务 + 工作窃取算法平衡负载,是计算密集型并行任务的理想执行框架。

9.4 Future 模式

核心思想: 提交任务时立即返回一个”凭证”(Future),任务在后台异步执行,需要结果时再通过凭证获取。

传统同步模式(串行等待):
  主线程:[提交任务] --等待30秒--> [拿到结果] --> [继续工作]

Future 模式(异步获取):
  主线程:[提交任务] --> [拿到 Future 凭证] --> [做其他事情...] --> [future.get() 拿结果]
  工作线程:           [后台执行任务 30秒]

类比:在餐厅点餐后拿到取餐号(Future),不需要站在窗口等,
     可以先去找座位、倒水,等号码被叫到再去取餐。
// 设计动机:解耦任务的提交和结果的获取,允许调用方在等待期间做其他事情
public class FuturePatternDemo {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();

        // 同时提交多个耗时任务
        Future<String> userFuture = executor.submit(() -> queryUserService());   // 2秒
        Future<String> orderFuture = executor.submit(() -> queryOrderService()); // 3秒
        Future<String> stockFuture = executor.submit(() -> queryStockService()); // 1秒

        // 三个任务并行执行,总耗时 = max(2, 3, 1) = 3秒
        // 而非串行的 2 + 3 + 1 = 6秒

        String user = userFuture.get();   // 可能已经完成,无需等待
        String order = orderFuture.get();
        String stock = stockFuture.get();

        return merge(user, order, stock);
    }
}

一句话总结: Future 模式通过异步执行 + 延迟获取结果,将任务的提交与结果的消费解耦,是实现并行化的基础模式。

9.5 Thread-Per-Message vs Worker Thread

Thread-Per-Message(每请求一线程):

传统模式:
  请求1 --> new Thread() --> 处理 --> 线程销毁
  请求2 --> new Thread() --> 处理 --> 线程销毁
  请求3 --> new Thread() --> 处理 --> 线程销毁

  问题:线程创建和销毁开销大,高并发时可能 OOM

Worker Thread(线程池模式):

线程池模式:
  线程池 [Thread1, Thread2, Thread3]

  请求1 --> Thread1 执行 --> 归还
  请求2 --> Thread2 执行 --> 归还
  请求3 --> Thread3 执行 --> 归还
  请求4 --> 等待...直到有线程空闲

  优势:线程复用,资源可控

Java 21 虚拟线程对 Thread-Per-Message 的影响:

// Java 21 之前:Thread-Per-Message 模式不可行(平台线程太重)
// 10000 个并发请求 = 10000 个平台线程 = ~10GB 内存(每个线程约 1MB 栈空间)

// Java 21 之后:虚拟线程让 Thread-Per-Message 重新可行
// 虚拟线程极其轻量(~几KB),百万级并发不是问题
// 设计动机:简化并发编程模型,回归简单直观的"一个请求一个线程"

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    // 每个任务一个虚拟线程,但虚拟线程几乎没有创建开销
    for (int i = 0; i < 100_000; i++) {
        executor.submit(() -> {
            // IO 密集型操作(网络请求、数据库查询)
            // 虚拟线程在 IO 时会自动让出底层平台线程
            String result = httpClient.send(request, bodyHandler);
            processResult(result);
        });
    }
}

// 虚拟线程的原理:
// 多个虚拟线程共享少量平台线程(载体线程)
// 当虚拟线程执行 IO 操作时,会被"挂载"到载体线程上
// IO 阻塞时,虚拟线程被"卸载",载体线程去执行其他虚拟线程
// IO 完成后,虚拟线程被重新"挂载"到某个载体线程上继续执行

// 平台线程          虚拟线程
// [载体线程1] <---- VT1(运行中)
// [载体线程2] <---- VT2(运行中)
//                   VT3(等待IO,已卸载)
//                   VT4(等待IO,已卸载)
//                   VT5(等待调度)
// VT3 的 IO 完成后,挂载到空闲的载体线程上继续执行

一句话总结: Worker Thread(线程池)是传统 Java 并发的标准模式;而 Java 21 的虚拟线程让 Thread-Per-Message 模式重获新生,以极低的开销实现”一个请求一个线程”的简洁编程模型。


总结

+----------------------------------------------------------+
|               Java 并发编程知识体系全景图                   |
+----------------------------------------------------------+
|                                                          |
|  底层基石                                                 |
|  ├── JMM(happens-before + 内存屏障)                     |
|  ├── CAS(CPU 级原子指令)                                |
|  └── volatile(可见性 + 有序性)                           |
|                                                          |
|  锁机制                                                   |
|  ├── synchronized(偏向锁→轻量级锁→重量级锁)              |
|  └── AQS(CLH队列 + state + CAS)                        |
|      ├── ReentrantLock(独占可重入)                       |
|      ├── Semaphore(共享许可)                             |
|      ├── CountDownLatch(共享倒计时)                      |
|      └── ReentrantReadWriteLock(读写分离)                |
|                                                          |
|  无锁方案                                                 |
|  ├── Atomic 系列(CAS 自旋)                              |
|  └── LongAdder(Cell 数组分散热点)                        |
|                                                          |
|  线程管理                                                 |
|  ├── ThreadLocal(线程隔离 + 注意内存泄漏)                |
|  ├── 线程池(ctl状态机 + Worker生命周期)                  |
|  └── ForkJoinPool(分治 + 工作窃取)                      |
|                                                          |
|  异步编排                                                 |
|  ├── Future(异步凭证)                                   |
|  └── CompletableFuture(回调链 + Treiber Stack)          |
|                                                          |
|  设计模式                                                 |
|  ├── 生产者-消费者(BlockingQueue)                        |
|  ├── 读写分离(ReadWriteLock)                            |
|  └── Thread-Per-Message → 虚拟线程(Java 21)             |
|                                                          |
+----------------------------------------------------------+

学习路径建议:

  1. 先理解 JMM 和 volatile,这是所有并发机制的理论基础
  2. 再学 synchronized 的锁升级,理解 JVM 层面的优化思路
  3. 然后学 CAS 和 AQS,这是 JUC 包的实现基础
  4. 最后学线程池、CompletableFuture 等高层抽象,会用且知道为什么这样设计
  5. ThreadLocal 要特别注意内存泄漏问题,面试高频考点

核心思想贯穿全文:

  • 空间换时间:ThreadLocal 用每线程一份副本换取无锁;LongAdder 用 Cell 数组换取低竞争
  • 分而治之:ForkJoinPool 把大任务拆小;LongAdder 把热点变量分散
  • 乐观 vs 悲观:CAS 是乐观策略(假设不冲突,冲突了重试);synchronized 是悲观策略(假设会冲突,先加锁)
  • 升级而非降级:synchronized 的锁只升级不降级,因为降级判断的成本太高
  • 延迟初始化:偏向锁、线程池的核心线程都是”需要时才创建”的思路