Java 并发编程实战使用手册

本手册面向有一定 Java 基础的开发者,侧重企业级实战而非理论堆砌。 每一行代码都附有注释,解释的是 “为什么这样写” 而非 “这行代码做了什么”


一、为什么需要并发编程

1.1 真实业务场景

在企业级开发中,以下场景几乎每天都会遇到:

场景串行痛点并发收益
订单批量处理10000 笔订单逐条处理需要 30 分钟10 线程并行,3 分钟完成
数据报表导出汇总 5 个数据源需要 25 秒并行查询,最慢的那个决定总耗时(约 8 秒)
多渠道消息推送短信 + 邮件 + App推送串行发送 3 秒并行推送 1 秒
接口聚合调用用户详情页需要调 5 个微服务,串行 2.5 秒并行调用 0.6 秒

1.2 用数据说话:串行 vs 并发

package com.example.concurrent.demo;

import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;

/**
 * 模拟「用户详情页」接口聚合场景
 * 需要从 5 个微服务获取数据:用户基础信息、订单列表、优惠券、积分、收货地址
 */
public class SequentialVsConcurrentDemo {

    // 模拟远程调用,每个耗时 500ms
    private static String callRemoteService(String serviceName) {
        try {
            Thread.sleep(500); // 模拟网络IO延迟
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // 恢复中断标志,不能吞掉中断
        }
        return serviceName + " 的数据";
    }

    /**
     * 串行调用:总耗时 = 所有调用耗时之和
     */
    public static void sequential() {
        long start = System.currentTimeMillis();

        // 逐个调用,后一个必须等前一个完成
        String userInfo = callRemoteService("用户服务");
        String orders = callRemoteService("订单服务");
        String coupons = callRemoteService("优惠券服务");
        String points = callRemoteService("积分服务");
        String addresses = callRemoteService("地址服务");

        long cost = System.currentTimeMillis() - start;
        System.out.println("串行调用耗时:" + cost + "ms"); // 约 2500ms
    }

    /**
     * 并发调用:总耗时 ≈ 最慢那个调用的耗时
     */
    public static void concurrent() throws Exception {
        long start = System.currentTimeMillis();

        // 使用线程池而非裸创建线程——线程是重量级资源,必须池化复用
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // 提交 5 个异步任务,每个任务返回一个 Future
        Future<String> userInfoFuture = executor.submit(
            () -> callRemoteService("用户服务")
        );
        Future<String> ordersFuture = executor.submit(
            () -> callRemoteService("订单服务")
        );
        Future<String> couponsFuture = executor.submit(
            () -> callRemoteService("优惠券服务")
        );
        Future<String> pointsFuture = executor.submit(
            () -> callRemoteService("积分服务")
        );
        Future<String> addressesFuture = executor.submit(
            () -> callRemoteService("地址服务")
        );

        // get() 会阻塞等待结果,但 5 个任务是并行执行的
        String userInfo = userInfoFuture.get();
        String orders = ordersFuture.get();
        String coupons = couponsFuture.get();
        String points = pointsFuture.get();
        String addresses = addressesFuture.get();

        long cost = System.currentTimeMillis() - start;
        System.out.println("并发调用耗时:" + cost + "ms"); // 约 500ms

        executor.shutdown(); // 优雅关闭线程池,不再接受新任务
    }

    public static void main(String[] args) throws Exception {
        sequential();  // 输出约 2500ms
        concurrent();  // 输出约 500ms
        // 性能提升约 5 倍,这就是并发的价值
    }
}

结论:并发编程不是炫技,而是在 IO 密集型场景下提升吞吐量和响应速度的核心手段。


二、线程创建的三种方式与选型

2.1 方式一:继承 Thread 类

package com.example.concurrent.thread;

/**
 * 方式一:继承 Thread
 *
 * ⚠️ 实际项目中几乎不用这种方式,原因:
 * 1. Java 单继承,继承了 Thread 就无法继承其他类
 * 2. 任务和线程耦合在一起,无法复用任务逻辑
 * 3. 没有返回值,无法知道任务执行结果
 */
public class OrderProcessThread extends Thread {

    private final String orderId;

    public OrderProcessThread(String orderId) {
        super("order-process-" + orderId); // 给线程命名,方便排查问题
        this.orderId = orderId;
    }

    @Override
    public void run() {
        // 业务逻辑直接写在 run 方法里,与线程绑定——这是最大的问题
        System.out.println(Thread.currentThread().getName()
            + " 正在处理订单:" + orderId);
    }

    public static void main(String[] args) {
        // 每次都要 new 一个线程,无法池化复用
        OrderProcessThread t = new OrderProcessThread("ORD-20240101-001");
        t.start(); // start() 才是启动线程,run() 只是普通方法调用
    }
}

2.2 方式二:实现 Runnable 接口

package com.example.concurrent.thread;

/**
 * 方式二:实现 Runnable 接口
 *
 * 优点:
 * 1. 任务(Runnable)和线程(Thread)解耦,任务可以被不同线程执行
 * 2. 可以同时继承其他类
 * 3. 天然适合提交给线程池
 *
 * 缺点:没有返回值,无法抛出受检异常
 */
public class OrderProcessTask implements Runnable {

    private final String orderId;

    public OrderProcessTask(String orderId) {
        this.orderId = orderId;
    }

    @Override
    public void run() {
        // 任务逻辑独立于线程存在,可以被线程池复用
        System.out.println(Thread.currentThread().getName()
            + " 正在处理订单:" + orderId);
        // 模拟业务处理
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // 恢复中断标志——让上层调用者知道发生了中断
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 任务和线程分离,任务可以复用
        Runnable task = new OrderProcessTask("ORD-20240101-001");

        // 可以交给 Thread 执行
        new Thread(task, "worker-1").start();

        // 更好的做法:交给线程池执行(后面会讲)
        // executorService.submit(task);
    }
}

2.3 方式三:实现 Callable + Future

package com.example.concurrent.thread;

import java.util.concurrent.*;

/**
 * 方式三:实现 Callable + Future
 *
 * 企业级开发中最常用的方式,原因:
 * 1. 有返回值——可以拿到任务执行结果
 * 2. 可以抛出受检异常——异常不会被吞掉
 * 3. 配合 Future 可以实现超时控制
 */
public class OrderValidationTask implements Callable<Boolean> {

    private final String orderId;

    public OrderValidationTask(String orderId) {
        this.orderId = orderId;
    }

    @Override
    public Boolean call() throws Exception {
        // 可以抛异常——Runnable 的 run() 做不到这一点
        System.out.println(Thread.currentThread().getName()
            + " 正在校验订单:" + orderId);

        // 模拟校验逻辑:查库存、验地址、检查黑名单...
        Thread.sleep(200);

        // 返回校验结果——Runnable 做不到
        return true;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // submit 返回 Future,持有异步计算的结果
        Future<Boolean> future = executor.submit(
            new OrderValidationTask("ORD-20240101-001")
        );

        // get(timeout) 带超时控制——防止任务卡死拖垮调用方
        try {
            Boolean isValid = future.get(3, TimeUnit.SECONDS);
            System.out.println("订单校验结果:" + isValid);
        } catch (TimeoutException e) {
            // 超时后取消任务,释放线程资源
            future.cancel(true);
            System.out.println("订单校验超时,已取消");
        }

        executor.shutdown();
    }
}

2.4 三种方式对比

维度ThreadRunnableCallable + Future
实现方式继承 Thread 类实现 Runnable 接口实现 Callable 接口
有返回值
可抛受检异常
可继承其他类否(Java 单继承)
可提交线程池不直接支持支持支持
支持超时控制是(Future.get(timeout))
企业项目推荐不推荐简单场景可用推荐

⚠️ 重要提醒:在实际企业项目中,我们几乎不会直接创建线程。 正确做法是把 Runnable/Callable 提交给线程池。 直接 new Thread() 意味着:无法复用、无法控制数量、无法监控——是生产事故的温床。


三、线程池 — 并发编程的核心基础设施

线程池是 Java 并发编程中最重要的基础设施,没有之一。 你可以不用锁,但你不能不用线程池。

3.1 为什么禁止使用 Executors 创建线程池

阿里巴巴 Java 开发手册明确规定:线程池不允许使用 Executors 创建。原因如下:

FixedThreadPool 和 SingleThreadExecutor 的风险

package com.example.concurrent.pool;

import java.util.concurrent.*;

/**
 * ⚠️ 反面教材:Executors.newFixedThreadPool 的 OOM 风险
 *
 * 根源:其工作队列是 LinkedBlockingQueue,默认容量为 Integer.MAX_VALUE(约 21 亿)
 * 如果任务生产速度 > 消费速度,队列会无限增长,最终 OOM
 */
public class FixedThreadPoolOOMDemo {

    public static void main(String[] args) {
        // 看似安全的 2 个线程——实际上队列是无界的
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // 模拟:大量请求涌入,每个任务执行时间较长
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            pool.submit(() -> {
                try {
                    // 模拟慢任务(如调用第三方接口超时)
                    Thread.sleep(10_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // 结果:队列不断积压 -> 内存持续增长 -> OOM
        // java.lang.OutOfMemoryError: GC overhead limit exceeded
    }
}

CachedThreadPool 的风险

package com.example.concurrent.pool;

import java.util.concurrent.*;

/**
 * ⚠️ 反面教材:Executors.newCachedThreadPool 的风险
 *
 * 根源:maximumPoolSize 为 Integer.MAX_VALUE
 * 来一个任务就创建一个线程,如果大量突发请求,会创建海量线程
 * 每个线程约占 1MB 栈内存 -> 直接 OOM 或导致 CPU 上下文切换风暴
 */
public class CachedThreadPoolOOMDemo {

    public static void main(String[] args) {
        // 看似灵活的缓存线程池——实际是定时炸弹
        ExecutorService pool = Executors.newCachedThreadPool();

        // 模拟:瞬间涌入大量请求
        for (int i = 0; i < 100_000; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(5_000); // 每个任务持续 5 秒
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // 结果:瞬间创建 10 万个线程 -> 栈内存约 100GB -> OOM
    }
}

ScheduledThreadPool 的风险

package com.example.concurrent.pool;

import java.util.concurrent.*;

/**
 * ⚠️ 反面教材:Executors.newScheduledThreadPool 的风险
 *
 * 根源:工作队列是 DelayedWorkQueue,也是无界队列
 * 如果定时任务执行失败后不断重试,或者任务提交过多,同样可能 OOM
 */
public class ScheduledThreadPoolRiskDemo {

    public static void main(String[] args) {
        // 看似安全的定时线程池——队列依然无界
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        // 大量定时任务堆积在 DelayedWorkQueue 中
        for (int i = 0; i < 1_000_000; i++) {
            pool.schedule(() -> {
                // 模拟任务
            }, 1, TimeUnit.HOURS); // 1 小时后执行,但任务对象立刻占用内存
        }
        // 100 万个任务对象堆积在内存中,可能导致 OOM
    }
}

各 Executors 工厂方法风险汇总

工厂方法队列类型最大线程数核心风险
newFixedThreadPoolLinkedBlockingQueue(无界)固定队列堆积 -> OOM
newSingleThreadExecutorLinkedBlockingQueue(无界)1队列堆积 -> OOM
newCachedThreadPoolSynchronousQueueInteger.MAX_VALUE线程数爆炸 -> OOM
newScheduledThreadPoolDelayedWorkQueue(无界)Integer.MAX_VALUE队列堆积 + 线程爆炸

⚠️ 正确做法:始终使用 new ThreadPoolExecutor(...) 手动创建,明确指定每个参数。


3.2 ThreadPoolExecutor 七大参数详解

用一个餐厅厨房的比喻来理解七大参数:

想象一个餐厅厨房:有正式厨师、临时帮工、等位区和客满策略。

┌──────────────────────────────────────────────────────────┐
│                    餐厅厨房(线程池)                       │
│                                                          │
│  [正式厨师 1] [正式厨师 2] [正式厨师 3]   <- corePoolSize   │
│                                                          │
│  [临时帮工 1] [临时帮工 2]               <- 额外线程        │
│  (正式 + 临时 = maximumPoolSize)                         │
│                                                          │
│  临时帮工空闲超过 keepAliveTime 就辞退                      │
│                                                          │
│  ┌─等位区(workQueue)──────────────┐                     │
│  │ [订单4] [订单5] [订单6] [订单7]  │  <- 有界队列         │
│  └─────────────────────────────────┘                     │
│                                                          │
│  客满策略(handler):拒绝 / 调用者自己做 / 丢弃最旧的       │
│                                                          │
│  厨师培训标准(threadFactory):统一命名、设置优先级          │
└──────────────────────────────────────────────────────────┘

七大参数逐一解析

package com.example.concurrent.pool;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * ThreadPoolExecutor 七大参数详解
 */
public class ThreadPoolParametersDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            // 1. corePoolSize = 3 ——「正式厨师」数量
            // 这些线程常驻,即使空闲也不会被销毁(除非设置 allowCoreThreadTimeOut)
            // 决定了线程池的常态处理能力
            3,

            // 2. maximumPoolSize = 6 ——「正式厨师 + 临时帮工」的总人数上限
            // 当核心线程忙不过来、且队列满了,才会创建临时线程
            // 注意:不是队列没满就创建!必须队列也满了才会扩容
            6,

            // 3. keepAliveTime = 60 ——临时帮工的最大空闲等待时间
            // 临时线程空闲超过这个时间就会被销毁,释放资源
            // 设太短:线程频繁创建销毁,性能损耗
            // 设太长:空闲线程占用资源
            60,

            // 4. unit = SECONDS ——keepAliveTime 的时间单位
            TimeUnit.SECONDS,

            // 5. workQueue = 容量为 100 的有界队列 ——「等位区」
            // ⚠️ 必须用有界队列!无界队列 = OOM 定时炸弹
            // 容量太小:拒绝策略频繁触发,任务大量丢失
            // 容量太大:内存占用高,任务排队时间长
            new ArrayBlockingQueue<>(100),

            // 6. threadFactory ——「厨师培训标准」
            // 自定义线程工厂,核心作用:给线程命名
            // 线上出问题时,线程名是排查问题的第一线索
            new ThreadFactory() {
                private final AtomicInteger counter = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    // 命名规范:业务名-pool-序号
                    thread.setName("order-process-pool-" + counter.getAndIncrement());
                    // 设为非守护线程,确保任务执行完毕
                    thread.setDaemon(false);
                    return thread;
                }
            },

            // 7. handler = CallerRunsPolicy ——「客满策略」
            // 当核心线程满、队列满、临时线程也满时,触发拒绝策略
            // CallerRunsPolicy:让调用者线程自己执行该任务
            // 好处:天然限流,调用方线程忙于执行任务,就不会继续提交新任务
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 提交任务
        for (int i = 0; i < 200; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName()
                    + " 执行任务 " + taskId);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown(); // 不再接受新任务,等待已提交任务执行完毕
    }
}

四种拒绝策略对比

拒绝策略行为适用场景
AbortPolicy(默认)抛出 RejectedExecutionException需要感知任务被拒绝的场景
CallerRunsPolicy调用者线程自己执行不允许丢弃任务 + 需要限流
DiscardPolicy静默丢弃可以容忍任务丢失(如日志上报)
DiscardOldestPolicy丢弃队列头部最旧任务只关心最新任务(如实时行情刷新)

3.3 任务提交流程

当调用 executor.execute(task)executor.submit(task) 时,ThreadPoolExecutor 内部执行以下流程:

                       ┌──────────────┐
                       │  提交新任务   │
                       └──────┬───────┘

                              v
                  ┌───────────────────────┐
                  │ 当前线程数 < coreSize? │
                  └───────┬───────┬───────┘
                     是   │       │ 否
                          v       v
                ┌──────────┐  ┌──────────────────┐
                │创建核心线程│  │ 工作队列未满?      │
                │执行任务   │  └───┬──────────┬───┘
                └──────────┘   是  │          │ 否
                                   v          v
                          ┌──────────┐  ┌──────────────────────┐
                          │任务入队列 │  │当前线程数<maxPoolSize?│
                          │等待执行   │  └───┬──────────┬───────┘
                          └──────────┘   是  │          │ 否
                                             v          v
                                   ┌──────────┐  ┌──────────┐
                                   │创建临时线程│  │执行拒绝策略│
                                   │执行任务   │  │(handler) │
                                   └──────────┘  └──────────┘

⚠️ 关键易错点:很多人以为「核心线程满了就创建临时线程」——错! 正确顺序是:核心线程满 -> 任务进队列 -> 队列满 -> 这时候才创建临时线程。 所以如果你的队列容量设了 Integer.MAX_VALUE,临时线程永远不会被创建。


3.4 企业级线程池配置

3.4.1 线程数设置公式

任务类型公式解释
CPU 密集型线程数 = CPU 核心数 + 1加 1 是为了在某个线程因为偶发页缺失暂停时,额外的线程能顶上
IO 密集型线程数 = CPU 核心数 * 2 * (1 + IO等待时间/CPU计算时间)IO 等待时间越长,需要越多线程来利用 CPU
混合型拆分为 CPU 密集和 IO 密集两个线程池避免互相影响

⚠️ 公式只是起点,最终线程数必须通过压测确定。 不同机器、不同负载、不同业务逻辑,最优值差异巨大。

3.4.2 Spring Boot 中的企业级线程池配置

package com.example.concurrent.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 企业级线程池配置
 *
 * 核心原则:
 * 1. 不同业务使用不同线程池——隔离故障域
 * 2. 必须自定义线程名——排查问题的生命线
 * 3. 必须用有界队列——防止 OOM
 * 4. 必须配置拒绝策略——明确过载时的行为
 */
@Configuration
public class ThreadPoolConfig {

    /**
     * 订单处理线程池——IO 密集型
     * 主要操作:数据库读写、RPC 调用
     */
    @Bean("orderProcessExecutor")
    public ThreadPoolTaskExecutor orderProcessExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 获取 CPU 核心数作为计算基准
        int cpuCores = Runtime.getRuntime().availableProcessors();

        // IO 密集型:核心线程数 = CPU 核心数 * 2
        // 因为 IO 等待时线程会让出 CPU,所以需要更多线程来保持 CPU 繁忙
        executor.setCorePoolSize(cpuCores * 2);

        // 最大线程数 = 核心线程数 * 2,预留弹性空间应对突发流量
        executor.setMaxPoolSize(cpuCores * 4);

        // 有界队列容量——根据业务峰值评估,不能太大也不能太小
        // 太大:内存占用高,任务排队延迟大
        // 太小:拒绝策略频繁触发
        executor.setQueueCapacity(500);

        // 临时线程空闲 60 秒后回收
        executor.setKeepAliveSeconds(60);

        // ⚠️ 核心:给线程命名,线上出问题时能快速定位是哪个业务的线程池
        executor.setThreadNamePrefix("order-process-");

        // 拒绝策略:由调用者线程执行,天然限流
        executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 等待所有任务完成后再关闭——优雅停机
        executor.setWaitForTasksToCompleteOnShutdown(true);

        // 最多等待 60 秒,超时强制关闭——防止停机时间过长
        executor.setAwaitTerminationSeconds(60);

        executor.initialize();
        return executor;
    }

    /**
     * 报表导出线程池——CPU 密集型
     * 主要操作:数据计算、Excel 生成
     */
    @Bean("reportExportExecutor")
    public ThreadPoolTaskExecutor reportExportExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        int cpuCores = Runtime.getRuntime().availableProcessors();

        // CPU 密集型:核心线程数 = CPU 核心数 + 1
        // 过多线程反而因为上下文切换降低性能
        executor.setCorePoolSize(cpuCores + 1);
        executor.setMaxPoolSize(cpuCores + 1); // CPU 密集型不需要太多额外线程
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("report-export-");
        executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(120); // 报表导出可能耗时较长

        executor.initialize();
        return executor;
    }

    /**
     * 消息推送线程池——IO 密集型
     * 主要操作:调用短信网关、邮件服务、App 推送接口
     */
    @Bean("messagePushExecutor")
    public ThreadPoolTaskExecutor messagePushExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        int cpuCores = Runtime.getRuntime().availableProcessors();

        executor.setCorePoolSize(cpuCores * 2);
        executor.setMaxPoolSize(cpuCores * 4);
        executor.setQueueCapacity(1000); // 消息推送场景队列可以大一些
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("msg-push-");

        // 消息推送场景:使用 AbortPolicy,拒绝时抛异常,
        // 由上层捕获后写入消息补偿表,确保不丢消息
        executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.AbortPolicy()
        );

        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(30);

        executor.initialize();
        return executor;
    }
}

3.4.3 自定义线程工厂(不使用 Spring 时)

package com.example.concurrent.pool;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 自定义线程工厂——企业开发必备
 *
 * 为什么需要?
 * 1. 默认线程名是 pool-N-thread-M,线上出问题完全看不出是哪个业务
 * 2. 需要统一设置守护线程、优先级、异常处理器等
 */
public class NamedThreadFactory implements ThreadFactory {

    // 线程池编号——区分同一业务的多个线程池实例
    private static final AtomicInteger POOL_COUNTER = new AtomicInteger(1);

    // 线程编号——区分同一线程池中的不同线程
    private final AtomicInteger threadCounter = new AtomicInteger(1);

    // 线程名前缀——标识业务含义
    private final String namePrefix;

    // 是否为守护线程
    private final boolean daemon;

    // 未捕获异常处理器——防止线程因为异常静默死亡
    private final Thread.UncaughtExceptionHandler exceptionHandler;

    public NamedThreadFactory(String businessName) {
        this(businessName, false);
    }

    public NamedThreadFactory(String businessName, boolean daemon) {
        // 命名规范:业务名-pool-N-thread-M
        this.namePrefix = businessName + "-pool-"
            + POOL_COUNTER.getAndIncrement() + "-thread-";
        this.daemon = daemon;
        // 默认异常处理:打印日志,实际项目中应接入告警系统
        this.exceptionHandler = (thread, throwable) -> {
            System.err.println("线程 " + thread.getName()
                + " 发生未捕获异常:" + throwable.getMessage());
            throwable.printStackTrace();
            // 实际项目中应该:
            // 1. 记录到日志系统
            // 2. 发送告警(钉钉/企微/邮件)
            // 3. 上报监控平台(Prometheus/Grafana)
        };
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName(namePrefix + threadCounter.getAndIncrement());
        thread.setDaemon(daemon);
        // 设置未捕获异常处理器——这是最后一道防线
        thread.setUncaughtExceptionHandler(exceptionHandler);
        return thread;
    }
}

3.4.4 优雅关闭线程池

package com.example.concurrent.pool;

import java.util.List;
import java.util.concurrent.*;

/**
 * 线程池优雅关闭——生产环境必须处理
 *
 * 为什么不能直接 shutdownNow()?
 * 因为正在执行的任务会被中断,队列中等待的任务会被丢弃
 * 可能导致:订单处理到一半被中断、数据不一致
 */
public class GracefulShutdownDemo {

    /**
     * 推荐的优雅关闭流程
     */
    public static void gracefulShutdown(ExecutorService executor,
                                         long timeout,
                                         TimeUnit unit) {
        // 第一步:停止接受新任务
        // shutdown() 不会中断正在执行的任务,只是不再接受新提交
        executor.shutdown();

        try {
            // 第二步:等待已提交任务完成
            // 给一个合理的等待时间(如 60 秒)
            if (!executor.awaitTermination(timeout, unit)) {
                // 第三步:等待超时,强制关闭
                // shutdownNow() 会中断正在执行的任务,返回队列中未执行的任务
                List<Runnable> droppedTasks = executor.shutdownNow();

                // 记录被丢弃的任务,用于后续补偿处理
                System.err.println("线程池强制关闭,丢弃 "
                    + droppedTasks.size() + " 个未执行任务");

                // 实际项目中:将未执行的任务写入数据库/消息队列,后续补偿
                // droppedTasks.forEach(task -> compensationService.save(task));

                // 再次等待一小段时间
                if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.err.println("线程池未能在规定时间内关闭");
                }
            }
        } catch (InterruptedException e) {
            // 当前线程被中断,立刻强制关闭
            executor.shutdownNow();
            // 恢复中断标志
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 注册 JVM 关闭钩子——确保应用退出时线程池能优雅关闭
     */
    public static void registerShutdownHook(ExecutorService executor,
                                              String poolName) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("JVM 正在关闭,开始优雅关闭线程池:" + poolName);
            gracefulShutdown(executor, 60, TimeUnit.SECONDS);
            System.out.println("线程池 " + poolName + " 已关闭");
        }, poolName + "-shutdown-hook"));
    }
}

3.4.5 线程池监控

package com.example.concurrent.pool;

import java.util.concurrent.*;

/**
 * 线程池监控——生产环境不可或缺
 *
 * 为什么要监控?
 * 1. 队列积压过多 -> 任务延迟增大 -> 用户体验下降
 * 2. 活跃线程持续打满 -> 说明线程数不够或下游太慢
 * 3. 拒绝次数增多 -> 说明需要扩容或限流
 */
public class ThreadPoolMonitor {

    /**
     * 打印线程池核心指标
     * 实际项目中应该将这些指标暴露给 Prometheus 等监控系统
     */
    public static void printMetrics(ThreadPoolExecutor executor,
                                     String poolName) {
        System.out.println("========== " + poolName + " 线程池指标 ==========");

        // 核心线程数——线程池的基准处理能力
        System.out.println("核心线程数: " + executor.getCorePoolSize());

        // 最大线程数——线程池的峰值处理能力
        System.out.println("最大线程数: " + executor.getMaximumPoolSize());

        // 当前线程数——实际存活的线程数
        System.out.println("当前线程数: " + executor.getPoolSize());

        // 活跃线程数——正在执行任务的线程数
        // 如果长期接近最大线程数,说明需要扩容
        System.out.println("活跃线程数: " + executor.getActiveCount());

        // 队列中等待的任务数——反映系统负载
        // 如果持续增长,说明消费速度跟不上生产速度
        System.out.println("队列等待任务数: " + executor.getQueue().size());

        // 队列剩余容量
        System.out.println("队列剩余容量: "
            + executor.getQueue().remainingCapacity());

        // 已完成任务总数
        System.out.println("已完成任务总数: " + executor.getCompletedTaskCount());

        // 历史最大线程数——反映峰值负载
        System.out.println("历史最大线程数: " + executor.getLargestPoolSize());

        System.out.println("==============================================");
    }

    /**
     * 定时监控线程池(可接入 Prometheus/Grafana)
     */
    public static ScheduledExecutorService startMonitor(
            ThreadPoolExecutor executor, String poolName) {
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor(
            r -> new Thread(r, poolName + "-monitor")
        );

        // 每 10 秒打印一次指标
        monitor.scheduleAtFixedRate(
            () -> printMetrics(executor, poolName),
            0, 10, TimeUnit.SECONDS
        );

        return monitor;
    }
}

3.5 动态线程池

传统线程池的参数在启动后就固定了,如果线上发现参数不合理,必须修改代码 + 重新发布。 动态线程池允许在运行时修改核心参数,实现秒级生效。

核心原理

package com.example.concurrent.pool;

import java.util.concurrent.*;

/**
 * 动态线程池的核心原理
 *
 * ThreadPoolExecutor 本身就提供了动态修改参数的 API:
 * - setCorePoolSize()
 * - setMaximumPoolSize()
 * - setKeepAliveTime()
 *
 * 我们只需要把这些 API 与配置中心(Nacos/Apollo)打通即可
 */
public class DynamicThreadPoolDemo {

    /**
     * 运行时修改线程池参数
     */
    public static void resizeThreadPool(ThreadPoolExecutor executor,
                                         int newCoreSize,
                                         int newMaxSize,
                                         int newQueueCapacity) {
        // ⚠️ 必须先设 maxPoolSize 再设 corePoolSize
        // 如果 newCoreSize > 当前 maxPoolSize,会抛 IllegalArgumentException
        if (newMaxSize >= newCoreSize) {
            executor.setMaximumPoolSize(newMaxSize);
            executor.setCorePoolSize(newCoreSize);
        }

        // ⚠️ 注意:队列容量不能直接修改(ArrayBlockingQueue 是 final 的)
        // 如果需要动态修改队列容量,需要使用 ResizableCapacityLinkedBlockingQueue
        // 这是开源动态线程池框架(如 Hippo4j、DynamicTp)的常用做法
        System.out.println("线程池参数已更新 -> "
            + "core=" + newCoreSize
            + ", max=" + newMaxSize);
    }

    /**
     * 模拟从配置中心获取配置并动态更新
     * 实际项目中使用 @NacosConfigListener 或 Apollo 的 ConfigChangeListener
     */
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            4, 8, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(200),
            new NamedThreadFactory("dynamic-demo"),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 模拟运行一段时间后,运维发现线程数不够
        System.out.println("初始配置 -> core=" + executor.getCorePoolSize()
            + ", max=" + executor.getMaximumPoolSize());

        Thread.sleep(2000);

        // 动态扩容——无需重启应用
        resizeThreadPool(executor, 8, 16, 500);

        // 验证修改生效
        System.out.println("更新后配置 -> core=" + executor.getCorePoolSize()
            + ", max=" + executor.getMaximumPoolSize());

        executor.shutdown();
    }
}

⚠️ 生产建议:不要自己造轮子,推荐使用成熟的开源框架:

  • Hippo4j(美团开源)
  • DynamicTp(京东开源) 它们提供了配置中心集成、监控报警、运行时调参等完整能力。

四、锁的实战选型指南

4.1 synchronized — 内置锁

package com.example.concurrent.lock;

import java.util.HashMap;
import java.util.Map;

/**
 * synchronized 是 Java 内置的锁机制
 *
 * JDK 6 之后做了大量优化(偏向锁、轻量级锁、自适应自旋)
 * 在低竞争场景下,性能已经不输 ReentrantLock
 *
 * 什么时候用 synchronized 就够了?
 * 1. 不需要尝试加锁(tryLock)
 * 2. 不需要可中断加锁
 * 3. 不需要公平锁
 * 4. 不需要多个条件变量(Condition)
 * 5. 简单的互斥场景
 */
public class SynchronizedDemo {

    // 场景:简单的本地缓存
    private final Map<String, Object> cache = new HashMap<>();

    /**
     * 方式一:synchronized 方法
     * 锁对象是 this(当前实例)
     * 适用于整个方法都需要同步的场景
     */
    public synchronized Object getFromCache(String key) {
        return cache.get(key);
    }

    /**
     * 方式二:synchronized 代码块
     * 锁对象可以自由指定,粒度更细
     * 推荐使用——只锁需要同步的部分,减少锁持有时间
     */
    public Object getOrLoad(String key) {
        // 先无锁检查,大部分情况下缓存命中,无需加锁
        Object value = cache.get(key);
        if (value != null) {
            return value;
        }

        // 缓存未命中,加锁后再次检查(double-check)
        synchronized (this) {
            // ⚠️ 必须再次检查!
            // 因为在等待锁的过程中,可能其他线程已经加载了数据
            value = cache.get(key);
            if (value != null) {
                return value;
            }

            // 执行加载逻辑
            value = loadFromDatabase(key);
            cache.put(key, value);
            return value;
        }
    }

    /**
     * 方式三:锁定专门的锁对象
     * 推荐用 private final 的对象作为锁——防止外部代码意外锁定同一对象
     */
    private final Object writeLock = new Object();

    public void updateCache(String key, Object value) {
        synchronized (writeLock) {
            // 使用专门的锁对象而非 this
            // 原因:如果外部代码也用 synchronized(this) 锁了同一个实例
            // 就会产生意外的锁竞争
            cache.put(key, value);
        }
    }

    private Object loadFromDatabase(String key) {
        // 模拟数据库查询
        return "value-of-" + key;
    }
}

4.2 ReentrantLock — 可重入锁

package com.example.concurrent.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * ReentrantLock 相比 synchronized 的四大优势:
 * 1. 可尝试加锁(tryLock)——避免死等
 * 2. 可中断加锁(lockInterruptibly)——响应中断
 * 3. 支持公平锁——防止线程饥饿
 * 4. 支持多条件变量(Condition)——更精确的等待/通知
 */
public class ReentrantLockDemo {

    // 非公平锁(默认)——吞吐量更高,但可能导致某些线程长时间拿不到锁
    private final ReentrantLock unfairLock = new ReentrantLock();

    // 公平锁——保证先请求的线程先获得锁,但吞吐量略低
    private final ReentrantLock fairLock = new ReentrantLock(true);

    /**
     * 优势一:tryLock —— 带超时的尝试加锁
     *
     * 场景:分布式系统中,不能无限等待锁
     * 比如:抢购场景,获取锁超时就直接返回"库存不足"
     */
    public boolean tryDeductStock(String productId, int quantity) {
        // 尝试获取锁,最多等待 500 毫秒
        // 如果 500ms 内没拿到锁,直接返回 false——不要死等
        try {
            if (unfairLock.tryLock(500, TimeUnit.MILLISECONDS)) {
                try {
                    // 获取到锁,执行扣减库存逻辑
                    System.out.println("扣减库存成功:" + productId);
                    return true;
                } finally {
                    // ⚠️ unlock 必须放在 finally 中!
                    // synchronized 自动释放锁,ReentrantLock 必须手动释放
                    // 如果忘记 unlock,将导致永久死锁
                    unfairLock.unlock();
                }
            } else {
                // 获取锁超时,快速失败
                System.out.println("库存扣减繁忙,请重试");
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * 优势二:lockInterruptibly —— 可中断加锁
     *
     * 场景:用户取消了操作,线程应该立即停止等待锁
     * 如果用 synchronized,线程会一直阻塞在锁上,无法响应中断
     */
    public void interruptibleOperation() {
        try {
            // 可中断地等待锁——如果线程被中断,立即抛出 InterruptedException
            unfairLock.lockInterruptibly();
            try {
                // 执行业务逻辑
                System.out.println("执行可中断操作");
            } finally {
                unfairLock.unlock();
            }
        } catch (InterruptedException e) {
            // 线程被中断,可以做清理工作
            System.out.println("操作被中断,正在清理...");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * 优势三:多条件变量(Condition)
     *
     * 场景:实现一个有界阻塞队列
     * synchronized 只有一个等待集(wait/notify),无法区分"队列满"和"队列空"
     * Condition 可以创建多个等待集,实现更精确的通知
     */
    private final ReentrantLock queueLock = new ReentrantLock();
    // 队列非空条件——消费者等待此条件
    private final Condition notEmpty = queueLock.newCondition();
    // 队列非满条件——生产者等待此条件
    private final Condition notFull = queueLock.newCondition();

    private final Object[] items = new Object[100];
    private int count = 0;

    public void put(Object item) throws InterruptedException {
        queueLock.lock();
        try {
            // 队列满时,生产者等待在 notFull 条件上
            while (count == items.length) {
                notFull.await(); // 只等待"队列非满"这一个条件
            }
            items[count++] = item;
            // 放入元素后,通知消费者:队列不空了
            notEmpty.signal(); // 精确唤醒等待"队列非空"的消费者线程
        } finally {
            queueLock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        queueLock.lock();
        try {
            // 队列空时,消费者等待在 notEmpty 条件上
            while (count == 0) {
                notEmpty.await(); // 只等待"队列非空"这一个条件
            }
            Object item = items[--count];
            // 取出元素后,通知生产者:队列不满了
            notFull.signal(); // 精确唤醒等待"队列非满"的生产者线程
            return item;
        } finally {
            queueLock.unlock();
        }
    }
}

4.3 ReadWriteLock — 读写锁

package com.example.concurrent.lock;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * ReadWriteLock 适用场景:读多写少
 *
 * 核心规则:
 * - 读-读 不互斥——多个线程可以同时读
 * - 读-写 互斥——有线程在读时,写线程必须等待
 * - 写-写 互斥——同一时刻只能有一个线程在写
 *
 * 典型场景:配置信息缓存、权限数据缓存等读远多于写的场景
 */
public class ReadWriteLockCacheDemo {

    // 读写锁
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();

    // 缓存容器——不需要用 ConcurrentHashMap,因为读写锁已经保证了线程安全
    private final Map<String, Object> cache = new HashMap<>();

    /**
     * 读操作——使用读锁
     * 多个线程可以同时获取读锁,并发读取
     */
    public Object get(String key) {
        rwLock.readLock().lock(); // 获取读锁
        try {
            return cache.get(key);
        } finally {
            rwLock.readLock().unlock(); // 释放读锁
        }
    }

    /**
     * 写操作——使用写锁
     * 写锁是排他的,同一时刻只有一个线程能写
     */
    public void put(String key, Object value) {
        rwLock.writeLock().lock(); // 获取写锁
        try {
            cache.put(key, value);
        } finally {
            rwLock.writeLock().unlock(); // 释放写锁
        }
    }

    /**
     * 锁降级模式:写锁 -> 读锁
     *
     * 场景:先更新缓存,然后在同一个原子操作中读取更新后的值
     * 这样可以保证读到的一定是自己刚写入的值,不会被其他写线程覆盖
     */
    public Object putAndGet(String key, Object value) {
        rwLock.writeLock().lock(); // 第一步:获取写锁
        try {
            cache.put(key, value);

            // 第二步:在持有写锁的同时获取读锁——这就是锁降级
            rwLock.readLock().lock();
        } finally {
            // 第三步:释放写锁,此时仍然持有读锁
            rwLock.writeLock().unlock();
        }

        // 此时只持有读锁,其他读线程可以并发访问
        try {
            return cache.get(key); // 保证读到的是自己刚写入的值
        } finally {
            rwLock.readLock().unlock(); // 释放读锁
        }
    }
}

⚠️ 注意:读写锁不支持锁升级(读锁 -> 写锁),只支持锁降级(写锁 -> 读锁)。 如果在持有读锁时尝试获取写锁,会导致永久阻塞(死锁)


4.4 StampedLock — 邮戳锁

package com.example.concurrent.lock;

import java.util.concurrent.locks.StampedLock;

/**
 * StampedLock(JDK 8 引入)
 *
 * 核心优势:乐观读——读操作不加锁!
 * 在读多写少且写操作非常少的场景下,性能比 ReadWriteLock 更好
 *
 * ⚠️ 限制:
 * 1. 不可重入——同一线程不能重复获取锁
 * 2. 不支持 Condition
 * 3. 使用复杂,容易写出 bug
 * 4. 如果写操作不是特别少,反而可能比 ReadWriteLock 更慢
 */
public class StampedLockDemo {

    private final StampedLock stampedLock = new StampedLock();

    private double x, y; // 坐标点——模拟共享数据

    /**
     * 写操作——和普通排他锁一样
     */
    public void move(double deltaX, double deltaY) {
        // writeLock() 返回一个 stamp(邮戳),解锁时需要传入
        long stamp = stampedLock.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp); // 解锁时必须传入获取锁时返回的 stamp
        }
    }

    /**
     * 乐观读——StampedLock 的核心特性
     *
     * 流程:
     * 1. 先尝试乐观读(不加锁,只获取一个版本号)
     * 2. 读取共享数据
     * 3. 用版本号验证期间是否发生了写操作
     * 4. 如果没有写操作,直接使用读取的数据(零开销!)
     * 5. 如果发生了写操作,升级为悲观读锁重新读取
     */
    public double distanceFromOrigin() {
        // 第一步:尝试乐观读,获取版本号(stamp)
        // 这一步不加锁,所以零开销
        long stamp = stampedLock.tryOptimisticRead();

        // 第二步:读取共享数据到局部变量
        // ⚠️ 此时没有加锁,数据可能被写线程修改
        double currentX = x;
        double currentY = y;

        // 第三步:验证版本号
        // validate 返回 true 表示在读取期间没有写操作发生
        if (!stampedLock.validate(stamp)) {
            // 第四步:验证失败,说明有写操作干扰,升级为悲观读锁
            stamp = stampedLock.readLock();
            try {
                // 重新读取数据——此时已加锁,数据一定是一致的
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp);
            }
        }

        // 使用读取到的数据进行计算
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

4.5 锁选型对比

维度synchronizedReentrantLockReadWriteLockStampedLock
实现层级JVM 内置JDK APIJDK APIJDK API
可重入
公平锁支持支持
可中断支持支持支持
tryLock支持支持支持
条件变量1 个(wait/notify)多个(Condition)不常用不支持
读写分离是(含乐观读)
性能(低竞争)高(偏向锁优化)较高最高
使用复杂度
适用场景简单互斥需要高级特性读多写少读极多写极少
推荐程度首选需要时用合适场景用谨慎使用

选型建议:能用 synchronized 就用 synchronized(简单不易出错)。 只有当你需要 tryLock、可中断、公平锁、多 Condition 等高级特性时,才选 ReentrantLock


五、并发工具类实战

5.1 CountDownLatch — 倒计时门闩

package com.example.concurrent.tools;

import java.util.concurrent.*;
import java.util.Map;
import java.util.HashMap;

/**
 * CountDownLatch:等待多个任务全部完成后再继续
 *
 * 核心方法:
 * - countDown():计数器减 1
 * - await():阻塞等待计数器归零
 *
 * 真实场景:微服务接口聚合——等待所有下游服务返回后再组装响应
 *
 * ⚠️ 注意:CountDownLatch 是一次性的,计数器归零后不能重置
 *    如果需要循环使用,请用 CyclicBarrier
 */
public class CountDownLatchDemo {

    /**
     * 场景:用户详情页需要聚合 4 个微服务的数据
     */
    public Map<String, Object> getUserDetail(Long userId) throws Exception {
        Map<String, Object> result = new ConcurrentHashMap<>();

        // 计数器设为 4——需要等待 4 个服务全部返回
        CountDownLatch latch = new CountDownLatch(4);

        ExecutorService executor = Executors.newFixedThreadPool(4);

        // 异步调用用户基础信息服务
        executor.submit(() -> {
            try {
                result.put("userInfo", callUserService(userId));
            } catch (Exception e) {
                // 即使某个服务调用失败,也必须 countDown
                // 否则 await 会永远阻塞
                result.put("userInfo", "获取失败");
            } finally {
                // ⚠️ countDown 必须在 finally 中!
                // 无论成功失败都要减计数,否则主线程永远等不到
                latch.countDown();
            }
        });

        // 异步调用订单服务
        executor.submit(() -> {
            try {
                result.put("orders", callOrderService(userId));
            } catch (Exception e) {
                result.put("orders", "获取失败");
            } finally {
                latch.countDown();
            }
        });

        // 异步调用积分服务
        executor.submit(() -> {
            try {
                result.put("points", callPointsService(userId));
            } catch (Exception e) {
                result.put("points", "获取失败");
            } finally {
                latch.countDown();
            }
        });

        // 异步调用优惠券服务
        executor.submit(() -> {
            try {
                result.put("coupons", callCouponService(userId));
            } catch (Exception e) {
                result.put("coupons", "获取失败");
            } finally {
                latch.countDown();
            }
        });

        // 主线程等待所有服务返回,最多等 5 秒
        // ⚠️ 必须设置超时!否则某个服务挂了,主线程永远阻塞
        boolean completed = latch.await(5, TimeUnit.SECONDS);
        if (!completed) {
            // 超时处理:记录日志,返回已获取的部分数据
            System.err.println("部分服务调用超时,返回降级数据");
        }

        executor.shutdown();
        return result;
    }

    // 模拟微服务调用
    private Object callUserService(Long userId) throws Exception {
        Thread.sleep(200);
        return "用户信息";
    }

    private Object callOrderService(Long userId) throws Exception {
        Thread.sleep(300);
        return "订单列表";
    }

    private Object callPointsService(Long userId) throws Exception {
        Thread.sleep(150);
        return "积分信息";
    }

    private Object callCouponService(Long userId) throws Exception {
        Thread.sleep(250);
        return "优惠券列表";
    }
}

5.2 CyclicBarrier — 循环栅栏

package com.example.concurrent.tools;

import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;

/**
 * CyclicBarrier:让多个线程在屏障点同步汇合
 *
 * 与 CountDownLatch 的区别:
 * - CountDownLatch 是一次性的,CyclicBarrier 可以重复使用(reset)
 * - CountDownLatch 是一个线程等多个线程,CyclicBarrier 是多个线程互相等
 * - CyclicBarrier 可以在所有线程到达屏障后执行一个汇总动作(barrierAction)
 *
 * 真实场景:多线程分片统计,每个线程统计一部分数据,
 *          所有线程统计完成后在屏障点汇总结果
 */
public class CyclicBarrierDemo {

    /**
     * 场景:多线程分片统计销售额报表
     * 每个线程统计一个地区的数据,全部完成后汇总
     */
    public void generateSalesReport() throws Exception {
        // 分 3 个线程统计 3 个地区
        int regionCount = 3;

        // 存放各地区统计结果
        List<Double> regionResults =
            new CopyOnWriteArrayList<>(); // 线程安全的 List

        // 创建 CyclicBarrier,第二个参数是屏障动作——所有线程到达后自动执行
        CyclicBarrier barrier = new CyclicBarrier(regionCount, () -> {
            // 这个动作由最后一个到达屏障的线程执行
            // 汇总所有地区的销售额
            double totalSales = regionResults.stream()
                .mapToDouble(Double::doubleValue)
                .sum();
            System.out.println("全国销售总额汇总完成:" + totalSales + " 万元");
        });

        ExecutorService executor = Executors.newFixedThreadPool(regionCount);

        String[] regions = {"华东", "华南", "华北"};

        for (String region : regions) {
            executor.submit(() -> {
                try {
                    // 模拟各地区统计耗时不同
                    double sales = calculateRegionSales(region);
                    regionResults.add(sales);

                    System.out.println(region + " 地区统计完成:" + sales + " 万元");

                    // 到达屏障点,等待其他线程
                    // 当所有线程都调用 await() 后,屏障打开,执行屏障动作
                    barrier.await(10, TimeUnit.SECONDS);

                    // 屏障打开后,所有线程继续执行后续逻辑
                    System.out.println(region + " 地区开始生成详细报表...");

                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                    System.err.println(region + " 地区统计异常");
                } catch (TimeoutException e) {
                    System.err.println(region + " 地区统计超时");
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
    }

    private double calculateRegionSales(String region) throws Exception {
        // 模拟不同地区计算耗时不同
        Thread.sleep((long) (Math.random() * 2000));
        return Math.random() * 1000;
    }
}

5.3 Semaphore — 信号量

package com.example.concurrent.tools;

import java.util.concurrent.*;

/**
 * Semaphore:控制同时访问某个资源的线程数量
 *
 * 核心思想:维护一组"许可证",线程获取许可后才能执行,执行完归还许可
 *
 * 真实场景:
 * 1. 数据库连接池限制并发连接数
 * 2. 接口限流——同一时刻最多 N 个请求
 * 3. 资源池管理
 */
public class SemaphoreDemo {

    /**
     * 场景一:接口限流
     * 限制对第三方 API 的并发调用数,防止被限流或打垮下游
     */
    public static class ThirdPartyApiLimiter {

        // 最多同时 10 个线程调用第三方 API
        // true = 公平模式,先来先服务,防止线程饥饿
        private final Semaphore semaphore = new Semaphore(10, true);

        public String callThirdPartyApi(String request) throws Exception {
            // 尝试获取许可,最多等待 3 秒
            // 如果 3 秒内没有空闲许可,说明并发太高,快速失败
            if (!semaphore.tryAcquire(3, TimeUnit.SECONDS)) {
                throw new RuntimeException("第三方 API 调用繁忙,请稍后重试");
            }

            try {
                // 获取到许可,执行 API 调用
                // 此时最多只有 10 个线程同时执行这段代码
                return doCallApi(request);
            } finally {
                // ⚠️ 必须在 finally 中释放许可!
                // 否则许可会越来越少,最终所有线程都无法获取许可
                semaphore.release();
            }
        }

        private String doCallApi(String request) throws Exception {
            Thread.sleep(200); // 模拟 API 调用耗时
            return "API 响应:" + request;
        }
    }

    /**
     * 场景二:简易数据库连接池
     */
    public static class SimpleConnectionPool {

        // 用 Semaphore 控制最大连接数
        private final Semaphore semaphore;
        // 用阻塞队列存放空闲连接
        private final BlockingQueue<Object> pool;

        public SimpleConnectionPool(int maxConnections) {
            this.semaphore = new Semaphore(maxConnections);
            this.pool = new LinkedBlockingQueue<>(maxConnections);

            // 初始化连接池
            for (int i = 0; i < maxConnections; i++) {
                pool.offer(createConnection(i));
            }
        }

        public Object getConnection(long timeout, TimeUnit unit)
                throws Exception {
            // 先获取许可——控制并发连接数
            if (!semaphore.tryAcquire(timeout, unit)) {
                throw new RuntimeException("获取数据库连接超时");
            }

            // 获取许可后,从池中取出一个连接
            // 因为许可数 = 连接数,所以 poll 不会返回 null
            return pool.poll();
        }

        public void releaseConnection(Object connection) {
            // 归还连接到池中
            pool.offer(connection);
            // 释放许可——允许其他线程获取连接
            semaphore.release();
        }

        private Object createConnection(int id) {
            return "Connection-" + id;
        }
    }
}

5.4 CompletableFuture — 异步编排利器(重点)

5.4.1 基础异步操作

package com.example.concurrent.future;

import java.util.concurrent.*;

/**
 * CompletableFuture(JDK 8)是 Java 异步编程的终极武器
 *
 * 相比 Future 的优势:
 * 1. 链式调用——不需要嵌套 get()
 * 2. 组合多个异步任务——allOf / anyOf
 * 3. 异常处理——exceptionally / handle
 * 4. 回调机制——不需要阻塞等待
 */
public class CompletableFutureBasicDemo {

    // ⚠️ 核心:一定要自定义线程池!不要用默认的 ForkJoinPool.commonPool()
    // 原因:
    // 1. commonPool 的线程数 = CPU 核心数 - 1,IO 密集型任务会不够用
    // 2. 所有没指定线程池的 CompletableFuture 共享 commonPool,互相影响
    // 3. commonPool 中的线程是守护线程,JVM 退出时任务可能还没执行完
    private static final ExecutorService ASYNC_POOL = new ThreadPoolExecutor(
        8, 16, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(500),
        r -> {
            Thread t = new Thread(r);
            t.setName("async-biz-" + t.getId());
            t.setDaemon(false); // 非守护线程,确保任务执行完成
            return t;
        },
        new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public static void main(String[] args) throws Exception {

        // 方式一:supplyAsync —— 有返回值的异步任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(
            () -> {
                // 这段代码在 ASYNC_POOL 的线程中执行
                return queryUserInfo(1001L);
            },
            ASYNC_POOL // ⚠️ 必须传入自定义线程池
        );

        // 方式二:runAsync —— 无返回值的异步任务
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(
            () -> {
                // 无返回值的异步操作,如发送通知
                sendNotification("订单创建成功");
            },
            ASYNC_POOL
        );

        // 获取结果(带超时)
        String userInfo = future1.get(3, TimeUnit.SECONDS);
        System.out.println("用户信息:" + userInfo);

        ASYNC_POOL.shutdown();
    }

    private static String queryUserInfo(Long userId) {
        try { Thread.sleep(200); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "User-" + userId;
    }

    private static void sendNotification(String message) {
        try { Thread.sleep(100); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("通知已发送:" + message);
    }
}

5.4.2 链式调用

package com.example.concurrent.future;

import java.util.concurrent.*;

/**
 * CompletableFuture 链式调用——告别回调地狱
 */
public class CompletableFutureChainDemo {

    private static final ExecutorService POOL = new ThreadPoolExecutor(
        4, 8, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(200),
        r -> new Thread(r, "chain-demo-" + Thread.currentThread().getId()),
        new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public static void main(String[] args) throws Exception {

        // ============ thenApply:同步转换结果(map 操作) ============
        // 场景:查询用户 -> 获取用户等级 -> 计算折扣
        CompletableFuture<Double> discountFuture = CompletableFuture
            .supplyAsync(() -> queryUserId("张三"), POOL)  // 第一步:查询用户ID
            .thenApply(userId -> queryUserLevel(userId))    // 第二步:查询等级
            .thenApply(level -> calculateDiscount(level));  // 第三步:计算折扣

        System.out.println("折扣率:" + discountFuture.get());

        // ============ thenCompose:异步扁平化(flatMap 操作) ============
        // 当下一步操作本身也返回 CompletableFuture 时使用
        // 如果用 thenApply 会得到 CompletableFuture<CompletableFuture<T>>
        CompletableFuture<String> orderFuture = CompletableFuture
            .supplyAsync(() -> queryUserId("张三"), POOL)
            .thenCompose(userId -> queryUserOrdersAsync(userId));
            // thenCompose 会自动"摊平"嵌套的 CompletableFuture

        System.out.println("订单信息:" + orderFuture.get());

        // ============ thenCombine:合并两个独立任务的结果 ============
        // 场景:同时查询商品价格和用户折扣,然后计算最终价格
        CompletableFuture<Double> priceFuture = CompletableFuture
            .supplyAsync(() -> queryProductPrice("SKU-001"), POOL);

        CompletableFuture<Double> userDiscountFuture = CompletableFuture
            .supplyAsync(() -> queryUserDiscount(1001L), POOL);

        CompletableFuture<Double> finalPriceFuture = priceFuture
            .thenCombine(userDiscountFuture, (price, discount) -> {
                // 两个任务都完成后,合并结果
                return price * discount;
            });

        System.out.println("最终价格:" + finalPriceFuture.get());

        // ============ thenAccept:消费结果(不返回新值) ============
        CompletableFuture.supplyAsync(() -> queryUserId("张三"), POOL)
            .thenAccept(userId -> {
                // 消费结果,如记录日志、发送通知
                System.out.println("查询到用户ID:" + userId);
            });

        // ============ thenRun:前一步完成后执行(不关心前一步的结果) ============
        CompletableFuture.supplyAsync(() -> queryUserId("张三"), POOL)
            .thenRun(() -> {
                // 不需要前一步的结果,只是在它完成后触发
                System.out.println("用户查询完毕,清理临时数据...");
            });

        Thread.sleep(1000);
        POOL.shutdown();
    }

    // 模拟业务方法
    private static Long queryUserId(String name) {
        sleep(100);
        return 1001L;
    }

    private static Integer queryUserLevel(Long userId) {
        sleep(100);
        return 5; // VIP 5
    }

    private static Double calculateDiscount(Integer level) {
        return level >= 5 ? 0.8 : 0.95;
    }

    private static CompletableFuture<String> queryUserOrdersAsync(Long userId) {
        // 返回 CompletableFuture——这就是为什么用 thenCompose 而不是 thenApply
        return CompletableFuture.supplyAsync(
            () -> "用户 " + userId + " 的订单列表",
            POOL
        );
    }

    private static Double queryProductPrice(String sku) {
        sleep(150);
        return 299.0;
    }

    private static Double queryUserDiscount(Long userId) {
        sleep(100);
        return 0.85;
    }

    private static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

5.4.3 异常处理

package com.example.concurrent.future;

import java.util.concurrent.*;

/**
 * CompletableFuture 异常处理
 *
 * ⚠️ 最大的坑:CompletableFuture 会「吞掉」异常!
 * 如果你不调用 get()、join() 或注册异常处理器,异常会静默消失
 * 线程池中的线程不会抛异常到控制台,你甚至不知道任务失败了
 */
public class CompletableFutureExceptionDemo {

    private static final ExecutorService POOL = Executors.newFixedThreadPool(4);

    public static void main(String[] args) throws Exception {

        // ============ exceptionally:捕获异常并提供降级值 ============
        // 类似 try-catch,只处理异常情况
        CompletableFuture<String> future1 = CompletableFuture
            .supplyAsync(() -> {
                // 模拟:调用用户服务可能失败
                if (Math.random() > 0.5) {
                    throw new RuntimeException("用户服务不可用");
                }
                return "用户信息";
            }, POOL)
            .exceptionally(throwable -> {
                // 捕获上游的异常,返回降级数据
                // throwable.getCause() 才是真正的异常(外层包了 CompletionException)
                System.err.println("降级处理:" + throwable.getMessage());
                return "默认用户信息"; // 降级值
            });

        System.out.println("结果:" + future1.get());

        // ============ handle:同时处理正常结果和异常 ============
        // 类似 try-catch-finally,无论成功失败都会执行
        CompletableFuture<String> future2 = CompletableFuture
            .supplyAsync(() -> {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("订单服务不可用");
                }
                return "订单信息";
            }, POOL)
            .handle((result, throwable) -> {
                if (throwable != null) {
                    // 处理异常
                    System.err.println("异常:" + throwable.getMessage());
                    return "降级订单信息";
                }
                // 处理正常结果
                return "处理后的 " + result;
            });

        System.out.println("结果:" + future2.get());

        // ============ whenComplete:监听完成事件(不改变结果) ============
        // 适用于:记录日志、发送通知等副作用操作
        CompletableFuture<String> future3 = CompletableFuture
            .supplyAsync(() -> "数据", POOL)
            .whenComplete((result, throwable) -> {
                // ⚠️ whenComplete 不能改变结果,只是观察
                if (throwable != null) {
                    System.err.println("任务失败:" + throwable.getMessage());
                    // 可以在这里记录日志、发送告警
                } else {
                    System.out.println("任务成功:" + result);
                }
            });

        System.out.println("结果:" + future3.get());

        POOL.shutdown();
    }
}

5.4.4 多任务编排

package com.example.concurrent.future;

import java.util.concurrent.*;
import java.util.Map;
import java.util.HashMap;

/**
 * CompletableFuture 多任务编排——企业级实战
 *
 * 场景:用户详情页接口聚合
 * 需要并行调用:用户服务、订单服务、优惠券服务、积分服务、收货地址服务
 */
public class CompletableFutureOrchestrationDemo {

    // ⚠️ 企业级必须自定义线程池
    private static final ExecutorService BIZ_POOL = new ThreadPoolExecutor(
        10, 20, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(500),
        r -> {
            Thread t = new Thread(r);
            t.setName("biz-async-" + t.getId());
            return t;
        },
        new ThreadPoolExecutor.CallerRunsPolicy()
    );

    /**
     * allOf:等待所有任务完成
     * 适用于:所有数据都需要才能返回的场景
     */
    public static Map<String, Object> getUserDetailWithAllOf(Long userId)
            throws Exception {

        Map<String, Object> result = new ConcurrentHashMap<>();

        // 创建 5 个异步任务
        CompletableFuture<Void> userInfoFuture = CompletableFuture
            .supplyAsync(() -> queryUserInfo(userId), BIZ_POOL)
            .thenAccept(data -> result.put("userInfo", data))
            .exceptionally(e -> {
                result.put("userInfo", "获取失败"); // 单个服务失败不影响整体
                return null;
            });

        CompletableFuture<Void> ordersFuture = CompletableFuture
            .supplyAsync(() -> queryOrders(userId), BIZ_POOL)
            .thenAccept(data -> result.put("orders", data))
            .exceptionally(e -> {
                result.put("orders", "获取失败");
                return null;
            });

        CompletableFuture<Void> couponsFuture = CompletableFuture
            .supplyAsync(() -> queryCoupons(userId), BIZ_POOL)
            .thenAccept(data -> result.put("coupons", data))
            .exceptionally(e -> {
                result.put("coupons", "获取失败");
                return null;
            });

        CompletableFuture<Void> pointsFuture = CompletableFuture
            .supplyAsync(() -> queryPoints(userId), BIZ_POOL)
            .thenAccept(data -> result.put("points", data))
            .exceptionally(e -> {
                result.put("points", "获取失败");
                return null;
            });

        CompletableFuture<Void> addressesFuture = CompletableFuture
            .supplyAsync(() -> queryAddresses(userId), BIZ_POOL)
            .thenAccept(data -> result.put("addresses", data))
            .exceptionally(e -> {
                result.put("addresses", "获取失败");
                return null;
            });

        // allOf:等待所有任务完成
        CompletableFuture.allOf(
            userInfoFuture, ordersFuture, couponsFuture,
            pointsFuture, addressesFuture
        ).get(5, TimeUnit.SECONDS); // ⚠️ 必须设置超时

        return result;
    }

    /**
     * anyOf:任意一个任务完成就返回
     * 适用于:多个备选方案,取最快返回的那个
     * 场景:多机房部署,同时请求多个机房,取最快的响应
     */
    public static String getDataFromFastestNode(String key) throws Exception {
        CompletableFuture<String> node1 = CompletableFuture
            .supplyAsync(() -> queryFromNode("北京机房", key), BIZ_POOL);

        CompletableFuture<String> node2 = CompletableFuture
            .supplyAsync(() -> queryFromNode("上海机房", key), BIZ_POOL);

        CompletableFuture<String> node3 = CompletableFuture
            .supplyAsync(() -> queryFromNode("广州机房", key), BIZ_POOL);

        // anyOf:只要有一个完成就返回
        Object result = CompletableFuture
            .anyOf(node1, node2, node3)
            .get(3, TimeUnit.SECONDS);

        return (String) result;
    }

    // ===== 模拟服务调用 =====

    private static String queryUserInfo(Long userId) {
        sleep(200);
        return "用户基础信息";
    }

    private static String queryOrders(Long userId) {
        sleep(300);
        return "订单列表";
    }

    private static String queryCoupons(Long userId) {
        sleep(250);
        return "优惠券列表";
    }

    private static String queryPoints(Long userId) {
        sleep(150);
        return "积分信息";
    }

    private static String queryAddresses(Long userId) {
        sleep(180);
        return "收货地址列表";
    }

    private static String queryFromNode(String nodeName, String key) {
        long delay = (long) (Math.random() * 500); // 随机延迟模拟不同机房网络
        sleep(delay);
        return nodeName + " 返回数据:" + key;
    }

    private static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        // 测试 allOf
        long start = System.currentTimeMillis();
        Map<String, Object> detail = getUserDetailWithAllOf(1001L);
        System.out.println("allOf 耗时:" + (System.currentTimeMillis() - start)
            + "ms");
        System.out.println("聚合结果:" + detail);

        // 测试 anyOf
        start = System.currentTimeMillis();
        String data = getDataFromFastestNode("product-001");
        System.out.println("anyOf 耗时:" + (System.currentTimeMillis() - start)
            + "ms");
        System.out.println("最快节点返回:" + data);

        BIZ_POOL.shutdown();
    }
}

5.4.5 CompletableFuture 常见陷阱

⚠️ 陷阱一:使用默认的 ForkJoinPool.commonPool()

// ❌ 错误写法:不传线程池,默认使用 ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(() -> queryDatabase());

// ✅ 正确写法:传入自定义线程池
CompletableFuture.supplyAsync(() -> queryDatabase(), customPool);

为什么默认线程池是坑?

  1. commonPool 线程数 = CPU核心数 - 1,IO 密集型任务会严重不够用
  2. 整个 JVM 所有使用默认线程池的 CompletableFuture 共享同一个池,互相影响
  3. commonPool 中的线程是守护线程,main 方法结束时任务可能还没执行完

⚠️ 陷阱二:异常被吞掉

// ❌ 错误写法:异常被静默吞掉,没有任何错误信息
CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("业务异常");
}, pool);
// 如果不调用 get() 或 join(),这个异常永远不会被发现!

// ✅ 正确写法:必须注册异常处理
CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("业务异常");
}, pool).exceptionally(e -> {
    log.error("异步任务异常", e); // 记录日志
    return defaultValue;           // 返回降级值
});

⚠️ 陷阱三:thenApply vs thenApplyAsync 的线程切换

// thenApply:在上一步完成的线程中执行(可能是线程池线程)
future.thenApply(result -> process(result));

// thenApplyAsync:在指定线程池中执行
// 当后续操作也是耗时操作时,应该使用 Async 版本,避免占用上一步的线程
future.thenApplyAsync(result -> heavyProcess(result), pool);

六、并发容器选型

6.1 ConcurrentHashMap

package com.example.concurrent.container;

import java.util.concurrent.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;

/**
 * ConcurrentHashMap 选型指南
 *
 * 为什么不用 Hashtable?
 * - Hashtable 用一把大锁锁住整个表,并发性能极差
 *
 * 为什么不用 Collections.synchronizedMap()?
 * - 同样是一把大锁,只是语法糖,本质和 Hashtable 一样
 *
 * ConcurrentHashMap 的优势:
 * - JDK 7:分段锁(Segment),并发度 = 段数
 * - JDK 8+:CAS + synchronized 锁单个桶,并发度 = 桶数(更细粒度)
 */
public class ConcurrentHashMapDemo {

    /**
     * ⚠️ 最常见的陷阱:复合操作不是原子的!
     *
     * 即使 ConcurrentHashMap 的单个方法是线程安全的,
     * 但「先检查再操作」(check-then-act)组合起来不是原子的
     */
    public static class UnsafeUsage {
        private final ConcurrentHashMap<String, Integer> counter =
            new ConcurrentHashMap<>();

        // ❌ 错误写法:先 get 再 put 不是原子的
        public void unsafeIncrement(String key) {
            Integer value = counter.get(key);  // 线程 A 读到 10
            // ⚠️ 此处其他线程可能修改了值!   // 线程 B 也读到 10
            if (value == null) {
                counter.put(key, 1);            // 线程 A 写入 11
            } else {
                counter.put(key, value + 1);    // 线程 B 也写入 11(应该是 12)
            }
        }

        // ✅ 正确写法一:使用原子方法
        public void safeIncrement(String key) {
            // merge 是原子操作:如果 key 不存在则放入 1,存在则执行合并函数
            counter.merge(key, 1, Integer::sum);
        }

        // ✅ 正确写法二:使用 compute
        public void safeIncrementV2(String key) {
            // compute 也是原子操作:根据 key 和旧值计算新值
            counter.compute(key, (k, v) -> v == null ? 1 : v + 1);
        }
    }
}

三种线程安全 Map 对比

维度HashtablesynchronizedMapConcurrentHashMap
锁粒度整个表一把锁整个表一把锁单个桶级别
允许 null key/value取决于底层 Map
迭代器fail-fastfail-fast弱一致性(不抛异常)
并发性能优秀
适用场景遗留代码不推荐推荐

6.2 CopyOnWriteArrayList

package com.example.concurrent.container;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * CopyOnWriteArrayList:写时复制列表
 *
 * 核心原理:
 * - 读操作:直接读底层数组,无需加锁(零开销)
 * - 写操作:复制一份新数组,在新数组上修改,然后替换引用
 *
 * 适用场景:读远多于写(如监听器列表、黑名单列表、配置列表)
 *
 * ⚠️ 不适用场景:
 * - 写操作频繁(每次写都要复制整个数组,性能极差)
 * - 列表很大(复制开销大,内存占用翻倍)
 */
public class CopyOnWriteArrayListDemo {

    /**
     * 典型场景:事件监听器管理
     * 注册/取消注册操作极少(写少),事件触发通知所有监听器很频繁(读多)
     */
    public static class EventBus {

        // 使用 CopyOnWriteArrayList 存储监听器
        // 读操作(遍历通知)远多于写操作(注册/取消注册)
        private final List<EventListener> listeners =
            new CopyOnWriteArrayList<>();

        // 注册监听器(写操作,偶尔发生)
        public void register(EventListener listener) {
            listeners.add(listener); // 底层会复制数组,但注册操作不频繁
        }

        // 取消注册(写操作,偶尔发生)
        public void unregister(EventListener listener) {
            listeners.remove(listener);
        }

        // 触发事件通知(读操作,频繁发生)
        public void fireEvent(String event) {
            // ⚠️ 遍历时不需要加锁!即使其他线程正在注册新的监听器
            // 因为遍历的是当时的快照,不会被修改
            for (EventListener listener : listeners) {
                try {
                    listener.onEvent(event);
                } catch (Exception e) {
                    // 一个监听器异常不应该影响其他监听器
                    System.err.println("监听器处理异常:" + e.getMessage());
                }
            }
        }
    }

    // 监听器接口
    public interface EventListener {
        void onEvent(String event);
    }
}

6.3 BlockingQueue 阻塞队列家族

package com.example.concurrent.container;

import java.util.concurrent.*;

/**
 * BlockingQueue 家族——生产者-消费者模式的基础设施
 *
 * 核心特性:
 * - 队列满时,put() 会阻塞生产者
 * - 队列空时,take() 会阻塞消费者
 * 这样就不需要手动写 wait/notify 了
 */
public class BlockingQueueDemo {

    /**
     * 场景:订单消息队列——生产者-消费者模式
     */
    public static class OrderMessageQueue {

        // 使用有界阻塞队列,防止生产者速度过快导致 OOM
        private final BlockingQueue<String> queue;

        public OrderMessageQueue(int capacity) {
            // ArrayBlockingQueue:数组实现,有界,一把锁(生产和消费共享)
            this.queue = new ArrayBlockingQueue<>(capacity);
        }

        /**
         * 生产者:投递订单消息
         */
        public void produce(String orderMessage) throws InterruptedException {
            // put() 会在队列满时阻塞,直到有空间
            // 比 offer() 安全——offer() 队列满时返回 false,消息可能丢失
            queue.put(orderMessage);
            System.out.println(Thread.currentThread().getName()
                + " 投递消息:" + orderMessage
                + ",队列大小:" + queue.size());
        }

        /**
         * 消费者:消费订单消息
         */
        public void consume() throws InterruptedException {
            // take() 会在队列空时阻塞,直到有新消息
            String message = queue.take();
            System.out.println(Thread.currentThread().getName()
                + " 消费消息:" + message
                + ",队列剩余:" + queue.size());
            // 处理消息...
        }

        /**
         * 带超时的消费——避免永久阻塞
         */
        public String consumeWithTimeout(long timeout, TimeUnit unit)
                throws InterruptedException {
            // poll(timeout) 在指定时间内等待,超时返回 null
            return queue.poll(timeout, unit);
        }
    }

    public static void main(String[] args) throws Exception {
        OrderMessageQueue mq = new OrderMessageQueue(10);

        // 启动 2 个生产者线程
        for (int i = 0; i < 2; i++) {
            final int producerId = i;
            new Thread(() -> {
                try {
                    for (int j = 0; j < 20; j++) {
                        mq.produce("订单-" + producerId + "-" + j);
                        Thread.sleep(50); // 模拟生产间隔
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "producer-" + i).start();
        }

        // 启动 3 个消费者线程
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        mq.consume();
                        Thread.sleep(100); // 模拟消费耗时
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "consumer-" + i).start();
        }
    }
}

BlockingQueue 家族对比

队列类型底层结构有界/无界锁机制适用场景
ArrayBlockingQueue数组有界一把锁(生产消费共享)通用场景,内存可控
LinkedBlockingQueue链表可选有界(默认无界)两把锁(生产消费分离)吞吐量要求高
SynchronousQueue无缓冲0 容量无锁/CAS直接传递,如 CachedThreadPool
PriorityBlockingQueue无界一把锁需要优先级排序的场景
DelayQueue无界一把锁延迟任务(如订单超时关闭)

⚠️ 生产建议

  • 绝大多数场景用 ArrayBlockingQueue(有界、简单)
  • 需要更高吞吐量时用 LinkedBlockingQueue(记得设容量!)
  • SynchronousQueue 只在特殊场景使用(如线程池的 CachedThreadPool)

七、常见陷阱与避坑指南

7.1 死锁的四个条件及排查方法

package com.example.concurrent.pitfall;

/**
 * 陷阱一:死锁
 *
 * 死锁的四个必要条件:
 * 1. 互斥——资源一次只能被一个线程占用
 * 2. 持有并等待——持有一个资源的同时等待另一个资源
 * 3. 不可剥夺——已获取的资源不能被其他线程强制释放
 * 4. 循环等待——线程之间形成环形等待链
 *
 * 只要破坏任意一个条件就能避免死锁
 */
public class DeadlockDemo {

    private static final Object LOCK_A = new Object();
    private static final Object LOCK_B = new Object();

    /**
     * ❌ 会死锁的代码
     */
    public static void deadlockExample() {
        // 线程 1:先锁 A 再锁 B
        Thread t1 = new Thread(() -> {
            synchronized (LOCK_A) {
                System.out.println("线程1 获取了锁A");
                try { Thread.sleep(100); } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // 此时线程2 已经持有锁B,线程1 等待锁B -> 死锁
                synchronized (LOCK_B) {
                    System.out.println("线程1 获取了锁B");
                }
            }
        }, "thread-1");

        // 线程 2:先锁 B 再锁 A(顺序相反 = 循环等待)
        Thread t2 = new Thread(() -> {
            synchronized (LOCK_B) {
                System.out.println("线程2 获取了锁B");
                try { Thread.sleep(100); } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // 此时线程1 已经持有锁A,线程2 等待锁A -> 死锁
                synchronized (LOCK_A) {
                    System.out.println("线程2 获取了锁A");
                }
            }
        }, "thread-2");

        t1.start();
        t2.start();
    }

    /**
     * ✅ 修复方案:统一加锁顺序(破坏循环等待条件)
     * 所有线程都按 A -> B 的顺序加锁
     */
    public static void fixedExample() {
        Thread t1 = new Thread(() -> {
            synchronized (LOCK_A) { // 先锁 A
                synchronized (LOCK_B) { // 再锁 B
                    System.out.println("线程1 安全获取了两把锁");
                }
            }
        }, "thread-1");

        Thread t2 = new Thread(() -> {
            synchronized (LOCK_A) { // 也是先锁 A(顺序一致)
                synchronized (LOCK_B) { // 再锁 B
                    System.out.println("线程2 安全获取了两把锁");
                }
            }
        }, "thread-2");

        t1.start();
        t2.start();
    }
}

排查死锁的方法

# 方法一:使用 jstack 导出线程堆栈
# 1. 找到 Java 进程 PID
jps -l

# 2. 导出线程堆栈
jstack <PID> > thread_dump.txt

# 3. 在输出中搜索 "deadlock"
# jstack 会自动检测死锁并输出如下信息:
# Found one Java-level deadlock:
# =============================
# "thread-2":
#   waiting to lock monitor 0x00007f8b3c003898 (object 0x..., a java.lang.Object),
#   which is held by "thread-1"
# "thread-1":
#   waiting to lock monitor 0x00007f8b3c003948 (object 0x..., a java.lang.Object),
#   which is held by "thread-2"

# 方法二:使用 Arthas(推荐)
# thread -b  # 查找阻塞其他线程的线程

7.2 线程安全的假象:ConcurrentHashMap 的复合操作

package com.example.concurrent.pitfall;

import java.util.concurrent.ConcurrentHashMap;

/**
 * 陷阱二:ConcurrentHashMap 的复合操作不是线程安全的
 *
 * ConcurrentHashMap 保证的是单个方法的线程安全
 * 但是「先读后写」这种复合操作不是原子的
 */
public class CompoundOperationPitfall {

    private final ConcurrentHashMap<String, Integer> map =
        new ConcurrentHashMap<>();

    /**
     * ❌ 错误写法:先 get 再 put 不是原子的
     */
    public void unsafeIncrement(String key) {
        Integer oldValue = map.get(key);
        // ⚠️ 这里存在竞态条件!
        // 两个线程可能同时读到同一个 oldValue
        // 然后各自 +1 写回,结果只加了 1 而不是 2
        if (oldValue == null) {
            map.put(key, 1);
        } else {
            map.put(key, oldValue + 1);
        }
    }

    /**
     * ✅ 正确写法一:使用 ConcurrentHashMap 的原子方法
     */
    public void safeIncrementV1(String key) {
        // merge 方法是原子的:key 不存在时放入 1,存在时执行 Integer::sum
        map.merge(key, 1, Integer::sum);
    }

    /**
     * ✅ 正确写法二:使用 compute
     */
    public void safeIncrementV2(String key) {
        // compute 方法也是原子的
        map.compute(key, (k, v) -> v == null ? 1 : v + 1);
    }

    /**
     * ✅ 正确写法三:putIfAbsent + replace(CAS 风格)
     */
    public void safeIncrementV3(String key) {
        // putIfAbsent:只有 key 不存在时才放入
        map.putIfAbsent(key, 0);
        // 自旋 CAS
        while (true) {
            Integer oldValue = map.get(key);
            // replace(key, oldValue, newValue) 是原子的
            // 只有当前值等于 oldValue 时才替换成 newValue
            if (map.replace(key, oldValue, oldValue + 1)) {
                break; // 替换成功,退出循环
            }
            // 替换失败说明被其他线程修改了,重新读取并重试
        }
    }
}

7.3 线程池异常丢失

package com.example.concurrent.pitfall;

import java.util.concurrent.*;

/**
 * 陷阱三:线程池中的异常被静默吞掉
 *
 * 根源:
 * - execute() 提交的任务:异常会导致线程终止,可以被 UncaughtExceptionHandler 捕获
 * - submit() 提交的任务:异常被封装在 Future 中,只有调用 get() 时才会抛出
 *                       如果不调用 get(),异常永远不会被发现
 */
public class ThreadPoolExceptionPitfall {

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // ❌ submit() 方式:异常被吞掉,控制台无任何输出
        pool.submit(() -> {
            System.out.println("submit 任务开始执行");
            throw new RuntimeException("submit 任务异常");
            // 这个异常被封装在 Future 中,永远不会被发现
        });

        Thread.sleep(1000);
        System.out.println("主线程继续执行——没有看到任何异常信息!");

        // ✅ 修复方式一:使用 Future.get() 获取异常
        Future<?> future = pool.submit(() -> {
            throw new RuntimeException("submit 任务异常");
        });
        try {
            future.get(); // 这里会抛出 ExecutionException
        } catch (ExecutionException e) {
            System.err.println("捕获到异常:" + e.getCause().getMessage());
        }

        // ✅ 修复方式二:使用 execute() 而非 submit()(针对不需要返回值的任务)
        // execute() 提交的任务,异常会传播到 UncaughtExceptionHandler

        // ✅ 修复方式三:在任务内部 try-catch
        pool.submit(() -> {
            try {
                throw new RuntimeException("业务异常");
            } catch (Exception e) {
                // 在任务内部处理异常——最可靠的方式
                System.err.println("任务内部捕获异常:" + e.getMessage());
                // 记录日志、发送告警等
            }
        });

        Thread.sleep(500);
        pool.shutdown();
    }
}

7.4 SimpleDateFormat 线程不安全

package com.example.concurrent.pitfall;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 陷阱四:SimpleDateFormat 是线程不安全的
 *
 * 原因:SimpleDateFormat 内部有一个 Calendar 实例变量
 *       多线程同时调用 format/parse 时会相互覆盖 Calendar 的状态
 *       导致:格式化结果错误、NumberFormatException、ArrayIndexOutOfBoundsException
 */
public class SimpleDateFormatPitfall {

    // ❌ 错误写法:多个线程共享同一个 SimpleDateFormat
    private static final SimpleDateFormat UNSAFE_SDF =
        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // ✅ 修复方式一:使用 ThreadLocal(JDK 7 及以下推荐)
    private static final ThreadLocal<SimpleDateFormat> SAFE_SDF =
        ThreadLocal.withInitial(
            () -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        );

    // ✅ 修复方式二:使用 DateTimeFormatter(JDK 8+ 强烈推荐)
    // DateTimeFormatter 是不可变的、线程安全的
    private static final DateTimeFormatter FORMATTER =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 100; i++) {
            pool.submit(() -> {
                // ❌ 这样会产生各种奇怪的异常或错误结果
                // String result = UNSAFE_SDF.format(new java.util.Date());

                // ✅ 方式一:ThreadLocal
                // String result = SAFE_SDF.get()
                //     .format(new java.util.Date());

                // ✅ 方式二:DateTimeFormatter(推荐)
                String result = LocalDateTime.now().format(FORMATTER);

                System.out.println(Thread.currentThread().getName()
                    + " -> " + result);
            });
        }

        pool.shutdown();
    }
}

7.5 Double-Checked Locking 必须配合 volatile

package com.example.concurrent.pitfall;

/**
 * 陷阱五:双重检查锁定(DCL)单例模式必须使用 volatile
 *
 * 原因:对象创建不是原子操作,分三步:
 * 1. 分配内存空间
 * 2. 初始化对象
 * 3. 将引用指向内存空间
 *
 * 由于 CPU 指令重排序,步骤 2 和 3 可能颠倒:
 * 1. 分配内存空间
 * 3. 将引用指向内存空间(此时对象还未初始化!)
 * 2. 初始化对象
 *
 * 此时另一个线程在第一次 null 检查时看到 instance 不为 null,
 * 直接返回了一个未初始化的对象——NPE 或数据不一致
 */
public class DoubleCheckedLockingPitfall {

    // ❌ 错误写法:没有 volatile,可能返回未初始化的对象
    // private static DoubleCheckedLockingPitfall instance;

    // ✅ 正确写法:volatile 禁止指令重排序
    // volatile 保证:引用赋值一定在对象初始化之后
    private static volatile DoubleCheckedLockingPitfall instance;

    private DoubleCheckedLockingPitfall() {
        // 私有构造函数
    }

    public static DoubleCheckedLockingPitfall getInstance() {
        // 第一次检查:避免每次都加锁(性能优化)
        if (instance == null) {
            synchronized (DoubleCheckedLockingPitfall.class) {
                // 第二次检查:防止多个线程同时通过第一次检查后重复创建
                if (instance == null) {
                    // ⚠️ 这一行不是原子操作!
                    // 没有 volatile 时,其他线程可能看到半初始化的对象
                    instance = new DoubleCheckedLockingPitfall();
                }
            }
        }
        return instance;
    }

    /**
     * ✅ 更推荐的方式:静态内部类单例(JVM 保证线程安全,无需 volatile/synchronized)
     */
    public static class BetterSingleton {
        private BetterSingleton() {}

        // 静态内部类在第一次被使用时才加载
        // 类加载过程由 JVM 保证线程安全(ClassLoader 加锁)
        private static class Holder {
            private static final BetterSingleton INSTANCE =
                new BetterSingleton();
        }

        public static BetterSingleton getInstance() {
            return Holder.INSTANCE; // 触发 Holder 类加载
        }
    }
}

7.6 线程池参数配置不当导致 OOM

package com.example.concurrent.pitfall;

import java.util.concurrent.*;

/**
 * 陷阱六:线程池参数配置不当导致 OOM
 *
 * 常见错误:
 * 1. 使用无界队列(默认 LinkedBlockingQueue)
 * 2. maximumPoolSize 设置过大
 * 3. 忘记设置拒绝策略
 */
public class ThreadPoolOOMPitfall {

    /**
     * ❌ 错误配置:无界队列 + 大量慢任务 = OOM
     */
    public static void wrongConfig() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 4, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>() // ⚠️ 默认容量 Integer.MAX_VALUE
        );

        // 大量慢任务提交后,全部堆积在队列中
        for (int i = 0; i < 1_000_000; i++) {
            pool.submit(() -> {
                try { Thread.sleep(10_000); } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // 结果:队列中 100 万个 Runnable 对象 -> OOM
    }

    /**
     * ✅ 正确配置
     */
    public static void correctConfig() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            4,                              // 核心线程数:根据 CPU 核心数设置
            8,                              // 最大线程数:预留弹性
            60, TimeUnit.SECONDS,           // 空闲超时:60秒回收临时线程
            new ArrayBlockingQueue<>(200),  // ⚠️ 有界队列!容量根据业务评估
            r -> {
                Thread t = new Thread(r);
                t.setName("biz-pool-" + t.getId()); // 必须命名
                return t;
            },
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:调用者执行
        );

        // 即使提交 100 万个任务:
        // 1. 队列最多 200 个
        // 2. 超出后 CallerRunsPolicy 让调用者线程执行——天然限流
        // 3. 不会 OOM
    }
}

7.7 ThreadLocal 内存泄漏

package com.example.concurrent.pitfall;

import java.text.SimpleDateFormat;

/**
 * 陷阱七:ThreadLocal 内存泄漏
 *
 * 原因:
 * ThreadLocal 的实现是每个 Thread 内部有一个 ThreadLocalMap,
 * key 是 ThreadLocal 的弱引用,value 是强引用。
 *
 * 当 ThreadLocal 变量被回收后:
 * - key 变成 null(弱引用被 GC)
 * - 但 value 仍然被 Thread -> ThreadLocalMap -> Entry -> value 强引用
 * - 只要线程不终止,value 就不会被 GC -> 内存泄漏
 *
 * ⚠️ 在线程池中特别危险!
 * 因为线程池中的线程是复用的、长期存活的,ThreadLocal 的 value 永远不会被回收
 */
public class ThreadLocalLeakPitfall {

    // ❌ 错误用法:在线程池场景下不调用 remove()
    private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT =
        ThreadLocal.withInitial(
            () -> new SimpleDateFormat("yyyy-MM-dd")
        );

    /**
     * ❌ 错误写法:使用完不清理
     */
    public void unsafeUsage() {
        SimpleDateFormat sdf = DATE_FORMAT.get();
        String result = sdf.format(new java.util.Date());
        // 用完就走了,没有调用 remove()
        // 线程池中的线程复用时,上一个任务的 value 还残留在 ThreadLocal 中
        // 可能导致:内存泄漏、数据串号(上一个请求的用户信息被下一个请求读到)
    }

    /**
     * ✅ 正确写法:用完必须 remove
     */
    public void safeUsage() {
        try {
            SimpleDateFormat sdf = DATE_FORMAT.get();
            String result = sdf.format(new java.util.Date());
            System.out.println(result);
        } finally {
            // ⚠️ 必须在 finally 中调用 remove()!
            // 清除当前线程的 ThreadLocal 值,防止内存泄漏和数据串号
            DATE_FORMAT.remove();
        }
    }

    /**
     * ✅ 更好的方案:不用 ThreadLocal,直接用线程安全的 DateTimeFormatter
     * JDK 8+ 的 DateTimeFormatter 是不可变的、线程安全的
     */
}

7.8 Spring @Async 默认线程池的坑

package com.example.concurrent.pitfall;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * 陷阱八:Spring @Async 默认线程池的坑
 *
 * ⚠️ Spring @Async 默认使用 SimpleAsyncTaskExecutor
 * 这个"线程池"其实根本不是线程池!它每次都创建一个新线程,不会复用!
 *
 * 风险:
 * 1. 无限制创建线程 -> 大量请求时线程数爆炸 -> OOM
 * 2. 没有队列 -> 无法缓冲突发流量
 * 3. 没有拒绝策略 -> 过载时无法限流
 */
@Service
public class SpringAsyncPitfall {

    /**
     * ❌ 错误用法:使用默认线程池
     */
    @Async  // 默认使用 SimpleAsyncTaskExecutor——不是真正的线程池!
    public void unsafeAsyncMethod() {
        // 每次调用都会创建一个新线程
        System.out.println("这个线程不会被复用:"
            + Thread.currentThread().getName());
    }

    /**
     * ✅ 正确用法:指定自定义线程池
     * 需要先在配置类中定义名为 "orderProcessExecutor" 的线程池 Bean
     * 见 3.4.2 节的 ThreadPoolConfig
     */
    @Async("orderProcessExecutor")  // 明确指定使用哪个线程池
    public void safeAsyncMethod() {
        System.out.println("使用自定义线程池:"
            + Thread.currentThread().getName());
    }
}

八、与 Spring 生态配合

8.1 @Async 正确使用方式

8.1.1 完整配置

package com.example.concurrent.spring;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @Async 完整配置
 *
 * 步骤:
 * 1. @EnableAsync 开启异步支持
 * 2. 实现 AsyncConfigurer 自定义默认线程池和异常处理器
 * 3. 定义业务专用线程池 Bean
 */
@Configuration
@EnableAsync // 开启 @Async 注解支持
public class AsyncConfig implements AsyncConfigurer {

    /**
     * 自定义默认异步线程池——替换掉 SimpleAsyncTaskExecutor
     * 当 @Async 不指定线程池名称时使用这个
     */
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(500);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("default-async-");
        executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        // ⚠️ 设置优雅关闭
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }

    /**
     * 自定义异步异常处理器
     * 处理 @Async 方法中抛出的、没有被 Future 捕获的异常
     *
     * ⚠️ 注意:只对返回 void 的 @Async 方法生效
     * 返回 Future/CompletableFuture 的方法,异常会封装在 Future 中
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable ex,
                                                  Method method,
                                                  Object... params) {
                // 记录异常日志
                System.err.println("@Async 方法异常:"
                    + method.getDeclaringClass().getName()
                    + "#" + method.getName());
                System.err.println("异常信息:" + ex.getMessage());
                // 实际项目中:发送告警通知
                // alertService.sendAlert("@Async异常", ex);
            }
        };
    }
}

8.1.2 @Async 方法的返回值

package com.example.concurrent.spring;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
 * @Async 方法的三种返回值类型
 */
@Service
public class AsyncService {

    /**
     * 返回 void:发出去就不管了,"发射后遗忘"模式
     *
     * 适用场景:发送通知、记录日志等不需要等待结果的操作
     * ⚠️ 异常只能通过 AsyncUncaughtExceptionHandler 捕获
     */
    @Async("orderProcessExecutor")
    public void sendNotification(Long userId, String message) {
        // 异步发送通知,调用方不等待结果
        System.out.println("异步发送通知给用户:" + userId);
    }

    /**
     * 返回 Future<T>:可以获取异步结果
     *
     * 适用场景:需要获取异步执行结果的场景
     * ⚠️ 但 Future 的 API 比较简陋,推荐用 CompletableFuture
     */
    @Async("orderProcessExecutor")
    public Future<String> asyncQueryWithFuture(Long userId) {
        String result = "用户 " + userId + " 的信息";
        // 使用 AsyncResult 包装返回值
        return new AsyncResult<>(result);
    }

    /**
     * 返回 CompletableFuture<T>:推荐方式
     *
     * 适用场景:需要链式调用、组合多个异步结果
     * CompletableFuture 功能更强大,支持 thenApply、thenCompose 等操作
     */
    @Async("orderProcessExecutor")
    public CompletableFuture<String> asyncQueryWithCompletableFuture(
            Long userId) {
        try {
            Thread.sleep(200); // 模拟耗时操作
            String result = "用户 " + userId + " 的详细信息";
            // CompletableFuture.completedFuture 直接包装已完成的结果
            return CompletableFuture.completedFuture(result);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // 异常情况返回失败的 Future
            CompletableFuture<String> failedFuture = new CompletableFuture<>();
            failedFuture.completeExceptionally(e);
            return failedFuture;
        }
    }
}

8.1.3 @Async 使用注意事项

⚠️ 注意事项一:@Async 方法必须通过代理调用,同类内部调用无效。

@Service
public class OrderService {

    // ❌ 错误:同一个类中直接调用 @Async 方法,不走代理,不会异步执行
    public void createOrder() {
        sendOrderNotification(); // 这是 this.sendOrderNotification()
        // 直接调用 this 的方法,不经过 Spring AOP 代理
        // 所以 @Async 注解不会生效,还是同步执行!
    }

    @Async
    public void sendOrderNotification() {
        System.out.println("发送订单通知");
    }
}

// ✅ 正确:@Async 方法放在另一个 Bean 中,通过注入调用
@Service
public class OrderServiceFixed {

    // 注入通知服务——调用时走 Spring 代理
    private final NotificationService notificationService;

    public OrderServiceFixed(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    public void createOrder() {
        // 通过注入的 Bean 调用——走 AOP 代理,@Async 生效
        notificationService.sendOrderNotification();
    }
}

@Service
class NotificationService {
    @Async("orderProcessExecutor")
    public void sendOrderNotification() {
        System.out.println("异步发送订单通知");
    }
}

⚠️ 注意事项二:@Async 方法不能是 private 或 static。 原因:Spring AOP 基于代理实现,private/static 方法无法被代理。


8.2 事务与并发

8.2.1 @Transactional 跨线程失效

package com.example.concurrent.spring;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.concurrent.*;

/**
 * ⚠️ 核心问题:@Transactional 是基于 ThreadLocal 实现的
 *
 * Spring 的事务管理器将事务信息(Connection)存储在当前线程的 ThreadLocal 中。
 * 子线程拿不到父线程的 ThreadLocal,因此在子线程中:
 * - 没有事务上下文
 * - 每个子线程各自开启独立连接
 * - 主线程回滚时,子线程的操作不会回滚
 */
@Service
public class TransactionConcurrencyPitfall {

    // 假设注入了 OrderRepository 和 InventoryRepository
    // private final OrderRepository orderRepository;
    // private final InventoryRepository inventoryRepository;

    /**
     * ❌ 错误写法:多线程中使用 @Transactional 期望全部回滚
     */
    @Transactional
    public void batchCreateOrders(List<String> orderIds) {
        ExecutorService pool = Executors.newFixedThreadPool(4);

        List<Future<?>> futures = new java.util.ArrayList<>();
        for (String orderId : orderIds) {
            futures.add(pool.submit(() -> {
                // ⚠️ 这里运行在子线程中!
                // 子线程没有主线程的事务上下文
                // 每个子线程使用独立的数据库连接
                // 即使主线程回滚,子线程已提交的数据不会回滚
                createSingleOrder(orderId);
            }));
        }

        // 等待所有任务完成
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (Exception e) {
                // 即使这里抛异常触发回滚
                // 子线程中已经 commit 的数据也无法回滚!
                throw new RuntimeException("批量创建失败", e);
            }
        }
    }

    private void createSingleOrder(String orderId) {
        // 数据库操作
        System.out.println("创建订单:" + orderId);
    }

    /**
     * ✅ 解决方案一:分批处理 + 补偿机制
     * 不追求全局事务,而是单个订单独立事务 + 失败重试/补偿
     */
    public void batchCreateOrdersV2(List<String> orderIds) {
        ExecutorService pool = Executors.newFixedThreadPool(4);

        List<Future<Boolean>> futures = new java.util.ArrayList<>();
        for (String orderId : orderIds) {
            futures.add(pool.submit(() -> {
                try {
                    // 每个订单在自己的线程中独立开启事务
                    createSingleOrderWithTransaction(orderId);
                    return true;
                } catch (Exception e) {
                    // 记录失败的订单,后续补偿
                    System.err.println("订单创建失败:" + orderId);
                    return false;
                }
            }));
        }

        // 收集失败的订单,写入补偿表
        List<String> failedOrders = new java.util.ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            try {
                if (!futures.get(i).get()) {
                    failedOrders.add(orderIds.get(i));
                }
            } catch (Exception e) {
                failedOrders.add(orderIds.get(i));
            }
        }

        if (!failedOrders.isEmpty()) {
            // 将失败的订单写入补偿表,由定时任务重试
            System.err.println("以下订单需要补偿处理:" + failedOrders);
            // compensationService.save(failedOrders);
        }

        pool.shutdown();
    }

    // 这个方法应该由 Spring 管理事务(通过另一个 Bean 调用)
    @Transactional
    public void createSingleOrderWithTransaction(String orderId) {
        createSingleOrder(orderId);
    }

    /**
     * ✅ 解决方案二:编程式事务
     * 使用 TransactionTemplate 在子线程中手动管理事务
     */
    // 需要注入 TransactionTemplate
    // private final TransactionTemplate transactionTemplate;
    //
    // public void batchWithProgrammaticTx(List<String> orderIds) {
    //     ExecutorService pool = Executors.newFixedThreadPool(4);
    //     for (String orderId : orderIds) {
    //         pool.submit(() -> {
    //             // 在子线程中使用编程式事务
    //             transactionTemplate.execute(status -> {
    //                 try {
    //                     createSingleOrder(orderId);
    //                     return true;
    //                 } catch (Exception e) {
    //                     status.setRollbackOnly(); // 手动标记回滚
    //                     return false;
    //                 }
    //             });
    //         });
    //     }
    //     pool.shutdown();
    // }
}

8.3 并发场景下的 Bean 线程安全

package com.example.concurrent.spring;

import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;

import java.util.ArrayList;
import java.util.List;

/**
 * ⚠️ Spring Bean 默认是单例的(Singleton)
 *
 * 单例意味着:所有线程(所有 HTTP 请求)共享同一个实例
 * 如果 Bean 内部有可变状态(成员变量),就存在线程安全问题
 */

// ❌ 错误示例:有状态的单例 Bean
@Service
class UnsafeOrderService {

    // ⚠️ 可变的成员变量——所有线程共享!
    // 请求 A 正在处理时,请求 B 可能修改这个变量
    private List<String> processingOrders = new ArrayList<>();

    // ⚠️ 这也是可变状态
    private int totalCount = 0;

    public void processOrder(String orderId) {
        // 多个线程同时操作同一个 ArrayList——线程不安全
        processingOrders.add(orderId);
        totalCount++;  // 非原子操作——竞态条件
    }
}

// ✅ 解决方案一:无状态设计(最推荐)
@Service
class SafeOrderServiceV1 {

    // 没有可变的成员变量——天然线程安全
    // 所有数据通过方法参数传入,通过返回值输出

    public String processOrder(String orderId) {
        // 所有变量都是局部的——每个线程有自己的栈帧,互不干扰
        List<String> localList = new ArrayList<>();
        localList.add(orderId);
        return "处理完成:" + orderId;
    }
}

// ✅ 解决方案二:使用 ThreadLocal 存储请求级别的状态
@Service
class SafeOrderServiceV2 {

    // ThreadLocal 为每个线程维护独立的副本
    private final ThreadLocal<List<String>> processingOrders =
        ThreadLocal.withInitial(ArrayList::new);

    public void processOrder(String orderId) {
        try {
            // 每个线程操作自己的 List,互不干扰
            processingOrders.get().add(orderId);
        } finally {
            // ⚠️ 使用完必须清理!防止线程池复用时数据泄漏
            processingOrders.remove();
        }
    }
}

// ✅ 解决方案三:使用 Request Scope(Web 场景)
@Service
@RequestScope  // 每个 HTTP 请求创建一个新实例
class SafeOrderServiceV3 {

    // 每个请求有自己的实例,成员变量不会被共享
    private final List<String> processingOrders = new ArrayList<>();

    public void processOrder(String orderId) {
        // 安全:这个实例只属于当前请求
        processingOrders.add(orderId);
    }
}

三种解决方案对比

方案实现方式优点缺点推荐场景
无状态设计不存储可变成员变量最简单、最安全无法缓存中间状态大部分业务场景
ThreadLocal线程隔离的变量副本可以存储线程私有状态必须手动清理、内存泄漏风险日志跟踪、用户上下文传递
Request Scope每个请求一个实例自然隔离、Spring 自动管理性能略差(频繁创建实例)、注入限制Web 请求级别状态管理

附录:快速选型决策树

需要并发执行任务?
├── 简单的异步执行(不需要返回值)
│   └── 使用线程池 + Runnable

├── 需要获取异步结果?
│   ├── 单个异步任务 -> Callable + Future
│   ├── 多个任务编排 -> CompletableFuture(推荐)
│   └── Spring 环境 -> @Async + CompletableFuture

├── 需要控制并发数?
│   └── Semaphore

├── 需要等待多个任务完成?
│   ├── 一次性等待 -> CountDownLatch
│   └── 可重复使用 -> CyclicBarrier

├── 需要线程安全的集合?
│   ├── Map -> ConcurrentHashMap
│   ├── List(读多写少) -> CopyOnWriteArrayList
│   └── 队列(生产者-消费者) -> BlockingQueue

└── 需要互斥访问?
    ├── 简单场景 -> synchronized
    ├── 需要 tryLock/公平锁/多 Condition -> ReentrantLock
    ├── 读多写少 -> ReadWriteLock
    └── 读极多写极少 -> StampedLock

附录:核心 API 速查表

类/接口核心方法说明
ExecutorServicesubmit(), execute(), shutdown()线程池提交任务
ThreadPoolExecutor7 参数构造函数自定义线程池
CompletableFuturesupplyAsync(), thenApply(), allOf()异步编排
CountDownLatchcountDown(), await()等待多任务完成
CyclicBarrierawait()多线程同步汇合
Semaphoreacquire(), release(), tryAcquire()控制并发数
ReentrantLocklock(), tryLock(), lockInterruptibly()显式锁
ReadWriteLockreadLock(), writeLock()读写分离锁
ConcurrentHashMapputIfAbsent(), compute(), merge()线程安全 Map
BlockingQueueput(), take(), offer(), poll()阻塞队列
ThreadLocalget(), set(), remove()线程本地变量

最后的话:并发编程的核心不是记住 API,而是理解什么时候该用、什么时候不该用。 能不用锁就不用锁(无状态设计),能用 synchronized 就不用 ReentrantLock, 能用 CompletableFuture 就不要手动管理线程。 简单即正确,正确即性能。