Java 并发编程实战使用手册
本手册面向有一定 Java 基础的开发者,侧重企业级实战而非理论堆砌。 每一行代码都附有注释,解释的是 “为什么这样写” 而非 “这行代码做了什么”。
一、为什么需要并发编程
1.1 真实业务场景
在企业级开发中,以下场景几乎每天都会遇到:
| 场景 | 串行痛点 | 并发收益 |
|---|---|---|
| 订单批量处理 | 10000 笔订单逐条处理需要 30 分钟 | 10 线程并行,3 分钟完成 |
| 数据报表导出 | 汇总 5 个数据源需要 25 秒 | 并行查询,最慢的那个决定总耗时(约 8 秒) |
| 多渠道消息推送 | 短信 + 邮件 + App推送串行发送 3 秒 | 并行推送 1 秒 |
| 接口聚合调用 | 用户详情页需要调 5 个微服务,串行 2.5 秒 | 并行调用 0.6 秒 |
1.2 用数据说话:串行 vs 并发
package com.example.concurrent.demo;
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
/**
* 模拟「用户详情页」接口聚合场景
* 需要从 5 个微服务获取数据:用户基础信息、订单列表、优惠券、积分、收货地址
*/
public class SequentialVsConcurrentDemo {
// 模拟远程调用,每个耗时 500ms
private static String callRemoteService(String serviceName) {
try {
Thread.sleep(500); // 模拟网络IO延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断标志,不能吞掉中断
}
return serviceName + " 的数据";
}
/**
* 串行调用:总耗时 = 所有调用耗时之和
*/
public static void sequential() {
long start = System.currentTimeMillis();
// 逐个调用,后一个必须等前一个完成
String userInfo = callRemoteService("用户服务");
String orders = callRemoteService("订单服务");
String coupons = callRemoteService("优惠券服务");
String points = callRemoteService("积分服务");
String addresses = callRemoteService("地址服务");
long cost = System.currentTimeMillis() - start;
System.out.println("串行调用耗时:" + cost + "ms"); // 约 2500ms
}
/**
* 并发调用:总耗时 ≈ 最慢那个调用的耗时
*/
public static void concurrent() throws Exception {
long start = System.currentTimeMillis();
// 使用线程池而非裸创建线程——线程是重量级资源,必须池化复用
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交 5 个异步任务,每个任务返回一个 Future
Future<String> userInfoFuture = executor.submit(
() -> callRemoteService("用户服务")
);
Future<String> ordersFuture = executor.submit(
() -> callRemoteService("订单服务")
);
Future<String> couponsFuture = executor.submit(
() -> callRemoteService("优惠券服务")
);
Future<String> pointsFuture = executor.submit(
() -> callRemoteService("积分服务")
);
Future<String> addressesFuture = executor.submit(
() -> callRemoteService("地址服务")
);
// get() 会阻塞等待结果,但 5 个任务是并行执行的
String userInfo = userInfoFuture.get();
String orders = ordersFuture.get();
String coupons = couponsFuture.get();
String points = pointsFuture.get();
String addresses = addressesFuture.get();
long cost = System.currentTimeMillis() - start;
System.out.println("并发调用耗时:" + cost + "ms"); // 约 500ms
executor.shutdown(); // 优雅关闭线程池,不再接受新任务
}
public static void main(String[] args) throws Exception {
sequential(); // 输出约 2500ms
concurrent(); // 输出约 500ms
// 性能提升约 5 倍,这就是并发的价值
}
}
结论:并发编程不是炫技,而是在 IO 密集型场景下提升吞吐量和响应速度的核心手段。
二、线程创建的三种方式与选型
2.1 方式一:继承 Thread 类
package com.example.concurrent.thread;
/**
* 方式一:继承 Thread
*
* ⚠️ 实际项目中几乎不用这种方式,原因:
* 1. Java 单继承,继承了 Thread 就无法继承其他类
* 2. 任务和线程耦合在一起,无法复用任务逻辑
* 3. 没有返回值,无法知道任务执行结果
*/
public class OrderProcessThread extends Thread {
private final String orderId;
public OrderProcessThread(String orderId) {
super("order-process-" + orderId); // 给线程命名,方便排查问题
this.orderId = orderId;
}
@Override
public void run() {
// 业务逻辑直接写在 run 方法里,与线程绑定——这是最大的问题
System.out.println(Thread.currentThread().getName()
+ " 正在处理订单:" + orderId);
}
public static void main(String[] args) {
// 每次都要 new 一个线程,无法池化复用
OrderProcessThread t = new OrderProcessThread("ORD-20240101-001");
t.start(); // start() 才是启动线程,run() 只是普通方法调用
}
}
2.2 方式二:实现 Runnable 接口
package com.example.concurrent.thread;
/**
* 方式二:实现 Runnable 接口
*
* 优点:
* 1. 任务(Runnable)和线程(Thread)解耦,任务可以被不同线程执行
* 2. 可以同时继承其他类
* 3. 天然适合提交给线程池
*
* 缺点:没有返回值,无法抛出受检异常
*/
public class OrderProcessTask implements Runnable {
private final String orderId;
public OrderProcessTask(String orderId) {
this.orderId = orderId;
}
@Override
public void run() {
// 任务逻辑独立于线程存在,可以被线程池复用
System.out.println(Thread.currentThread().getName()
+ " 正在处理订单:" + orderId);
// 模拟业务处理
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// 恢复中断标志——让上层调用者知道发生了中断
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
// 任务和线程分离,任务可以复用
Runnable task = new OrderProcessTask("ORD-20240101-001");
// 可以交给 Thread 执行
new Thread(task, "worker-1").start();
// 更好的做法:交给线程池执行(后面会讲)
// executorService.submit(task);
}
}
2.3 方式三:实现 Callable + Future
package com.example.concurrent.thread;
import java.util.concurrent.*;
/**
* 方式三:实现 Callable + Future
*
* 企业级开发中最常用的方式,原因:
* 1. 有返回值——可以拿到任务执行结果
* 2. 可以抛出受检异常——异常不会被吞掉
* 3. 配合 Future 可以实现超时控制
*/
public class OrderValidationTask implements Callable<Boolean> {
private final String orderId;
public OrderValidationTask(String orderId) {
this.orderId = orderId;
}
@Override
public Boolean call() throws Exception {
// 可以抛异常——Runnable 的 run() 做不到这一点
System.out.println(Thread.currentThread().getName()
+ " 正在校验订单:" + orderId);
// 模拟校验逻辑:查库存、验地址、检查黑名单...
Thread.sleep(200);
// 返回校验结果——Runnable 做不到
return true;
}
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
// submit 返回 Future,持有异步计算的结果
Future<Boolean> future = executor.submit(
new OrderValidationTask("ORD-20240101-001")
);
// get(timeout) 带超时控制——防止任务卡死拖垮调用方
try {
Boolean isValid = future.get(3, TimeUnit.SECONDS);
System.out.println("订单校验结果:" + isValid);
} catch (TimeoutException e) {
// 超时后取消任务,释放线程资源
future.cancel(true);
System.out.println("订单校验超时,已取消");
}
executor.shutdown();
}
}
2.4 三种方式对比
| 维度 | Thread | Runnable | Callable + Future |
|---|---|---|---|
| 实现方式 | 继承 Thread 类 | 实现 Runnable 接口 | 实现 Callable 接口 |
| 有返回值 | 否 | 否 | 是 |
| 可抛受检异常 | 否 | 否 | 是 |
| 可继承其他类 | 否(Java 单继承) | 是 | 是 |
| 可提交线程池 | 不直接支持 | 支持 | 支持 |
| 支持超时控制 | 否 | 否 | 是(Future.get(timeout)) |
| 企业项目推荐 | 不推荐 | 简单场景可用 | 推荐 |
⚠️ 重要提醒:在实际企业项目中,我们几乎不会直接创建线程。 正确做法是把 Runnable/Callable 提交给线程池。 直接
new Thread()意味着:无法复用、无法控制数量、无法监控——是生产事故的温床。
三、线程池 — 并发编程的核心基础设施
线程池是 Java 并发编程中最重要的基础设施,没有之一。 你可以不用锁,但你不能不用线程池。
3.1 为什么禁止使用 Executors 创建线程池
阿里巴巴 Java 开发手册明确规定:线程池不允许使用 Executors 创建。原因如下:
FixedThreadPool 和 SingleThreadExecutor 的风险
package com.example.concurrent.pool;
import java.util.concurrent.*;
/**
* ⚠️ 反面教材:Executors.newFixedThreadPool 的 OOM 风险
*
* 根源:其工作队列是 LinkedBlockingQueue,默认容量为 Integer.MAX_VALUE(约 21 亿)
* 如果任务生产速度 > 消费速度,队列会无限增长,最终 OOM
*/
public class FixedThreadPoolOOMDemo {
public static void main(String[] args) {
// 看似安全的 2 个线程——实际上队列是无界的
ExecutorService pool = Executors.newFixedThreadPool(2);
// 模拟:大量请求涌入,每个任务执行时间较长
for (int i = 0; i < Integer.MAX_VALUE; i++) {
pool.submit(() -> {
try {
// 模拟慢任务(如调用第三方接口超时)
Thread.sleep(10_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 结果:队列不断积压 -> 内存持续增长 -> OOM
// java.lang.OutOfMemoryError: GC overhead limit exceeded
}
}
CachedThreadPool 的风险
package com.example.concurrent.pool;
import java.util.concurrent.*;
/**
* ⚠️ 反面教材:Executors.newCachedThreadPool 的风险
*
* 根源:maximumPoolSize 为 Integer.MAX_VALUE
* 来一个任务就创建一个线程,如果大量突发请求,会创建海量线程
* 每个线程约占 1MB 栈内存 -> 直接 OOM 或导致 CPU 上下文切换风暴
*/
public class CachedThreadPoolOOMDemo {
public static void main(String[] args) {
// 看似灵活的缓存线程池——实际是定时炸弹
ExecutorService pool = Executors.newCachedThreadPool();
// 模拟:瞬间涌入大量请求
for (int i = 0; i < 100_000; i++) {
pool.submit(() -> {
try {
Thread.sleep(5_000); // 每个任务持续 5 秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 结果:瞬间创建 10 万个线程 -> 栈内存约 100GB -> OOM
}
}
ScheduledThreadPool 的风险
package com.example.concurrent.pool;
import java.util.concurrent.*;
/**
* ⚠️ 反面教材:Executors.newScheduledThreadPool 的风险
*
* 根源:工作队列是 DelayedWorkQueue,也是无界队列
* 如果定时任务执行失败后不断重试,或者任务提交过多,同样可能 OOM
*/
public class ScheduledThreadPoolRiskDemo {
public static void main(String[] args) {
// 看似安全的定时线程池——队列依然无界
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
// 大量定时任务堆积在 DelayedWorkQueue 中
for (int i = 0; i < 1_000_000; i++) {
pool.schedule(() -> {
// 模拟任务
}, 1, TimeUnit.HOURS); // 1 小时后执行,但任务对象立刻占用内存
}
// 100 万个任务对象堆积在内存中,可能导致 OOM
}
}
各 Executors 工厂方法风险汇总:
| 工厂方法 | 队列类型 | 最大线程数 | 核心风险 |
|---|---|---|---|
newFixedThreadPool | LinkedBlockingQueue(无界) | 固定 | 队列堆积 -> OOM |
newSingleThreadExecutor | LinkedBlockingQueue(无界) | 1 | 队列堆积 -> OOM |
newCachedThreadPool | SynchronousQueue | Integer.MAX_VALUE | 线程数爆炸 -> OOM |
newScheduledThreadPool | DelayedWorkQueue(无界) | Integer.MAX_VALUE | 队列堆积 + 线程爆炸 |
⚠️ 正确做法:始终使用
new ThreadPoolExecutor(...)手动创建,明确指定每个参数。
3.2 ThreadPoolExecutor 七大参数详解
用一个餐厅厨房的比喻来理解七大参数:
想象一个餐厅厨房:有正式厨师、临时帮工、等位区和客满策略。
┌──────────────────────────────────────────────────────────┐
│ 餐厅厨房(线程池) │
│ │
│ [正式厨师 1] [正式厨师 2] [正式厨师 3] <- corePoolSize │
│ │
│ [临时帮工 1] [临时帮工 2] <- 额外线程 │
│ (正式 + 临时 = maximumPoolSize) │
│ │
│ 临时帮工空闲超过 keepAliveTime 就辞退 │
│ │
│ ┌─等位区(workQueue)──────────────┐ │
│ │ [订单4] [订单5] [订单6] [订单7] │ <- 有界队列 │
│ └─────────────────────────────────┘ │
│ │
│ 客满策略(handler):拒绝 / 调用者自己做 / 丢弃最旧的 │
│ │
│ 厨师培训标准(threadFactory):统一命名、设置优先级 │
└──────────────────────────────────────────────────────────┘
七大参数逐一解析:
package com.example.concurrent.pool;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* ThreadPoolExecutor 七大参数详解
*/
public class ThreadPoolParametersDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
// 1. corePoolSize = 3 ——「正式厨师」数量
// 这些线程常驻,即使空闲也不会被销毁(除非设置 allowCoreThreadTimeOut)
// 决定了线程池的常态处理能力
3,
// 2. maximumPoolSize = 6 ——「正式厨师 + 临时帮工」的总人数上限
// 当核心线程忙不过来、且队列满了,才会创建临时线程
// 注意:不是队列没满就创建!必须队列也满了才会扩容
6,
// 3. keepAliveTime = 60 ——临时帮工的最大空闲等待时间
// 临时线程空闲超过这个时间就会被销毁,释放资源
// 设太短:线程频繁创建销毁,性能损耗
// 设太长:空闲线程占用资源
60,
// 4. unit = SECONDS ——keepAliveTime 的时间单位
TimeUnit.SECONDS,
// 5. workQueue = 容量为 100 的有界队列 ——「等位区」
// ⚠️ 必须用有界队列!无界队列 = OOM 定时炸弹
// 容量太小:拒绝策略频繁触发,任务大量丢失
// 容量太大:内存占用高,任务排队时间长
new ArrayBlockingQueue<>(100),
// 6. threadFactory ——「厨师培训标准」
// 自定义线程工厂,核心作用:给线程命名
// 线上出问题时,线程名是排查问题的第一线索
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
// 命名规范:业务名-pool-序号
thread.setName("order-process-pool-" + counter.getAndIncrement());
// 设为非守护线程,确保任务执行完毕
thread.setDaemon(false);
return thread;
}
},
// 7. handler = CallerRunsPolicy ——「客满策略」
// 当核心线程满、队列满、临时线程也满时,触发拒绝策略
// CallerRunsPolicy:让调用者线程自己执行该任务
// 好处:天然限流,调用方线程忙于执行任务,就不会继续提交新任务
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 提交任务
for (int i = 0; i < 200; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println(Thread.currentThread().getName()
+ " 执行任务 " + taskId);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown(); // 不再接受新任务,等待已提交任务执行完毕
}
}
四种拒绝策略对比:
| 拒绝策略 | 行为 | 适用场景 |
|---|---|---|
AbortPolicy(默认) | 抛出 RejectedExecutionException | 需要感知任务被拒绝的场景 |
CallerRunsPolicy | 调用者线程自己执行 | 不允许丢弃任务 + 需要限流 |
DiscardPolicy | 静默丢弃 | 可以容忍任务丢失(如日志上报) |
DiscardOldestPolicy | 丢弃队列头部最旧任务 | 只关心最新任务(如实时行情刷新) |
3.3 任务提交流程
当调用 executor.execute(task) 或 executor.submit(task) 时,ThreadPoolExecutor 内部执行以下流程:
┌──────────────┐
│ 提交新任务 │
└──────┬───────┘
│
v
┌───────────────────────┐
│ 当前线程数 < coreSize? │
└───────┬───────┬───────┘
是 │ │ 否
v v
┌──────────┐ ┌──────────────────┐
│创建核心线程│ │ 工作队列未满? │
│执行任务 │ └───┬──────────┬───┘
└──────────┘ 是 │ │ 否
v v
┌──────────┐ ┌──────────────────────┐
│任务入队列 │ │当前线程数<maxPoolSize?│
│等待执行 │ └───┬──────────┬───────┘
└──────────┘ 是 │ │ 否
v v
┌──────────┐ ┌──────────┐
│创建临时线程│ │执行拒绝策略│
│执行任务 │ │(handler) │
└──────────┘ └──────────┘
⚠️ 关键易错点:很多人以为「核心线程满了就创建临时线程」——错! 正确顺序是:核心线程满 -> 任务进队列 -> 队列满 -> 这时候才创建临时线程。 所以如果你的队列容量设了
Integer.MAX_VALUE,临时线程永远不会被创建。
3.4 企业级线程池配置
3.4.1 线程数设置公式
| 任务类型 | 公式 | 解释 |
|---|---|---|
| CPU 密集型 | 线程数 = CPU 核心数 + 1 | 加 1 是为了在某个线程因为偶发页缺失暂停时,额外的线程能顶上 |
| IO 密集型 | 线程数 = CPU 核心数 * 2 * (1 + IO等待时间/CPU计算时间) | IO 等待时间越长,需要越多线程来利用 CPU |
| 混合型 | 拆分为 CPU 密集和 IO 密集两个线程池 | 避免互相影响 |
⚠️ 公式只是起点,最终线程数必须通过压测确定。 不同机器、不同负载、不同业务逻辑,最优值差异巨大。
3.4.2 Spring Boot 中的企业级线程池配置
package com.example.concurrent.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 企业级线程池配置
*
* 核心原则:
* 1. 不同业务使用不同线程池——隔离故障域
* 2. 必须自定义线程名——排查问题的生命线
* 3. 必须用有界队列——防止 OOM
* 4. 必须配置拒绝策略——明确过载时的行为
*/
@Configuration
public class ThreadPoolConfig {
/**
* 订单处理线程池——IO 密集型
* 主要操作:数据库读写、RPC 调用
*/
@Bean("orderProcessExecutor")
public ThreadPoolTaskExecutor orderProcessExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 获取 CPU 核心数作为计算基准
int cpuCores = Runtime.getRuntime().availableProcessors();
// IO 密集型:核心线程数 = CPU 核心数 * 2
// 因为 IO 等待时线程会让出 CPU,所以需要更多线程来保持 CPU 繁忙
executor.setCorePoolSize(cpuCores * 2);
// 最大线程数 = 核心线程数 * 2,预留弹性空间应对突发流量
executor.setMaxPoolSize(cpuCores * 4);
// 有界队列容量——根据业务峰值评估,不能太大也不能太小
// 太大:内存占用高,任务排队延迟大
// 太小:拒绝策略频繁触发
executor.setQueueCapacity(500);
// 临时线程空闲 60 秒后回收
executor.setKeepAliveSeconds(60);
// ⚠️ 核心:给线程命名,线上出问题时能快速定位是哪个业务的线程池
executor.setThreadNamePrefix("order-process-");
// 拒绝策略:由调用者线程执行,天然限流
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 等待所有任务完成后再关闭——优雅停机
executor.setWaitForTasksToCompleteOnShutdown(true);
// 最多等待 60 秒,超时强制关闭——防止停机时间过长
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
/**
* 报表导出线程池——CPU 密集型
* 主要操作:数据计算、Excel 生成
*/
@Bean("reportExportExecutor")
public ThreadPoolTaskExecutor reportExportExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int cpuCores = Runtime.getRuntime().availableProcessors();
// CPU 密集型:核心线程数 = CPU 核心数 + 1
// 过多线程反而因为上下文切换降低性能
executor.setCorePoolSize(cpuCores + 1);
executor.setMaxPoolSize(cpuCores + 1); // CPU 密集型不需要太多额外线程
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("report-export-");
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(120); // 报表导出可能耗时较长
executor.initialize();
return executor;
}
/**
* 消息推送线程池——IO 密集型
* 主要操作:调用短信网关、邮件服务、App 推送接口
*/
@Bean("messagePushExecutor")
public ThreadPoolTaskExecutor messagePushExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int cpuCores = Runtime.getRuntime().availableProcessors();
executor.setCorePoolSize(cpuCores * 2);
executor.setMaxPoolSize(cpuCores * 4);
executor.setQueueCapacity(1000); // 消息推送场景队列可以大一些
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("msg-push-");
// 消息推送场景:使用 AbortPolicy,拒绝时抛异常,
// 由上层捕获后写入消息补偿表,确保不丢消息
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.AbortPolicy()
);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
}
3.4.3 自定义线程工厂(不使用 Spring 时)
package com.example.concurrent.pool;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 自定义线程工厂——企业开发必备
*
* 为什么需要?
* 1. 默认线程名是 pool-N-thread-M,线上出问题完全看不出是哪个业务
* 2. 需要统一设置守护线程、优先级、异常处理器等
*/
public class NamedThreadFactory implements ThreadFactory {
// 线程池编号——区分同一业务的多个线程池实例
private static final AtomicInteger POOL_COUNTER = new AtomicInteger(1);
// 线程编号——区分同一线程池中的不同线程
private final AtomicInteger threadCounter = new AtomicInteger(1);
// 线程名前缀——标识业务含义
private final String namePrefix;
// 是否为守护线程
private final boolean daemon;
// 未捕获异常处理器——防止线程因为异常静默死亡
private final Thread.UncaughtExceptionHandler exceptionHandler;
public NamedThreadFactory(String businessName) {
this(businessName, false);
}
public NamedThreadFactory(String businessName, boolean daemon) {
// 命名规范:业务名-pool-N-thread-M
this.namePrefix = businessName + "-pool-"
+ POOL_COUNTER.getAndIncrement() + "-thread-";
this.daemon = daemon;
// 默认异常处理:打印日志,实际项目中应接入告警系统
this.exceptionHandler = (thread, throwable) -> {
System.err.println("线程 " + thread.getName()
+ " 发生未捕获异常:" + throwable.getMessage());
throwable.printStackTrace();
// 实际项目中应该:
// 1. 记录到日志系统
// 2. 发送告警(钉钉/企微/邮件)
// 3. 上报监控平台(Prometheus/Grafana)
};
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(namePrefix + threadCounter.getAndIncrement());
thread.setDaemon(daemon);
// 设置未捕获异常处理器——这是最后一道防线
thread.setUncaughtExceptionHandler(exceptionHandler);
return thread;
}
}
3.4.4 优雅关闭线程池
package com.example.concurrent.pool;
import java.util.List;
import java.util.concurrent.*;
/**
* 线程池优雅关闭——生产环境必须处理
*
* 为什么不能直接 shutdownNow()?
* 因为正在执行的任务会被中断,队列中等待的任务会被丢弃
* 可能导致:订单处理到一半被中断、数据不一致
*/
public class GracefulShutdownDemo {
/**
* 推荐的优雅关闭流程
*/
public static void gracefulShutdown(ExecutorService executor,
long timeout,
TimeUnit unit) {
// 第一步:停止接受新任务
// shutdown() 不会中断正在执行的任务,只是不再接受新提交
executor.shutdown();
try {
// 第二步:等待已提交任务完成
// 给一个合理的等待时间(如 60 秒)
if (!executor.awaitTermination(timeout, unit)) {
// 第三步:等待超时,强制关闭
// shutdownNow() 会中断正在执行的任务,返回队列中未执行的任务
List<Runnable> droppedTasks = executor.shutdownNow();
// 记录被丢弃的任务,用于后续补偿处理
System.err.println("线程池强制关闭,丢弃 "
+ droppedTasks.size() + " 个未执行任务");
// 实际项目中:将未执行的任务写入数据库/消息队列,后续补偿
// droppedTasks.forEach(task -> compensationService.save(task));
// 再次等待一小段时间
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
System.err.println("线程池未能在规定时间内关闭");
}
}
} catch (InterruptedException e) {
// 当前线程被中断,立刻强制关闭
executor.shutdownNow();
// 恢复中断标志
Thread.currentThread().interrupt();
}
}
/**
* 注册 JVM 关闭钩子——确保应用退出时线程池能优雅关闭
*/
public static void registerShutdownHook(ExecutorService executor,
String poolName) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("JVM 正在关闭,开始优雅关闭线程池:" + poolName);
gracefulShutdown(executor, 60, TimeUnit.SECONDS);
System.out.println("线程池 " + poolName + " 已关闭");
}, poolName + "-shutdown-hook"));
}
}
3.4.5 线程池监控
package com.example.concurrent.pool;
import java.util.concurrent.*;
/**
* 线程池监控——生产环境不可或缺
*
* 为什么要监控?
* 1. 队列积压过多 -> 任务延迟增大 -> 用户体验下降
* 2. 活跃线程持续打满 -> 说明线程数不够或下游太慢
* 3. 拒绝次数增多 -> 说明需要扩容或限流
*/
public class ThreadPoolMonitor {
/**
* 打印线程池核心指标
* 实际项目中应该将这些指标暴露给 Prometheus 等监控系统
*/
public static void printMetrics(ThreadPoolExecutor executor,
String poolName) {
System.out.println("========== " + poolName + " 线程池指标 ==========");
// 核心线程数——线程池的基准处理能力
System.out.println("核心线程数: " + executor.getCorePoolSize());
// 最大线程数——线程池的峰值处理能力
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
// 当前线程数——实际存活的线程数
System.out.println("当前线程数: " + executor.getPoolSize());
// 活跃线程数——正在执行任务的线程数
// 如果长期接近最大线程数,说明需要扩容
System.out.println("活跃线程数: " + executor.getActiveCount());
// 队列中等待的任务数——反映系统负载
// 如果持续增长,说明消费速度跟不上生产速度
System.out.println("队列等待任务数: " + executor.getQueue().size());
// 队列剩余容量
System.out.println("队列剩余容量: "
+ executor.getQueue().remainingCapacity());
// 已完成任务总数
System.out.println("已完成任务总数: " + executor.getCompletedTaskCount());
// 历史最大线程数——反映峰值负载
System.out.println("历史最大线程数: " + executor.getLargestPoolSize());
System.out.println("==============================================");
}
/**
* 定时监控线程池(可接入 Prometheus/Grafana)
*/
public static ScheduledExecutorService startMonitor(
ThreadPoolExecutor executor, String poolName) {
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor(
r -> new Thread(r, poolName + "-monitor")
);
// 每 10 秒打印一次指标
monitor.scheduleAtFixedRate(
() -> printMetrics(executor, poolName),
0, 10, TimeUnit.SECONDS
);
return monitor;
}
}
3.5 动态线程池
传统线程池的参数在启动后就固定了,如果线上发现参数不合理,必须修改代码 + 重新发布。 动态线程池允许在运行时修改核心参数,实现秒级生效。
核心原理
package com.example.concurrent.pool;
import java.util.concurrent.*;
/**
* 动态线程池的核心原理
*
* ThreadPoolExecutor 本身就提供了动态修改参数的 API:
* - setCorePoolSize()
* - setMaximumPoolSize()
* - setKeepAliveTime()
*
* 我们只需要把这些 API 与配置中心(Nacos/Apollo)打通即可
*/
public class DynamicThreadPoolDemo {
/**
* 运行时修改线程池参数
*/
public static void resizeThreadPool(ThreadPoolExecutor executor,
int newCoreSize,
int newMaxSize,
int newQueueCapacity) {
// ⚠️ 必须先设 maxPoolSize 再设 corePoolSize
// 如果 newCoreSize > 当前 maxPoolSize,会抛 IllegalArgumentException
if (newMaxSize >= newCoreSize) {
executor.setMaximumPoolSize(newMaxSize);
executor.setCorePoolSize(newCoreSize);
}
// ⚠️ 注意:队列容量不能直接修改(ArrayBlockingQueue 是 final 的)
// 如果需要动态修改队列容量,需要使用 ResizableCapacityLinkedBlockingQueue
// 这是开源动态线程池框架(如 Hippo4j、DynamicTp)的常用做法
System.out.println("线程池参数已更新 -> "
+ "core=" + newCoreSize
+ ", max=" + newMaxSize);
}
/**
* 模拟从配置中心获取配置并动态更新
* 实际项目中使用 @NacosConfigListener 或 Apollo 的 ConfigChangeListener
*/
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
new NamedThreadFactory("dynamic-demo"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 模拟运行一段时间后,运维发现线程数不够
System.out.println("初始配置 -> core=" + executor.getCorePoolSize()
+ ", max=" + executor.getMaximumPoolSize());
Thread.sleep(2000);
// 动态扩容——无需重启应用
resizeThreadPool(executor, 8, 16, 500);
// 验证修改生效
System.out.println("更新后配置 -> core=" + executor.getCorePoolSize()
+ ", max=" + executor.getMaximumPoolSize());
executor.shutdown();
}
}
⚠️ 生产建议:不要自己造轮子,推荐使用成熟的开源框架:
- Hippo4j(美团开源)
- DynamicTp(京东开源) 它们提供了配置中心集成、监控报警、运行时调参等完整能力。
四、锁的实战选型指南
4.1 synchronized — 内置锁
package com.example.concurrent.lock;
import java.util.HashMap;
import java.util.Map;
/**
* synchronized 是 Java 内置的锁机制
*
* JDK 6 之后做了大量优化(偏向锁、轻量级锁、自适应自旋)
* 在低竞争场景下,性能已经不输 ReentrantLock
*
* 什么时候用 synchronized 就够了?
* 1. 不需要尝试加锁(tryLock)
* 2. 不需要可中断加锁
* 3. 不需要公平锁
* 4. 不需要多个条件变量(Condition)
* 5. 简单的互斥场景
*/
public class SynchronizedDemo {
// 场景:简单的本地缓存
private final Map<String, Object> cache = new HashMap<>();
/**
* 方式一:synchronized 方法
* 锁对象是 this(当前实例)
* 适用于整个方法都需要同步的场景
*/
public synchronized Object getFromCache(String key) {
return cache.get(key);
}
/**
* 方式二:synchronized 代码块
* 锁对象可以自由指定,粒度更细
* 推荐使用——只锁需要同步的部分,减少锁持有时间
*/
public Object getOrLoad(String key) {
// 先无锁检查,大部分情况下缓存命中,无需加锁
Object value = cache.get(key);
if (value != null) {
return value;
}
// 缓存未命中,加锁后再次检查(double-check)
synchronized (this) {
// ⚠️ 必须再次检查!
// 因为在等待锁的过程中,可能其他线程已经加载了数据
value = cache.get(key);
if (value != null) {
return value;
}
// 执行加载逻辑
value = loadFromDatabase(key);
cache.put(key, value);
return value;
}
}
/**
* 方式三:锁定专门的锁对象
* 推荐用 private final 的对象作为锁——防止外部代码意外锁定同一对象
*/
private final Object writeLock = new Object();
public void updateCache(String key, Object value) {
synchronized (writeLock) {
// 使用专门的锁对象而非 this
// 原因:如果外部代码也用 synchronized(this) 锁了同一个实例
// 就会产生意外的锁竞争
cache.put(key, value);
}
}
private Object loadFromDatabase(String key) {
// 模拟数据库查询
return "value-of-" + key;
}
}
4.2 ReentrantLock — 可重入锁
package com.example.concurrent.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* ReentrantLock 相比 synchronized 的四大优势:
* 1. 可尝试加锁(tryLock)——避免死等
* 2. 可中断加锁(lockInterruptibly)——响应中断
* 3. 支持公平锁——防止线程饥饿
* 4. 支持多条件变量(Condition)——更精确的等待/通知
*/
public class ReentrantLockDemo {
// 非公平锁(默认)——吞吐量更高,但可能导致某些线程长时间拿不到锁
private final ReentrantLock unfairLock = new ReentrantLock();
// 公平锁——保证先请求的线程先获得锁,但吞吐量略低
private final ReentrantLock fairLock = new ReentrantLock(true);
/**
* 优势一:tryLock —— 带超时的尝试加锁
*
* 场景:分布式系统中,不能无限等待锁
* 比如:抢购场景,获取锁超时就直接返回"库存不足"
*/
public boolean tryDeductStock(String productId, int quantity) {
// 尝试获取锁,最多等待 500 毫秒
// 如果 500ms 内没拿到锁,直接返回 false——不要死等
try {
if (unfairLock.tryLock(500, TimeUnit.MILLISECONDS)) {
try {
// 获取到锁,执行扣减库存逻辑
System.out.println("扣减库存成功:" + productId);
return true;
} finally {
// ⚠️ unlock 必须放在 finally 中!
// synchronized 自动释放锁,ReentrantLock 必须手动释放
// 如果忘记 unlock,将导致永久死锁
unfairLock.unlock();
}
} else {
// 获取锁超时,快速失败
System.out.println("库存扣减繁忙,请重试");
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
/**
* 优势二:lockInterruptibly —— 可中断加锁
*
* 场景:用户取消了操作,线程应该立即停止等待锁
* 如果用 synchronized,线程会一直阻塞在锁上,无法响应中断
*/
public void interruptibleOperation() {
try {
// 可中断地等待锁——如果线程被中断,立即抛出 InterruptedException
unfairLock.lockInterruptibly();
try {
// 执行业务逻辑
System.out.println("执行可中断操作");
} finally {
unfairLock.unlock();
}
} catch (InterruptedException e) {
// 线程被中断,可以做清理工作
System.out.println("操作被中断,正在清理...");
Thread.currentThread().interrupt();
}
}
/**
* 优势三:多条件变量(Condition)
*
* 场景:实现一个有界阻塞队列
* synchronized 只有一个等待集(wait/notify),无法区分"队列满"和"队列空"
* Condition 可以创建多个等待集,实现更精确的通知
*/
private final ReentrantLock queueLock = new ReentrantLock();
// 队列非空条件——消费者等待此条件
private final Condition notEmpty = queueLock.newCondition();
// 队列非满条件——生产者等待此条件
private final Condition notFull = queueLock.newCondition();
private final Object[] items = new Object[100];
private int count = 0;
public void put(Object item) throws InterruptedException {
queueLock.lock();
try {
// 队列满时,生产者等待在 notFull 条件上
while (count == items.length) {
notFull.await(); // 只等待"队列非满"这一个条件
}
items[count++] = item;
// 放入元素后,通知消费者:队列不空了
notEmpty.signal(); // 精确唤醒等待"队列非空"的消费者线程
} finally {
queueLock.unlock();
}
}
public Object take() throws InterruptedException {
queueLock.lock();
try {
// 队列空时,消费者等待在 notEmpty 条件上
while (count == 0) {
notEmpty.await(); // 只等待"队列非空"这一个条件
}
Object item = items[--count];
// 取出元素后,通知生产者:队列不满了
notFull.signal(); // 精确唤醒等待"队列非满"的生产者线程
return item;
} finally {
queueLock.unlock();
}
}
}
4.3 ReadWriteLock — 读写锁
package com.example.concurrent.lock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* ReadWriteLock 适用场景:读多写少
*
* 核心规则:
* - 读-读 不互斥——多个线程可以同时读
* - 读-写 互斥——有线程在读时,写线程必须等待
* - 写-写 互斥——同一时刻只能有一个线程在写
*
* 典型场景:配置信息缓存、权限数据缓存等读远多于写的场景
*/
public class ReadWriteLockCacheDemo {
// 读写锁
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 缓存容器——不需要用 ConcurrentHashMap,因为读写锁已经保证了线程安全
private final Map<String, Object> cache = new HashMap<>();
/**
* 读操作——使用读锁
* 多个线程可以同时获取读锁,并发读取
*/
public Object get(String key) {
rwLock.readLock().lock(); // 获取读锁
try {
return cache.get(key);
} finally {
rwLock.readLock().unlock(); // 释放读锁
}
}
/**
* 写操作——使用写锁
* 写锁是排他的,同一时刻只有一个线程能写
*/
public void put(String key, Object value) {
rwLock.writeLock().lock(); // 获取写锁
try {
cache.put(key, value);
} finally {
rwLock.writeLock().unlock(); // 释放写锁
}
}
/**
* 锁降级模式:写锁 -> 读锁
*
* 场景:先更新缓存,然后在同一个原子操作中读取更新后的值
* 这样可以保证读到的一定是自己刚写入的值,不会被其他写线程覆盖
*/
public Object putAndGet(String key, Object value) {
rwLock.writeLock().lock(); // 第一步:获取写锁
try {
cache.put(key, value);
// 第二步:在持有写锁的同时获取读锁——这就是锁降级
rwLock.readLock().lock();
} finally {
// 第三步:释放写锁,此时仍然持有读锁
rwLock.writeLock().unlock();
}
// 此时只持有读锁,其他读线程可以并发访问
try {
return cache.get(key); // 保证读到的是自己刚写入的值
} finally {
rwLock.readLock().unlock(); // 释放读锁
}
}
}
⚠️ 注意:读写锁不支持锁升级(读锁 -> 写锁),只支持锁降级(写锁 -> 读锁)。 如果在持有读锁时尝试获取写锁,会导致永久阻塞(死锁)。
4.4 StampedLock — 邮戳锁
package com.example.concurrent.lock;
import java.util.concurrent.locks.StampedLock;
/**
* StampedLock(JDK 8 引入)
*
* 核心优势:乐观读——读操作不加锁!
* 在读多写少且写操作非常少的场景下,性能比 ReadWriteLock 更好
*
* ⚠️ 限制:
* 1. 不可重入——同一线程不能重复获取锁
* 2. 不支持 Condition
* 3. 使用复杂,容易写出 bug
* 4. 如果写操作不是特别少,反而可能比 ReadWriteLock 更慢
*/
public class StampedLockDemo {
private final StampedLock stampedLock = new StampedLock();
private double x, y; // 坐标点——模拟共享数据
/**
* 写操作——和普通排他锁一样
*/
public void move(double deltaX, double deltaY) {
// writeLock() 返回一个 stamp(邮戳),解锁时需要传入
long stamp = stampedLock.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 解锁时必须传入获取锁时返回的 stamp
}
}
/**
* 乐观读——StampedLock 的核心特性
*
* 流程:
* 1. 先尝试乐观读(不加锁,只获取一个版本号)
* 2. 读取共享数据
* 3. 用版本号验证期间是否发生了写操作
* 4. 如果没有写操作,直接使用读取的数据(零开销!)
* 5. 如果发生了写操作,升级为悲观读锁重新读取
*/
public double distanceFromOrigin() {
// 第一步:尝试乐观读,获取版本号(stamp)
// 这一步不加锁,所以零开销
long stamp = stampedLock.tryOptimisticRead();
// 第二步:读取共享数据到局部变量
// ⚠️ 此时没有加锁,数据可能被写线程修改
double currentX = x;
double currentY = y;
// 第三步:验证版本号
// validate 返回 true 表示在读取期间没有写操作发生
if (!stampedLock.validate(stamp)) {
// 第四步:验证失败,说明有写操作干扰,升级为悲观读锁
stamp = stampedLock.readLock();
try {
// 重新读取数据——此时已加锁,数据一定是一致的
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp);
}
}
// 使用读取到的数据进行计算
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
4.5 锁选型对比
| 维度 | synchronized | ReentrantLock | ReadWriteLock | StampedLock |
|---|---|---|---|---|
| 实现层级 | JVM 内置 | JDK API | JDK API | JDK API |
| 可重入 | 是 | 是 | 是 | 否 |
| 公平锁 | 否 | 支持 | 支持 | 否 |
| 可中断 | 否 | 支持 | 支持 | 支持 |
| tryLock | 否 | 支持 | 支持 | 支持 |
| 条件变量 | 1 个(wait/notify) | 多个(Condition) | 不常用 | 不支持 |
| 读写分离 | 否 | 否 | 是 | 是(含乐观读) |
| 性能(低竞争) | 高(偏向锁优化) | 高 | 较高 | 最高 |
| 使用复杂度 | 低 | 中 | 中 | 高 |
| 适用场景 | 简单互斥 | 需要高级特性 | 读多写少 | 读极多写极少 |
| 推荐程度 | 首选 | 需要时用 | 合适场景用 | 谨慎使用 |
选型建议:能用
synchronized就用synchronized(简单不易出错)。 只有当你需要 tryLock、可中断、公平锁、多 Condition 等高级特性时,才选ReentrantLock。
五、并发工具类实战
5.1 CountDownLatch — 倒计时门闩
package com.example.concurrent.tools;
import java.util.concurrent.*;
import java.util.Map;
import java.util.HashMap;
/**
* CountDownLatch:等待多个任务全部完成后再继续
*
* 核心方法:
* - countDown():计数器减 1
* - await():阻塞等待计数器归零
*
* 真实场景:微服务接口聚合——等待所有下游服务返回后再组装响应
*
* ⚠️ 注意:CountDownLatch 是一次性的,计数器归零后不能重置
* 如果需要循环使用,请用 CyclicBarrier
*/
public class CountDownLatchDemo {
/**
* 场景:用户详情页需要聚合 4 个微服务的数据
*/
public Map<String, Object> getUserDetail(Long userId) throws Exception {
Map<String, Object> result = new ConcurrentHashMap<>();
// 计数器设为 4——需要等待 4 个服务全部返回
CountDownLatch latch = new CountDownLatch(4);
ExecutorService executor = Executors.newFixedThreadPool(4);
// 异步调用用户基础信息服务
executor.submit(() -> {
try {
result.put("userInfo", callUserService(userId));
} catch (Exception e) {
// 即使某个服务调用失败,也必须 countDown
// 否则 await 会永远阻塞
result.put("userInfo", "获取失败");
} finally {
// ⚠️ countDown 必须在 finally 中!
// 无论成功失败都要减计数,否则主线程永远等不到
latch.countDown();
}
});
// 异步调用订单服务
executor.submit(() -> {
try {
result.put("orders", callOrderService(userId));
} catch (Exception e) {
result.put("orders", "获取失败");
} finally {
latch.countDown();
}
});
// 异步调用积分服务
executor.submit(() -> {
try {
result.put("points", callPointsService(userId));
} catch (Exception e) {
result.put("points", "获取失败");
} finally {
latch.countDown();
}
});
// 异步调用优惠券服务
executor.submit(() -> {
try {
result.put("coupons", callCouponService(userId));
} catch (Exception e) {
result.put("coupons", "获取失败");
} finally {
latch.countDown();
}
});
// 主线程等待所有服务返回,最多等 5 秒
// ⚠️ 必须设置超时!否则某个服务挂了,主线程永远阻塞
boolean completed = latch.await(5, TimeUnit.SECONDS);
if (!completed) {
// 超时处理:记录日志,返回已获取的部分数据
System.err.println("部分服务调用超时,返回降级数据");
}
executor.shutdown();
return result;
}
// 模拟微服务调用
private Object callUserService(Long userId) throws Exception {
Thread.sleep(200);
return "用户信息";
}
private Object callOrderService(Long userId) throws Exception {
Thread.sleep(300);
return "订单列表";
}
private Object callPointsService(Long userId) throws Exception {
Thread.sleep(150);
return "积分信息";
}
private Object callCouponService(Long userId) throws Exception {
Thread.sleep(250);
return "优惠券列表";
}
}
5.2 CyclicBarrier — 循环栅栏
package com.example.concurrent.tools;
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
/**
* CyclicBarrier:让多个线程在屏障点同步汇合
*
* 与 CountDownLatch 的区别:
* - CountDownLatch 是一次性的,CyclicBarrier 可以重复使用(reset)
* - CountDownLatch 是一个线程等多个线程,CyclicBarrier 是多个线程互相等
* - CyclicBarrier 可以在所有线程到达屏障后执行一个汇总动作(barrierAction)
*
* 真实场景:多线程分片统计,每个线程统计一部分数据,
* 所有线程统计完成后在屏障点汇总结果
*/
public class CyclicBarrierDemo {
/**
* 场景:多线程分片统计销售额报表
* 每个线程统计一个地区的数据,全部完成后汇总
*/
public void generateSalesReport() throws Exception {
// 分 3 个线程统计 3 个地区
int regionCount = 3;
// 存放各地区统计结果
List<Double> regionResults =
new CopyOnWriteArrayList<>(); // 线程安全的 List
// 创建 CyclicBarrier,第二个参数是屏障动作——所有线程到达后自动执行
CyclicBarrier barrier = new CyclicBarrier(regionCount, () -> {
// 这个动作由最后一个到达屏障的线程执行
// 汇总所有地区的销售额
double totalSales = regionResults.stream()
.mapToDouble(Double::doubleValue)
.sum();
System.out.println("全国销售总额汇总完成:" + totalSales + " 万元");
});
ExecutorService executor = Executors.newFixedThreadPool(regionCount);
String[] regions = {"华东", "华南", "华北"};
for (String region : regions) {
executor.submit(() -> {
try {
// 模拟各地区统计耗时不同
double sales = calculateRegionSales(region);
regionResults.add(sales);
System.out.println(region + " 地区统计完成:" + sales + " 万元");
// 到达屏障点,等待其他线程
// 当所有线程都调用 await() 后,屏障打开,执行屏障动作
barrier.await(10, TimeUnit.SECONDS);
// 屏障打开后,所有线程继续执行后续逻辑
System.out.println(region + " 地区开始生成详细报表...");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
System.err.println(region + " 地区统计异常");
} catch (TimeoutException e) {
System.err.println(region + " 地区统计超时");
}
});
}
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
private double calculateRegionSales(String region) throws Exception {
// 模拟不同地区计算耗时不同
Thread.sleep((long) (Math.random() * 2000));
return Math.random() * 1000;
}
}
5.3 Semaphore — 信号量
package com.example.concurrent.tools;
import java.util.concurrent.*;
/**
* Semaphore:控制同时访问某个资源的线程数量
*
* 核心思想:维护一组"许可证",线程获取许可后才能执行,执行完归还许可
*
* 真实场景:
* 1. 数据库连接池限制并发连接数
* 2. 接口限流——同一时刻最多 N 个请求
* 3. 资源池管理
*/
public class SemaphoreDemo {
/**
* 场景一:接口限流
* 限制对第三方 API 的并发调用数,防止被限流或打垮下游
*/
public static class ThirdPartyApiLimiter {
// 最多同时 10 个线程调用第三方 API
// true = 公平模式,先来先服务,防止线程饥饿
private final Semaphore semaphore = new Semaphore(10, true);
public String callThirdPartyApi(String request) throws Exception {
// 尝试获取许可,最多等待 3 秒
// 如果 3 秒内没有空闲许可,说明并发太高,快速失败
if (!semaphore.tryAcquire(3, TimeUnit.SECONDS)) {
throw new RuntimeException("第三方 API 调用繁忙,请稍后重试");
}
try {
// 获取到许可,执行 API 调用
// 此时最多只有 10 个线程同时执行这段代码
return doCallApi(request);
} finally {
// ⚠️ 必须在 finally 中释放许可!
// 否则许可会越来越少,最终所有线程都无法获取许可
semaphore.release();
}
}
private String doCallApi(String request) throws Exception {
Thread.sleep(200); // 模拟 API 调用耗时
return "API 响应:" + request;
}
}
/**
* 场景二:简易数据库连接池
*/
public static class SimpleConnectionPool {
// 用 Semaphore 控制最大连接数
private final Semaphore semaphore;
// 用阻塞队列存放空闲连接
private final BlockingQueue<Object> pool;
public SimpleConnectionPool(int maxConnections) {
this.semaphore = new Semaphore(maxConnections);
this.pool = new LinkedBlockingQueue<>(maxConnections);
// 初始化连接池
for (int i = 0; i < maxConnections; i++) {
pool.offer(createConnection(i));
}
}
public Object getConnection(long timeout, TimeUnit unit)
throws Exception {
// 先获取许可——控制并发连接数
if (!semaphore.tryAcquire(timeout, unit)) {
throw new RuntimeException("获取数据库连接超时");
}
// 获取许可后,从池中取出一个连接
// 因为许可数 = 连接数,所以 poll 不会返回 null
return pool.poll();
}
public void releaseConnection(Object connection) {
// 归还连接到池中
pool.offer(connection);
// 释放许可——允许其他线程获取连接
semaphore.release();
}
private Object createConnection(int id) {
return "Connection-" + id;
}
}
}
5.4 CompletableFuture — 异步编排利器(重点)
5.4.1 基础异步操作
package com.example.concurrent.future;
import java.util.concurrent.*;
/**
* CompletableFuture(JDK 8)是 Java 异步编程的终极武器
*
* 相比 Future 的优势:
* 1. 链式调用——不需要嵌套 get()
* 2. 组合多个异步任务——allOf / anyOf
* 3. 异常处理——exceptionally / handle
* 4. 回调机制——不需要阻塞等待
*/
public class CompletableFutureBasicDemo {
// ⚠️ 核心:一定要自定义线程池!不要用默认的 ForkJoinPool.commonPool()
// 原因:
// 1. commonPool 的线程数 = CPU 核心数 - 1,IO 密集型任务会不够用
// 2. 所有没指定线程池的 CompletableFuture 共享 commonPool,互相影响
// 3. commonPool 中的线程是守护线程,JVM 退出时任务可能还没执行完
private static final ExecutorService ASYNC_POOL = new ThreadPoolExecutor(
8, 16, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500),
r -> {
Thread t = new Thread(r);
t.setName("async-biz-" + t.getId());
t.setDaemon(false); // 非守护线程,确保任务执行完成
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) throws Exception {
// 方式一:supplyAsync —— 有返回值的异步任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(
() -> {
// 这段代码在 ASYNC_POOL 的线程中执行
return queryUserInfo(1001L);
},
ASYNC_POOL // ⚠️ 必须传入自定义线程池
);
// 方式二:runAsync —— 无返回值的异步任务
CompletableFuture<Void> future2 = CompletableFuture.runAsync(
() -> {
// 无返回值的异步操作,如发送通知
sendNotification("订单创建成功");
},
ASYNC_POOL
);
// 获取结果(带超时)
String userInfo = future1.get(3, TimeUnit.SECONDS);
System.out.println("用户信息:" + userInfo);
ASYNC_POOL.shutdown();
}
private static String queryUserInfo(Long userId) {
try { Thread.sleep(200); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "User-" + userId;
}
private static void sendNotification(String message) {
try { Thread.sleep(100); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("通知已发送:" + message);
}
}
5.4.2 链式调用
package com.example.concurrent.future;
import java.util.concurrent.*;
/**
* CompletableFuture 链式调用——告别回调地狱
*/
public class CompletableFutureChainDemo {
private static final ExecutorService POOL = new ThreadPoolExecutor(
4, 8, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
r -> new Thread(r, "chain-demo-" + Thread.currentThread().getId()),
new ThreadPoolExecutor.CallerRunsPolicy()
);
public static void main(String[] args) throws Exception {
// ============ thenApply:同步转换结果(map 操作) ============
// 场景:查询用户 -> 获取用户等级 -> 计算折扣
CompletableFuture<Double> discountFuture = CompletableFuture
.supplyAsync(() -> queryUserId("张三"), POOL) // 第一步:查询用户ID
.thenApply(userId -> queryUserLevel(userId)) // 第二步:查询等级
.thenApply(level -> calculateDiscount(level)); // 第三步:计算折扣
System.out.println("折扣率:" + discountFuture.get());
// ============ thenCompose:异步扁平化(flatMap 操作) ============
// 当下一步操作本身也返回 CompletableFuture 时使用
// 如果用 thenApply 会得到 CompletableFuture<CompletableFuture<T>>
CompletableFuture<String> orderFuture = CompletableFuture
.supplyAsync(() -> queryUserId("张三"), POOL)
.thenCompose(userId -> queryUserOrdersAsync(userId));
// thenCompose 会自动"摊平"嵌套的 CompletableFuture
System.out.println("订单信息:" + orderFuture.get());
// ============ thenCombine:合并两个独立任务的结果 ============
// 场景:同时查询商品价格和用户折扣,然后计算最终价格
CompletableFuture<Double> priceFuture = CompletableFuture
.supplyAsync(() -> queryProductPrice("SKU-001"), POOL);
CompletableFuture<Double> userDiscountFuture = CompletableFuture
.supplyAsync(() -> queryUserDiscount(1001L), POOL);
CompletableFuture<Double> finalPriceFuture = priceFuture
.thenCombine(userDiscountFuture, (price, discount) -> {
// 两个任务都完成后,合并结果
return price * discount;
});
System.out.println("最终价格:" + finalPriceFuture.get());
// ============ thenAccept:消费结果(不返回新值) ============
CompletableFuture.supplyAsync(() -> queryUserId("张三"), POOL)
.thenAccept(userId -> {
// 消费结果,如记录日志、发送通知
System.out.println("查询到用户ID:" + userId);
});
// ============ thenRun:前一步完成后执行(不关心前一步的结果) ============
CompletableFuture.supplyAsync(() -> queryUserId("张三"), POOL)
.thenRun(() -> {
// 不需要前一步的结果,只是在它完成后触发
System.out.println("用户查询完毕,清理临时数据...");
});
Thread.sleep(1000);
POOL.shutdown();
}
// 模拟业务方法
private static Long queryUserId(String name) {
sleep(100);
return 1001L;
}
private static Integer queryUserLevel(Long userId) {
sleep(100);
return 5; // VIP 5
}
private static Double calculateDiscount(Integer level) {
return level >= 5 ? 0.8 : 0.95;
}
private static CompletableFuture<String> queryUserOrdersAsync(Long userId) {
// 返回 CompletableFuture——这就是为什么用 thenCompose 而不是 thenApply
return CompletableFuture.supplyAsync(
() -> "用户 " + userId + " 的订单列表",
POOL
);
}
private static Double queryProductPrice(String sku) {
sleep(150);
return 299.0;
}
private static Double queryUserDiscount(Long userId) {
sleep(100);
return 0.85;
}
private static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
5.4.3 异常处理
package com.example.concurrent.future;
import java.util.concurrent.*;
/**
* CompletableFuture 异常处理
*
* ⚠️ 最大的坑:CompletableFuture 会「吞掉」异常!
* 如果你不调用 get()、join() 或注册异常处理器,异常会静默消失
* 线程池中的线程不会抛异常到控制台,你甚至不知道任务失败了
*/
public class CompletableFutureExceptionDemo {
private static final ExecutorService POOL = Executors.newFixedThreadPool(4);
public static void main(String[] args) throws Exception {
// ============ exceptionally:捕获异常并提供降级值 ============
// 类似 try-catch,只处理异常情况
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> {
// 模拟:调用用户服务可能失败
if (Math.random() > 0.5) {
throw new RuntimeException("用户服务不可用");
}
return "用户信息";
}, POOL)
.exceptionally(throwable -> {
// 捕获上游的异常,返回降级数据
// throwable.getCause() 才是真正的异常(外层包了 CompletionException)
System.err.println("降级处理:" + throwable.getMessage());
return "默认用户信息"; // 降级值
});
System.out.println("结果:" + future1.get());
// ============ handle:同时处理正常结果和异常 ============
// 类似 try-catch-finally,无论成功失败都会执行
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("订单服务不可用");
}
return "订单信息";
}, POOL)
.handle((result, throwable) -> {
if (throwable != null) {
// 处理异常
System.err.println("异常:" + throwable.getMessage());
return "降级订单信息";
}
// 处理正常结果
return "处理后的 " + result;
});
System.out.println("结果:" + future2.get());
// ============ whenComplete:监听完成事件(不改变结果) ============
// 适用于:记录日志、发送通知等副作用操作
CompletableFuture<String> future3 = CompletableFuture
.supplyAsync(() -> "数据", POOL)
.whenComplete((result, throwable) -> {
// ⚠️ whenComplete 不能改变结果,只是观察
if (throwable != null) {
System.err.println("任务失败:" + throwable.getMessage());
// 可以在这里记录日志、发送告警
} else {
System.out.println("任务成功:" + result);
}
});
System.out.println("结果:" + future3.get());
POOL.shutdown();
}
}
5.4.4 多任务编排
package com.example.concurrent.future;
import java.util.concurrent.*;
import java.util.Map;
import java.util.HashMap;
/**
* CompletableFuture 多任务编排——企业级实战
*
* 场景:用户详情页接口聚合
* 需要并行调用:用户服务、订单服务、优惠券服务、积分服务、收货地址服务
*/
public class CompletableFutureOrchestrationDemo {
// ⚠️ 企业级必须自定义线程池
private static final ExecutorService BIZ_POOL = new ThreadPoolExecutor(
10, 20, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(500),
r -> {
Thread t = new Thread(r);
t.setName("biz-async-" + t.getId());
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
/**
* allOf:等待所有任务完成
* 适用于:所有数据都需要才能返回的场景
*/
public static Map<String, Object> getUserDetailWithAllOf(Long userId)
throws Exception {
Map<String, Object> result = new ConcurrentHashMap<>();
// 创建 5 个异步任务
CompletableFuture<Void> userInfoFuture = CompletableFuture
.supplyAsync(() -> queryUserInfo(userId), BIZ_POOL)
.thenAccept(data -> result.put("userInfo", data))
.exceptionally(e -> {
result.put("userInfo", "获取失败"); // 单个服务失败不影响整体
return null;
});
CompletableFuture<Void> ordersFuture = CompletableFuture
.supplyAsync(() -> queryOrders(userId), BIZ_POOL)
.thenAccept(data -> result.put("orders", data))
.exceptionally(e -> {
result.put("orders", "获取失败");
return null;
});
CompletableFuture<Void> couponsFuture = CompletableFuture
.supplyAsync(() -> queryCoupons(userId), BIZ_POOL)
.thenAccept(data -> result.put("coupons", data))
.exceptionally(e -> {
result.put("coupons", "获取失败");
return null;
});
CompletableFuture<Void> pointsFuture = CompletableFuture
.supplyAsync(() -> queryPoints(userId), BIZ_POOL)
.thenAccept(data -> result.put("points", data))
.exceptionally(e -> {
result.put("points", "获取失败");
return null;
});
CompletableFuture<Void> addressesFuture = CompletableFuture
.supplyAsync(() -> queryAddresses(userId), BIZ_POOL)
.thenAccept(data -> result.put("addresses", data))
.exceptionally(e -> {
result.put("addresses", "获取失败");
return null;
});
// allOf:等待所有任务完成
CompletableFuture.allOf(
userInfoFuture, ordersFuture, couponsFuture,
pointsFuture, addressesFuture
).get(5, TimeUnit.SECONDS); // ⚠️ 必须设置超时
return result;
}
/**
* anyOf:任意一个任务完成就返回
* 适用于:多个备选方案,取最快返回的那个
* 场景:多机房部署,同时请求多个机房,取最快的响应
*/
public static String getDataFromFastestNode(String key) throws Exception {
CompletableFuture<String> node1 = CompletableFuture
.supplyAsync(() -> queryFromNode("北京机房", key), BIZ_POOL);
CompletableFuture<String> node2 = CompletableFuture
.supplyAsync(() -> queryFromNode("上海机房", key), BIZ_POOL);
CompletableFuture<String> node3 = CompletableFuture
.supplyAsync(() -> queryFromNode("广州机房", key), BIZ_POOL);
// anyOf:只要有一个完成就返回
Object result = CompletableFuture
.anyOf(node1, node2, node3)
.get(3, TimeUnit.SECONDS);
return (String) result;
}
// ===== 模拟服务调用 =====
private static String queryUserInfo(Long userId) {
sleep(200);
return "用户基础信息";
}
private static String queryOrders(Long userId) {
sleep(300);
return "订单列表";
}
private static String queryCoupons(Long userId) {
sleep(250);
return "优惠券列表";
}
private static String queryPoints(Long userId) {
sleep(150);
return "积分信息";
}
private static String queryAddresses(Long userId) {
sleep(180);
return "收货地址列表";
}
private static String queryFromNode(String nodeName, String key) {
long delay = (long) (Math.random() * 500); // 随机延迟模拟不同机房网络
sleep(delay);
return nodeName + " 返回数据:" + key;
}
private static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) throws Exception {
// 测试 allOf
long start = System.currentTimeMillis();
Map<String, Object> detail = getUserDetailWithAllOf(1001L);
System.out.println("allOf 耗时:" + (System.currentTimeMillis() - start)
+ "ms");
System.out.println("聚合结果:" + detail);
// 测试 anyOf
start = System.currentTimeMillis();
String data = getDataFromFastestNode("product-001");
System.out.println("anyOf 耗时:" + (System.currentTimeMillis() - start)
+ "ms");
System.out.println("最快节点返回:" + data);
BIZ_POOL.shutdown();
}
}
5.4.5 CompletableFuture 常见陷阱
⚠️ 陷阱一:使用默认的 ForkJoinPool.commonPool()
// ❌ 错误写法:不传线程池,默认使用 ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(() -> queryDatabase());
// ✅ 正确写法:传入自定义线程池
CompletableFuture.supplyAsync(() -> queryDatabase(), customPool);
为什么默认线程池是坑?
commonPool线程数 =CPU核心数 - 1,IO 密集型任务会严重不够用- 整个 JVM 所有使用默认线程池的 CompletableFuture 共享同一个池,互相影响
commonPool中的线程是守护线程,main 方法结束时任务可能还没执行完
⚠️ 陷阱二:异常被吞掉
// ❌ 错误写法:异常被静默吞掉,没有任何错误信息
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("业务异常");
}, pool);
// 如果不调用 get() 或 join(),这个异常永远不会被发现!
// ✅ 正确写法:必须注册异常处理
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("业务异常");
}, pool).exceptionally(e -> {
log.error("异步任务异常", e); // 记录日志
return defaultValue; // 返回降级值
});
⚠️ 陷阱三:thenApply vs thenApplyAsync 的线程切换
// thenApply:在上一步完成的线程中执行(可能是线程池线程)
future.thenApply(result -> process(result));
// thenApplyAsync:在指定线程池中执行
// 当后续操作也是耗时操作时,应该使用 Async 版本,避免占用上一步的线程
future.thenApplyAsync(result -> heavyProcess(result), pool);
六、并发容器选型
6.1 ConcurrentHashMap
package com.example.concurrent.container;
import java.util.concurrent.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
/**
* ConcurrentHashMap 选型指南
*
* 为什么不用 Hashtable?
* - Hashtable 用一把大锁锁住整个表,并发性能极差
*
* 为什么不用 Collections.synchronizedMap()?
* - 同样是一把大锁,只是语法糖,本质和 Hashtable 一样
*
* ConcurrentHashMap 的优势:
* - JDK 7:分段锁(Segment),并发度 = 段数
* - JDK 8+:CAS + synchronized 锁单个桶,并发度 = 桶数(更细粒度)
*/
public class ConcurrentHashMapDemo {
/**
* ⚠️ 最常见的陷阱:复合操作不是原子的!
*
* 即使 ConcurrentHashMap 的单个方法是线程安全的,
* 但「先检查再操作」(check-then-act)组合起来不是原子的
*/
public static class UnsafeUsage {
private final ConcurrentHashMap<String, Integer> counter =
new ConcurrentHashMap<>();
// ❌ 错误写法:先 get 再 put 不是原子的
public void unsafeIncrement(String key) {
Integer value = counter.get(key); // 线程 A 读到 10
// ⚠️ 此处其他线程可能修改了值! // 线程 B 也读到 10
if (value == null) {
counter.put(key, 1); // 线程 A 写入 11
} else {
counter.put(key, value + 1); // 线程 B 也写入 11(应该是 12)
}
}
// ✅ 正确写法一:使用原子方法
public void safeIncrement(String key) {
// merge 是原子操作:如果 key 不存在则放入 1,存在则执行合并函数
counter.merge(key, 1, Integer::sum);
}
// ✅ 正确写法二:使用 compute
public void safeIncrementV2(String key) {
// compute 也是原子操作:根据 key 和旧值计算新值
counter.compute(key, (k, v) -> v == null ? 1 : v + 1);
}
}
}
三种线程安全 Map 对比:
| 维度 | Hashtable | synchronizedMap | ConcurrentHashMap |
|---|---|---|---|
| 锁粒度 | 整个表一把锁 | 整个表一把锁 | 单个桶级别 |
| 允许 null key/value | 否 | 取决于底层 Map | 否 |
| 迭代器 | fail-fast | fail-fast | 弱一致性(不抛异常) |
| 并发性能 | 差 | 差 | 优秀 |
| 适用场景 | 遗留代码 | 不推荐 | 推荐 |
6.2 CopyOnWriteArrayList
package com.example.concurrent.container;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* CopyOnWriteArrayList:写时复制列表
*
* 核心原理:
* - 读操作:直接读底层数组,无需加锁(零开销)
* - 写操作:复制一份新数组,在新数组上修改,然后替换引用
*
* 适用场景:读远多于写(如监听器列表、黑名单列表、配置列表)
*
* ⚠️ 不适用场景:
* - 写操作频繁(每次写都要复制整个数组,性能极差)
* - 列表很大(复制开销大,内存占用翻倍)
*/
public class CopyOnWriteArrayListDemo {
/**
* 典型场景:事件监听器管理
* 注册/取消注册操作极少(写少),事件触发通知所有监听器很频繁(读多)
*/
public static class EventBus {
// 使用 CopyOnWriteArrayList 存储监听器
// 读操作(遍历通知)远多于写操作(注册/取消注册)
private final List<EventListener> listeners =
new CopyOnWriteArrayList<>();
// 注册监听器(写操作,偶尔发生)
public void register(EventListener listener) {
listeners.add(listener); // 底层会复制数组,但注册操作不频繁
}
// 取消注册(写操作,偶尔发生)
public void unregister(EventListener listener) {
listeners.remove(listener);
}
// 触发事件通知(读操作,频繁发生)
public void fireEvent(String event) {
// ⚠️ 遍历时不需要加锁!即使其他线程正在注册新的监听器
// 因为遍历的是当时的快照,不会被修改
for (EventListener listener : listeners) {
try {
listener.onEvent(event);
} catch (Exception e) {
// 一个监听器异常不应该影响其他监听器
System.err.println("监听器处理异常:" + e.getMessage());
}
}
}
}
// 监听器接口
public interface EventListener {
void onEvent(String event);
}
}
6.3 BlockingQueue 阻塞队列家族
package com.example.concurrent.container;
import java.util.concurrent.*;
/**
* BlockingQueue 家族——生产者-消费者模式的基础设施
*
* 核心特性:
* - 队列满时,put() 会阻塞生产者
* - 队列空时,take() 会阻塞消费者
* 这样就不需要手动写 wait/notify 了
*/
public class BlockingQueueDemo {
/**
* 场景:订单消息队列——生产者-消费者模式
*/
public static class OrderMessageQueue {
// 使用有界阻塞队列,防止生产者速度过快导致 OOM
private final BlockingQueue<String> queue;
public OrderMessageQueue(int capacity) {
// ArrayBlockingQueue:数组实现,有界,一把锁(生产和消费共享)
this.queue = new ArrayBlockingQueue<>(capacity);
}
/**
* 生产者:投递订单消息
*/
public void produce(String orderMessage) throws InterruptedException {
// put() 会在队列满时阻塞,直到有空间
// 比 offer() 安全——offer() 队列满时返回 false,消息可能丢失
queue.put(orderMessage);
System.out.println(Thread.currentThread().getName()
+ " 投递消息:" + orderMessage
+ ",队列大小:" + queue.size());
}
/**
* 消费者:消费订单消息
*/
public void consume() throws InterruptedException {
// take() 会在队列空时阻塞,直到有新消息
String message = queue.take();
System.out.println(Thread.currentThread().getName()
+ " 消费消息:" + message
+ ",队列剩余:" + queue.size());
// 处理消息...
}
/**
* 带超时的消费——避免永久阻塞
*/
public String consumeWithTimeout(long timeout, TimeUnit unit)
throws InterruptedException {
// poll(timeout) 在指定时间内等待,超时返回 null
return queue.poll(timeout, unit);
}
}
public static void main(String[] args) throws Exception {
OrderMessageQueue mq = new OrderMessageQueue(10);
// 启动 2 个生产者线程
for (int i = 0; i < 2; i++) {
final int producerId = i;
new Thread(() -> {
try {
for (int j = 0; j < 20; j++) {
mq.produce("订单-" + producerId + "-" + j);
Thread.sleep(50); // 模拟生产间隔
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "producer-" + i).start();
}
// 启动 3 个消费者线程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
mq.consume();
Thread.sleep(100); // 模拟消费耗时
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "consumer-" + i).start();
}
}
}
BlockingQueue 家族对比:
| 队列类型 | 底层结构 | 有界/无界 | 锁机制 | 适用场景 |
|---|---|---|---|---|
ArrayBlockingQueue | 数组 | 有界 | 一把锁(生产消费共享) | 通用场景,内存可控 |
LinkedBlockingQueue | 链表 | 可选有界(默认无界) | 两把锁(生产消费分离) | 吞吐量要求高 |
SynchronousQueue | 无缓冲 | 0 容量 | 无锁/CAS | 直接传递,如 CachedThreadPool |
PriorityBlockingQueue | 堆 | 无界 | 一把锁 | 需要优先级排序的场景 |
DelayQueue | 堆 | 无界 | 一把锁 | 延迟任务(如订单超时关闭) |
⚠️ 生产建议:
- 绝大多数场景用
ArrayBlockingQueue(有界、简单)- 需要更高吞吐量时用
LinkedBlockingQueue(记得设容量!)SynchronousQueue只在特殊场景使用(如线程池的 CachedThreadPool)
七、常见陷阱与避坑指南
7.1 死锁的四个条件及排查方法
package com.example.concurrent.pitfall;
/**
* 陷阱一:死锁
*
* 死锁的四个必要条件:
* 1. 互斥——资源一次只能被一个线程占用
* 2. 持有并等待——持有一个资源的同时等待另一个资源
* 3. 不可剥夺——已获取的资源不能被其他线程强制释放
* 4. 循环等待——线程之间形成环形等待链
*
* 只要破坏任意一个条件就能避免死锁
*/
public class DeadlockDemo {
private static final Object LOCK_A = new Object();
private static final Object LOCK_B = new Object();
/**
* ❌ 会死锁的代码
*/
public static void deadlockExample() {
// 线程 1:先锁 A 再锁 B
Thread t1 = new Thread(() -> {
synchronized (LOCK_A) {
System.out.println("线程1 获取了锁A");
try { Thread.sleep(100); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 此时线程2 已经持有锁B,线程1 等待锁B -> 死锁
synchronized (LOCK_B) {
System.out.println("线程1 获取了锁B");
}
}
}, "thread-1");
// 线程 2:先锁 B 再锁 A(顺序相反 = 循环等待)
Thread t2 = new Thread(() -> {
synchronized (LOCK_B) {
System.out.println("线程2 获取了锁B");
try { Thread.sleep(100); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 此时线程1 已经持有锁A,线程2 等待锁A -> 死锁
synchronized (LOCK_A) {
System.out.println("线程2 获取了锁A");
}
}
}, "thread-2");
t1.start();
t2.start();
}
/**
* ✅ 修复方案:统一加锁顺序(破坏循环等待条件)
* 所有线程都按 A -> B 的顺序加锁
*/
public static void fixedExample() {
Thread t1 = new Thread(() -> {
synchronized (LOCK_A) { // 先锁 A
synchronized (LOCK_B) { // 再锁 B
System.out.println("线程1 安全获取了两把锁");
}
}
}, "thread-1");
Thread t2 = new Thread(() -> {
synchronized (LOCK_A) { // 也是先锁 A(顺序一致)
synchronized (LOCK_B) { // 再锁 B
System.out.println("线程2 安全获取了两把锁");
}
}
}, "thread-2");
t1.start();
t2.start();
}
}
排查死锁的方法:
# 方法一:使用 jstack 导出线程堆栈
# 1. 找到 Java 进程 PID
jps -l
# 2. 导出线程堆栈
jstack <PID> > thread_dump.txt
# 3. 在输出中搜索 "deadlock"
# jstack 会自动检测死锁并输出如下信息:
# Found one Java-level deadlock:
# =============================
# "thread-2":
# waiting to lock monitor 0x00007f8b3c003898 (object 0x..., a java.lang.Object),
# which is held by "thread-1"
# "thread-1":
# waiting to lock monitor 0x00007f8b3c003948 (object 0x..., a java.lang.Object),
# which is held by "thread-2"
# 方法二:使用 Arthas(推荐)
# thread -b # 查找阻塞其他线程的线程
7.2 线程安全的假象:ConcurrentHashMap 的复合操作
package com.example.concurrent.pitfall;
import java.util.concurrent.ConcurrentHashMap;
/**
* 陷阱二:ConcurrentHashMap 的复合操作不是线程安全的
*
* ConcurrentHashMap 保证的是单个方法的线程安全
* 但是「先读后写」这种复合操作不是原子的
*/
public class CompoundOperationPitfall {
private final ConcurrentHashMap<String, Integer> map =
new ConcurrentHashMap<>();
/**
* ❌ 错误写法:先 get 再 put 不是原子的
*/
public void unsafeIncrement(String key) {
Integer oldValue = map.get(key);
// ⚠️ 这里存在竞态条件!
// 两个线程可能同时读到同一个 oldValue
// 然后各自 +1 写回,结果只加了 1 而不是 2
if (oldValue == null) {
map.put(key, 1);
} else {
map.put(key, oldValue + 1);
}
}
/**
* ✅ 正确写法一:使用 ConcurrentHashMap 的原子方法
*/
public void safeIncrementV1(String key) {
// merge 方法是原子的:key 不存在时放入 1,存在时执行 Integer::sum
map.merge(key, 1, Integer::sum);
}
/**
* ✅ 正确写法二:使用 compute
*/
public void safeIncrementV2(String key) {
// compute 方法也是原子的
map.compute(key, (k, v) -> v == null ? 1 : v + 1);
}
/**
* ✅ 正确写法三:putIfAbsent + replace(CAS 风格)
*/
public void safeIncrementV3(String key) {
// putIfAbsent:只有 key 不存在时才放入
map.putIfAbsent(key, 0);
// 自旋 CAS
while (true) {
Integer oldValue = map.get(key);
// replace(key, oldValue, newValue) 是原子的
// 只有当前值等于 oldValue 时才替换成 newValue
if (map.replace(key, oldValue, oldValue + 1)) {
break; // 替换成功,退出循环
}
// 替换失败说明被其他线程修改了,重新读取并重试
}
}
}
7.3 线程池异常丢失
package com.example.concurrent.pitfall;
import java.util.concurrent.*;
/**
* 陷阱三:线程池中的异常被静默吞掉
*
* 根源:
* - execute() 提交的任务:异常会导致线程终止,可以被 UncaughtExceptionHandler 捕获
* - submit() 提交的任务:异常被封装在 Future 中,只有调用 get() 时才会抛出
* 如果不调用 get(),异常永远不会被发现
*/
public class ThreadPoolExceptionPitfall {
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(2);
// ❌ submit() 方式:异常被吞掉,控制台无任何输出
pool.submit(() -> {
System.out.println("submit 任务开始执行");
throw new RuntimeException("submit 任务异常");
// 这个异常被封装在 Future 中,永远不会被发现
});
Thread.sleep(1000);
System.out.println("主线程继续执行——没有看到任何异常信息!");
// ✅ 修复方式一:使用 Future.get() 获取异常
Future<?> future = pool.submit(() -> {
throw new RuntimeException("submit 任务异常");
});
try {
future.get(); // 这里会抛出 ExecutionException
} catch (ExecutionException e) {
System.err.println("捕获到异常:" + e.getCause().getMessage());
}
// ✅ 修复方式二:使用 execute() 而非 submit()(针对不需要返回值的任务)
// execute() 提交的任务,异常会传播到 UncaughtExceptionHandler
// ✅ 修复方式三:在任务内部 try-catch
pool.submit(() -> {
try {
throw new RuntimeException("业务异常");
} catch (Exception e) {
// 在任务内部处理异常——最可靠的方式
System.err.println("任务内部捕获异常:" + e.getMessage());
// 记录日志、发送告警等
}
});
Thread.sleep(500);
pool.shutdown();
}
}
7.4 SimpleDateFormat 线程不安全
package com.example.concurrent.pitfall;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 陷阱四:SimpleDateFormat 是线程不安全的
*
* 原因:SimpleDateFormat 内部有一个 Calendar 实例变量
* 多线程同时调用 format/parse 时会相互覆盖 Calendar 的状态
* 导致:格式化结果错误、NumberFormatException、ArrayIndexOutOfBoundsException
*/
public class SimpleDateFormatPitfall {
// ❌ 错误写法:多个线程共享同一个 SimpleDateFormat
private static final SimpleDateFormat UNSAFE_SDF =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// ✅ 修复方式一:使用 ThreadLocal(JDK 7 及以下推荐)
private static final ThreadLocal<SimpleDateFormat> SAFE_SDF =
ThreadLocal.withInitial(
() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
);
// ✅ 修复方式二:使用 DateTimeFormatter(JDK 8+ 强烈推荐)
// DateTimeFormatter 是不可变的、线程安全的
private static final DateTimeFormatter FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
pool.submit(() -> {
// ❌ 这样会产生各种奇怪的异常或错误结果
// String result = UNSAFE_SDF.format(new java.util.Date());
// ✅ 方式一:ThreadLocal
// String result = SAFE_SDF.get()
// .format(new java.util.Date());
// ✅ 方式二:DateTimeFormatter(推荐)
String result = LocalDateTime.now().format(FORMATTER);
System.out.println(Thread.currentThread().getName()
+ " -> " + result);
});
}
pool.shutdown();
}
}
7.5 Double-Checked Locking 必须配合 volatile
package com.example.concurrent.pitfall;
/**
* 陷阱五:双重检查锁定(DCL)单例模式必须使用 volatile
*
* 原因:对象创建不是原子操作,分三步:
* 1. 分配内存空间
* 2. 初始化对象
* 3. 将引用指向内存空间
*
* 由于 CPU 指令重排序,步骤 2 和 3 可能颠倒:
* 1. 分配内存空间
* 3. 将引用指向内存空间(此时对象还未初始化!)
* 2. 初始化对象
*
* 此时另一个线程在第一次 null 检查时看到 instance 不为 null,
* 直接返回了一个未初始化的对象——NPE 或数据不一致
*/
public class DoubleCheckedLockingPitfall {
// ❌ 错误写法:没有 volatile,可能返回未初始化的对象
// private static DoubleCheckedLockingPitfall instance;
// ✅ 正确写法:volatile 禁止指令重排序
// volatile 保证:引用赋值一定在对象初始化之后
private static volatile DoubleCheckedLockingPitfall instance;
private DoubleCheckedLockingPitfall() {
// 私有构造函数
}
public static DoubleCheckedLockingPitfall getInstance() {
// 第一次检查:避免每次都加锁(性能优化)
if (instance == null) {
synchronized (DoubleCheckedLockingPitfall.class) {
// 第二次检查:防止多个线程同时通过第一次检查后重复创建
if (instance == null) {
// ⚠️ 这一行不是原子操作!
// 没有 volatile 时,其他线程可能看到半初始化的对象
instance = new DoubleCheckedLockingPitfall();
}
}
}
return instance;
}
/**
* ✅ 更推荐的方式:静态内部类单例(JVM 保证线程安全,无需 volatile/synchronized)
*/
public static class BetterSingleton {
private BetterSingleton() {}
// 静态内部类在第一次被使用时才加载
// 类加载过程由 JVM 保证线程安全(ClassLoader 加锁)
private static class Holder {
private static final BetterSingleton INSTANCE =
new BetterSingleton();
}
public static BetterSingleton getInstance() {
return Holder.INSTANCE; // 触发 Holder 类加载
}
}
}
7.6 线程池参数配置不当导致 OOM
package com.example.concurrent.pitfall;
import java.util.concurrent.*;
/**
* 陷阱六:线程池参数配置不当导致 OOM
*
* 常见错误:
* 1. 使用无界队列(默认 LinkedBlockingQueue)
* 2. maximumPoolSize 设置过大
* 3. 忘记设置拒绝策略
*/
public class ThreadPoolOOMPitfall {
/**
* ❌ 错误配置:无界队列 + 大量慢任务 = OOM
*/
public static void wrongConfig() {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2, 4, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>() // ⚠️ 默认容量 Integer.MAX_VALUE
);
// 大量慢任务提交后,全部堆积在队列中
for (int i = 0; i < 1_000_000; i++) {
pool.submit(() -> {
try { Thread.sleep(10_000); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 结果:队列中 100 万个 Runnable 对象 -> OOM
}
/**
* ✅ 正确配置
*/
public static void correctConfig() {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
4, // 核心线程数:根据 CPU 核心数设置
8, // 最大线程数:预留弹性
60, TimeUnit.SECONDS, // 空闲超时:60秒回收临时线程
new ArrayBlockingQueue<>(200), // ⚠️ 有界队列!容量根据业务评估
r -> {
Thread t = new Thread(r);
t.setName("biz-pool-" + t.getId()); // 必须命名
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:调用者执行
);
// 即使提交 100 万个任务:
// 1. 队列最多 200 个
// 2. 超出后 CallerRunsPolicy 让调用者线程执行——天然限流
// 3. 不会 OOM
}
}
7.7 ThreadLocal 内存泄漏
package com.example.concurrent.pitfall;
import java.text.SimpleDateFormat;
/**
* 陷阱七:ThreadLocal 内存泄漏
*
* 原因:
* ThreadLocal 的实现是每个 Thread 内部有一个 ThreadLocalMap,
* key 是 ThreadLocal 的弱引用,value 是强引用。
*
* 当 ThreadLocal 变量被回收后:
* - key 变成 null(弱引用被 GC)
* - 但 value 仍然被 Thread -> ThreadLocalMap -> Entry -> value 强引用
* - 只要线程不终止,value 就不会被 GC -> 内存泄漏
*
* ⚠️ 在线程池中特别危险!
* 因为线程池中的线程是复用的、长期存活的,ThreadLocal 的 value 永远不会被回收
*/
public class ThreadLocalLeakPitfall {
// ❌ 错误用法:在线程池场景下不调用 remove()
private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT =
ThreadLocal.withInitial(
() -> new SimpleDateFormat("yyyy-MM-dd")
);
/**
* ❌ 错误写法:使用完不清理
*/
public void unsafeUsage() {
SimpleDateFormat sdf = DATE_FORMAT.get();
String result = sdf.format(new java.util.Date());
// 用完就走了,没有调用 remove()
// 线程池中的线程复用时,上一个任务的 value 还残留在 ThreadLocal 中
// 可能导致:内存泄漏、数据串号(上一个请求的用户信息被下一个请求读到)
}
/**
* ✅ 正确写法:用完必须 remove
*/
public void safeUsage() {
try {
SimpleDateFormat sdf = DATE_FORMAT.get();
String result = sdf.format(new java.util.Date());
System.out.println(result);
} finally {
// ⚠️ 必须在 finally 中调用 remove()!
// 清除当前线程的 ThreadLocal 值,防止内存泄漏和数据串号
DATE_FORMAT.remove();
}
}
/**
* ✅ 更好的方案:不用 ThreadLocal,直接用线程安全的 DateTimeFormatter
* JDK 8+ 的 DateTimeFormatter 是不可变的、线程安全的
*/
}
7.8 Spring @Async 默认线程池的坑
package com.example.concurrent.pitfall;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* 陷阱八:Spring @Async 默认线程池的坑
*
* ⚠️ Spring @Async 默认使用 SimpleAsyncTaskExecutor
* 这个"线程池"其实根本不是线程池!它每次都创建一个新线程,不会复用!
*
* 风险:
* 1. 无限制创建线程 -> 大量请求时线程数爆炸 -> OOM
* 2. 没有队列 -> 无法缓冲突发流量
* 3. 没有拒绝策略 -> 过载时无法限流
*/
@Service
public class SpringAsyncPitfall {
/**
* ❌ 错误用法:使用默认线程池
*/
@Async // 默认使用 SimpleAsyncTaskExecutor——不是真正的线程池!
public void unsafeAsyncMethod() {
// 每次调用都会创建一个新线程
System.out.println("这个线程不会被复用:"
+ Thread.currentThread().getName());
}
/**
* ✅ 正确用法:指定自定义线程池
* 需要先在配置类中定义名为 "orderProcessExecutor" 的线程池 Bean
* 见 3.4.2 节的 ThreadPoolConfig
*/
@Async("orderProcessExecutor") // 明确指定使用哪个线程池
public void safeAsyncMethod() {
System.out.println("使用自定义线程池:"
+ Thread.currentThread().getName());
}
}
八、与 Spring 生态配合
8.1 @Async 正确使用方式
8.1.1 完整配置
package com.example.concurrent.spring;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Async 完整配置
*
* 步骤:
* 1. @EnableAsync 开启异步支持
* 2. 实现 AsyncConfigurer 自定义默认线程池和异常处理器
* 3. 定义业务专用线程池 Bean
*/
@Configuration
@EnableAsync // 开启 @Async 注解支持
public class AsyncConfig implements AsyncConfigurer {
/**
* 自定义默认异步线程池——替换掉 SimpleAsyncTaskExecutor
* 当 @Async 不指定线程池名称时使用这个
*/
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(500);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("default-async-");
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
// ⚠️ 设置优雅关闭
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
/**
* 自定义异步异常处理器
* 处理 @Async 方法中抛出的、没有被 Future 捕获的异常
*
* ⚠️ 注意:只对返回 void 的 @Async 方法生效
* 返回 Future/CompletableFuture 的方法,异常会封装在 Future 中
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable ex,
Method method,
Object... params) {
// 记录异常日志
System.err.println("@Async 方法异常:"
+ method.getDeclaringClass().getName()
+ "#" + method.getName());
System.err.println("异常信息:" + ex.getMessage());
// 实际项目中:发送告警通知
// alertService.sendAlert("@Async异常", ex);
}
};
}
}
8.1.2 @Async 方法的返回值
package com.example.concurrent.spring;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
/**
* @Async 方法的三种返回值类型
*/
@Service
public class AsyncService {
/**
* 返回 void:发出去就不管了,"发射后遗忘"模式
*
* 适用场景:发送通知、记录日志等不需要等待结果的操作
* ⚠️ 异常只能通过 AsyncUncaughtExceptionHandler 捕获
*/
@Async("orderProcessExecutor")
public void sendNotification(Long userId, String message) {
// 异步发送通知,调用方不等待结果
System.out.println("异步发送通知给用户:" + userId);
}
/**
* 返回 Future<T>:可以获取异步结果
*
* 适用场景:需要获取异步执行结果的场景
* ⚠️ 但 Future 的 API 比较简陋,推荐用 CompletableFuture
*/
@Async("orderProcessExecutor")
public Future<String> asyncQueryWithFuture(Long userId) {
String result = "用户 " + userId + " 的信息";
// 使用 AsyncResult 包装返回值
return new AsyncResult<>(result);
}
/**
* 返回 CompletableFuture<T>:推荐方式
*
* 适用场景:需要链式调用、组合多个异步结果
* CompletableFuture 功能更强大,支持 thenApply、thenCompose 等操作
*/
@Async("orderProcessExecutor")
public CompletableFuture<String> asyncQueryWithCompletableFuture(
Long userId) {
try {
Thread.sleep(200); // 模拟耗时操作
String result = "用户 " + userId + " 的详细信息";
// CompletableFuture.completedFuture 直接包装已完成的结果
return CompletableFuture.completedFuture(result);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 异常情况返回失败的 Future
CompletableFuture<String> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(e);
return failedFuture;
}
}
}
8.1.3 @Async 使用注意事项
⚠️ 注意事项一:@Async 方法必须通过代理调用,同类内部调用无效。
@Service
public class OrderService {
// ❌ 错误:同一个类中直接调用 @Async 方法,不走代理,不会异步执行
public void createOrder() {
sendOrderNotification(); // 这是 this.sendOrderNotification()
// 直接调用 this 的方法,不经过 Spring AOP 代理
// 所以 @Async 注解不会生效,还是同步执行!
}
@Async
public void sendOrderNotification() {
System.out.println("发送订单通知");
}
}
// ✅ 正确:@Async 方法放在另一个 Bean 中,通过注入调用
@Service
public class OrderServiceFixed {
// 注入通知服务——调用时走 Spring 代理
private final NotificationService notificationService;
public OrderServiceFixed(NotificationService notificationService) {
this.notificationService = notificationService;
}
public void createOrder() {
// 通过注入的 Bean 调用——走 AOP 代理,@Async 生效
notificationService.sendOrderNotification();
}
}
@Service
class NotificationService {
@Async("orderProcessExecutor")
public void sendOrderNotification() {
System.out.println("异步发送订单通知");
}
}
⚠️ 注意事项二:@Async 方法不能是 private 或 static。 原因:Spring AOP 基于代理实现,private/static 方法无法被代理。
8.2 事务与并发
8.2.1 @Transactional 跨线程失效
package com.example.concurrent.spring;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.concurrent.*;
/**
* ⚠️ 核心问题:@Transactional 是基于 ThreadLocal 实现的
*
* Spring 的事务管理器将事务信息(Connection)存储在当前线程的 ThreadLocal 中。
* 子线程拿不到父线程的 ThreadLocal,因此在子线程中:
* - 没有事务上下文
* - 每个子线程各自开启独立连接
* - 主线程回滚时,子线程的操作不会回滚
*/
@Service
public class TransactionConcurrencyPitfall {
// 假设注入了 OrderRepository 和 InventoryRepository
// private final OrderRepository orderRepository;
// private final InventoryRepository inventoryRepository;
/**
* ❌ 错误写法:多线程中使用 @Transactional 期望全部回滚
*/
@Transactional
public void batchCreateOrders(List<String> orderIds) {
ExecutorService pool = Executors.newFixedThreadPool(4);
List<Future<?>> futures = new java.util.ArrayList<>();
for (String orderId : orderIds) {
futures.add(pool.submit(() -> {
// ⚠️ 这里运行在子线程中!
// 子线程没有主线程的事务上下文
// 每个子线程使用独立的数据库连接
// 即使主线程回滚,子线程已提交的数据不会回滚
createSingleOrder(orderId);
}));
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
// 即使这里抛异常触发回滚
// 子线程中已经 commit 的数据也无法回滚!
throw new RuntimeException("批量创建失败", e);
}
}
}
private void createSingleOrder(String orderId) {
// 数据库操作
System.out.println("创建订单:" + orderId);
}
/**
* ✅ 解决方案一:分批处理 + 补偿机制
* 不追求全局事务,而是单个订单独立事务 + 失败重试/补偿
*/
public void batchCreateOrdersV2(List<String> orderIds) {
ExecutorService pool = Executors.newFixedThreadPool(4);
List<Future<Boolean>> futures = new java.util.ArrayList<>();
for (String orderId : orderIds) {
futures.add(pool.submit(() -> {
try {
// 每个订单在自己的线程中独立开启事务
createSingleOrderWithTransaction(orderId);
return true;
} catch (Exception e) {
// 记录失败的订单,后续补偿
System.err.println("订单创建失败:" + orderId);
return false;
}
}));
}
// 收集失败的订单,写入补偿表
List<String> failedOrders = new java.util.ArrayList<>();
for (int i = 0; i < futures.size(); i++) {
try {
if (!futures.get(i).get()) {
failedOrders.add(orderIds.get(i));
}
} catch (Exception e) {
failedOrders.add(orderIds.get(i));
}
}
if (!failedOrders.isEmpty()) {
// 将失败的订单写入补偿表,由定时任务重试
System.err.println("以下订单需要补偿处理:" + failedOrders);
// compensationService.save(failedOrders);
}
pool.shutdown();
}
// 这个方法应该由 Spring 管理事务(通过另一个 Bean 调用)
@Transactional
public void createSingleOrderWithTransaction(String orderId) {
createSingleOrder(orderId);
}
/**
* ✅ 解决方案二:编程式事务
* 使用 TransactionTemplate 在子线程中手动管理事务
*/
// 需要注入 TransactionTemplate
// private final TransactionTemplate transactionTemplate;
//
// public void batchWithProgrammaticTx(List<String> orderIds) {
// ExecutorService pool = Executors.newFixedThreadPool(4);
// for (String orderId : orderIds) {
// pool.submit(() -> {
// // 在子线程中使用编程式事务
// transactionTemplate.execute(status -> {
// try {
// createSingleOrder(orderId);
// return true;
// } catch (Exception e) {
// status.setRollbackOnly(); // 手动标记回滚
// return false;
// }
// });
// });
// }
// pool.shutdown();
// }
}
8.3 并发场景下的 Bean 线程安全
package com.example.concurrent.spring;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;
import java.util.ArrayList;
import java.util.List;
/**
* ⚠️ Spring Bean 默认是单例的(Singleton)
*
* 单例意味着:所有线程(所有 HTTP 请求)共享同一个实例
* 如果 Bean 内部有可变状态(成员变量),就存在线程安全问题
*/
// ❌ 错误示例:有状态的单例 Bean
@Service
class UnsafeOrderService {
// ⚠️ 可变的成员变量——所有线程共享!
// 请求 A 正在处理时,请求 B 可能修改这个变量
private List<String> processingOrders = new ArrayList<>();
// ⚠️ 这也是可变状态
private int totalCount = 0;
public void processOrder(String orderId) {
// 多个线程同时操作同一个 ArrayList——线程不安全
processingOrders.add(orderId);
totalCount++; // 非原子操作——竞态条件
}
}
// ✅ 解决方案一:无状态设计(最推荐)
@Service
class SafeOrderServiceV1 {
// 没有可变的成员变量——天然线程安全
// 所有数据通过方法参数传入,通过返回值输出
public String processOrder(String orderId) {
// 所有变量都是局部的——每个线程有自己的栈帧,互不干扰
List<String> localList = new ArrayList<>();
localList.add(orderId);
return "处理完成:" + orderId;
}
}
// ✅ 解决方案二:使用 ThreadLocal 存储请求级别的状态
@Service
class SafeOrderServiceV2 {
// ThreadLocal 为每个线程维护独立的副本
private final ThreadLocal<List<String>> processingOrders =
ThreadLocal.withInitial(ArrayList::new);
public void processOrder(String orderId) {
try {
// 每个线程操作自己的 List,互不干扰
processingOrders.get().add(orderId);
} finally {
// ⚠️ 使用完必须清理!防止线程池复用时数据泄漏
processingOrders.remove();
}
}
}
// ✅ 解决方案三:使用 Request Scope(Web 场景)
@Service
@RequestScope // 每个 HTTP 请求创建一个新实例
class SafeOrderServiceV3 {
// 每个请求有自己的实例,成员变量不会被共享
private final List<String> processingOrders = new ArrayList<>();
public void processOrder(String orderId) {
// 安全:这个实例只属于当前请求
processingOrders.add(orderId);
}
}
三种解决方案对比:
| 方案 | 实现方式 | 优点 | 缺点 | 推荐场景 |
|---|---|---|---|---|
| 无状态设计 | 不存储可变成员变量 | 最简单、最安全 | 无法缓存中间状态 | 大部分业务场景 |
| ThreadLocal | 线程隔离的变量副本 | 可以存储线程私有状态 | 必须手动清理、内存泄漏风险 | 日志跟踪、用户上下文传递 |
| Request Scope | 每个请求一个实例 | 自然隔离、Spring 自动管理 | 性能略差(频繁创建实例)、注入限制 | Web 请求级别状态管理 |
附录:快速选型决策树
需要并发执行任务?
├── 简单的异步执行(不需要返回值)
│ └── 使用线程池 + Runnable
│
├── 需要获取异步结果?
│ ├── 单个异步任务 -> Callable + Future
│ ├── 多个任务编排 -> CompletableFuture(推荐)
│ └── Spring 环境 -> @Async + CompletableFuture
│
├── 需要控制并发数?
│ └── Semaphore
│
├── 需要等待多个任务完成?
│ ├── 一次性等待 -> CountDownLatch
│ └── 可重复使用 -> CyclicBarrier
│
├── 需要线程安全的集合?
│ ├── Map -> ConcurrentHashMap
│ ├── List(读多写少) -> CopyOnWriteArrayList
│ └── 队列(生产者-消费者) -> BlockingQueue
│
└── 需要互斥访问?
├── 简单场景 -> synchronized
├── 需要 tryLock/公平锁/多 Condition -> ReentrantLock
├── 读多写少 -> ReadWriteLock
└── 读极多写极少 -> StampedLock
附录:核心 API 速查表
| 类/接口 | 核心方法 | 说明 |
|---|---|---|
ExecutorService | submit(), execute(), shutdown() | 线程池提交任务 |
ThreadPoolExecutor | 7 参数构造函数 | 自定义线程池 |
CompletableFuture | supplyAsync(), thenApply(), allOf() | 异步编排 |
CountDownLatch | countDown(), await() | 等待多任务完成 |
CyclicBarrier | await() | 多线程同步汇合 |
Semaphore | acquire(), release(), tryAcquire() | 控制并发数 |
ReentrantLock | lock(), tryLock(), lockInterruptibly() | 显式锁 |
ReadWriteLock | readLock(), writeLock() | 读写分离锁 |
ConcurrentHashMap | putIfAbsent(), compute(), merge() | 线程安全 Map |
BlockingQueue | put(), take(), offer(), poll() | 阻塞队列 |
ThreadLocal | get(), set(), remove() | 线程本地变量 |
最后的话:并发编程的核心不是记住 API,而是理解什么时候该用、什么时候不该用。 能不用锁就不用锁(无状态设计),能用
synchronized就不用ReentrantLock, 能用CompletableFuture就不要手动管理线程。 简单即正确,正确即性能。