news 2026/5/28 1:21:05

Java 异步编程之 Thread、Runnable、Callable、CompletableFuture 与线程池实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Java 异步编程之 Thread、Runnable、Callable、CompletableFuture 与线程池实战

一、为什么需要异步编程?

在后端开发中,一个接口里经常会做很多事情。

比如用户下单:

1. 创建订单
2. 扣减库存
3. 扣减余额
4. 发送短信
5. 写操作日志
6. 通知第三方系统

其中:创建订单、扣库存、扣余额 属于核心流程,通常需要同步执行。

但是:发送短信、写日志、通知第三方 不一定要阻塞主流程,可以考虑异步执行。

异步编程的价值是:

提升接口响应速度
提高系统吞吐量
减少主线程等待时间
更好地利用 CPU 和 IO 资源

但是异步也不是万能的,它会带来:

异常处理复杂
事务边界复杂
线程安全问题
线程池资源管理问题

所以学习异步编程,不能只会写代码,还要理解它的底层逻辑和适用场景。

二、Java 线程基础:Thread

Thread是 Java 中表示线程的类。

最基础的创建线程方式是继承Thread,重写run()方法。

1. 继承 Thread 创建线程

public class MyThread extends Thread { @Override public void run() { System.out.println("子线程执行:" + Thread.currentThread().getName()); } public static void main(String[] args) { MyThread thread = new MyThread(); thread.start(); System.out.println("主线程执行:" + Thread.currentThread().getName()); } }

可能输出:

主线程执行:main
子线程执行:Thread-0

线程执行顺序不是固定的,具体由操作系统调度决定。

2. start() 和 run() 的区别

thread.start(); 表示真正启动一个新线程。

thread.run(); 只是普通方法调用,不会创建新线程。

3. 为什么不推荐直接继承 Thread?

虽然继承Thread可以创建线程,但实际开发中不推荐。

原因有三个:

1. Java 是单继承,继承 Thread 后就不能继承其他类。
2. 线程和任务耦合,不利于代码复用。
3. 生产环境更推荐使用线程池,而不是频繁 new Thread。

三、任务模型一:Runnable

Runnable是一个任务接口,表示一个没有返回值的任务。

@FunctionalInterface public interface Runnable { void run(); }

它只有一个run()方法。

1. 实现 Runnable

public class SendSmsTask implements Runnable { @Override public void run() { System.out.println("发送短信:" + Thread.currentThread().getName()); } public static void main(String[] args) { SendSmsTask task = new SendSmsTask(); Thread thread = new Thread(task); thread.start(); } }

这里的结构比继承Thread更清晰:

SendSmsTask 负责描述任务
Thread 负责执行任务。

2. 匿名内部类写法

public class RunnableDemo { public static void main(String[] args) { Runnable task = new Runnable() { @Override public void run() { System.out.println("异步写日志:" + Thread.currentThread().getName()); } }; new Thread(task).start(); } }

3. Lambda 写法

因为Runnable是函数式接口,所以可以用 Lambda 简化:

public class RunnableLambdaDemo { public static void main(String[] args) { new Thread(() -> { System.out.println("异步任务执行:" + Thread.currentThread().getName()); }).start(); } }

Lambda 只是简化写法,本质还是实现Runnablerun()方法。

4. Runnable 的特点

没有返回值
不能直接抛出受检异常
可以交给 Thread 执行
可以交给线程池执行

四、任务模型二:Callable

Runnable没有返回值,如果我们希望异步任务执行完成后返回一个结果,就可以使用Callable

@FunctionalInterface public interface Callable<V> { V call() throws Exception; }

Callable有两个特点:

有返回值
可以抛出异常

1. Callable 示例

import java.util.concurrent.Callable; public class QueryUserTask implements Callable<String> { @Override public String call() throws Exception { System.out.println("查询用户信息:" + Thread.currentThread().getName()); Thread.sleep(1000); return "用户信息:张三"; } }

但是注意,Thread不能直接执行Callable

下面这种写法是错误的:

FutureTask<String> task = new FutureTask(new QueryUserTask()); Thread thread = new Thread(task); // 编译错误

原因是 thread 的构造方法接收 Runnable,不接收 Callable。

所以Callable如果想交给Thread执行,需要借助FutureTask

FutureTask<String> futureTask = new QueryUserTask<>(); Thread thread = new Thread(futureTask); thread.start();

六、FutureTask:连接 Callable 和 Thread 的桥梁

FutureTask很重要。

它既是Runnable,又是Future

源码关系大概是:

public class FutureTask<V> implements RunnableFuture<V> { }

而:

public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }

所以:

FutureTask 可以被 Thread 执行。 FutureTask 也可以通过 get() 获取结果。

1. Callable + FutureTask + Thread

import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; public class FutureTaskDemo { public static void main(String[] args) throws Exception { Callable<String> callable = new Callable<String>() { @Override public String call() throws Exception { System.out.println("子线程开始查询用户:" + Thread.currentThread().getName()); Thread.sleep(2000); return "用户信息:张三"; } }; FutureTask<String> futureTask = new FutureTask<>(callable); Thread thread = new Thread(futureTask); thread.start(); System.out.println("主线程继续执行:" + Thread.currentThread().getName()); String result = futureTask.get(); //同步阻塞,等待获取异步结果 System.out.println("获取异步结果:" + result); } }

2. FutureTask 的缺点

FutureTask适合简单异步任务,但缺点也明显:

get() 会阻塞
不方便做回调
不方便组合多个任务
异常处理不够优雅
任务编排能力弱

七、CompletableFuture:更强大的异步编排工具

FutureTask能解决简单异步任务,但是它不适合复杂任务编排。

例如商品详情页需要同时查:

商品信息
库存信息
价格信息
优惠券信息
评价信息

如果串行查:

商品 200ms
库存 300ms
价格 200ms
优惠券 400ms
评价 300ms

总耗时约 1400ms

如果并行查:

多个任务同时执行
总耗时约等于最慢的任务,也就是 400ms 左右

这类场景就适合使用CompletableFuture

1. CompletableFuture 的定位

CompletableFuture可以理解为:

Future 的增强版 + 异步任务编排工具。

它支持:

异步执行
任务回调
任务串联
任务合并
多个任务并行
异常兜底

八、CompletableFuture 常用方法详解

1. runAsync:执行无返回值任务

CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() { @Override public void run() { System.out.println("异步发送短信:" + Thread.currentThread().getName()); } }, executor); future.join();

适合:

发送短信
写日志
清理缓存
异步通知

因为没有返回值,所以类型是:CompletableFuture<Void>

2. supplyAsync:执行有返回值任务

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("查询用户:" + Thread.currentThread().getName()); return "用户信息:张三"; }, executor); String result = future.join(); System.out.println(result);

supplyAsync适合有返回结果的任务。

3. thenApply:接收结果,并返回新结果

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "用户1001"; }, executor).thenApply(userInfo -> { return userInfo + " 的订单信息"; }); String result = future.join(); System.out.println(result);

输出:用户1001 的订单信息

特点:接收上一步结果 返回一个新结果

4. thenAccept:接收结果,不返回新结果

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return "订单创建成功"; }, executor).thenAccept(result -> { System.out.println("发送通知:" + result); }); future.join();

特点:接收上一步结果 没有返回值

5. thenRun:不接收结果,也不返回结果

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return "订单结果"; }, executor).thenRun(() -> { System.out.println("订单流程结束,记录日志"); }); future.join();

特点:

不关心上一步结果
只是在上一步完成后继续执行

6. thenApply 和 thenApplyAsync 的区别

这是重点。

thenApply() 不一定开启新线程,通常由上一步任务完成的线程继续执行。

thenApplyAsync() 会把下一步重新提交到线程池执行。

示例:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("第一步:" + Thread.currentThread().getName()); return "用户信息"; }, executor).thenApplyAsync(result -> { System.out.println("第二步:" + Thread.currentThread().getName()); return result + " + 订单信息"; }, executor);

选择建议:

7. thenCombine:合并两个任务结果

如果两个任务互不依赖,可以并行执行,最后合并结果。

CompletableFuture<UserDTO> userFuture = CompletableFuture.supplyAsync(() -> { return userService.queryUserInfo(1001L); }, executor); CompletableFuture<OrderDTO> orderFuture = CompletableFuture.supplyAsync(() -> { return orderService.queryOrderInfo(1001L); }, executor); CompletableFuture<UserOrderDTO> resultFuture = userFuture.thenCombine(orderFuture, (UserDTO user, OrderDTO order) -> { UserOrderDTO dto = new UserOrderDTO(); dto.setUser(user); dto.setOrder(order); return dto; }); UserOrderDTO result = resultFuture.join();

thenCombine的意思是:

等两个任务都完成后,拿到两个任务结果,再合并成一个新结果。

8. allOf:等待多个任务全部完成

CompletableFuture<UserDTO> userFuture = CompletableFuture.supplyAsync(() -> userService.queryUserInfo(1001L), executor); CompletableFuture<OrderDTO> orderFuture = CompletableFuture.supplyAsync(() -> orderService.queryOrderInfo(1001L), executor); CompletableFuture<CouponDTO> couponFuture = CompletableFuture.supplyAsync(() -> couponService.queryCoupon(1001L), executor); CompletableFuture.allOf(userFuture, orderFuture, couponFuture).join(); UserDTO user = userFuture.join(); OrderDTO order = orderFuture.join(); CouponDTO coupon = couponFuture.join();

注意:

CompletableFuture.allOf(...)

返回的是:

CompletableFuture<Void>

每个任务结果还需要分别:

userFuture.join();
orderFuture.join();
couponFuture.join();

9. anyOf:任意一个任务完成即可

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { sleep(3000); return "服务A返回结果"; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { sleep(1000); return "服务B返回结果"; }, executor); Object result = CompletableFuture.anyOf(future1, future2).join(); System.out.println(result);

输出大概率是: 服务B返回结果

适合场景:

多个服务查同一份数据
谁先返回就用谁
容灾查询

九、CompletableFuture 异常处理

异步任务也可能失败。

如果不处理异常:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { int i = 1 / 0; return "success"; }, executor); String result = future.join();

join()会抛出:CompletionException

1. exceptionally:异常兜底

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { int i = 1 / 0; return "success"; }, executor).exceptionally(ex -> { System.out.println("异步任务异常:" + ex.getMessage()); return "默认结果"; }); String result = future.join(); System.out.println(result);

输出: 默认结果

exceptionally只在异常时执行。

2. handle:成功失败都处理

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "正常结果"; }, executor).handle((result, ex) -> { if (ex != null) { return "失败兜底结果"; } return result; }); String result = future.join(); System.out.println(result);

handle的特点:

成功也执行 失败也执行 可以返回新的结果
3. whenComplete:只做观察,不改变结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "订单创建成功"; }, executor).whenComplete((result, ex) -> { if (ex != null) { System.out.println("任务失败:" + ex.getMessage()); } else { System.out.println("任务成功:" + result); } }); String result = future.join();

whenComplete适合:

打印日志
监控埋点
记录任务状态

十、线程池:生产环境不建议直接 new Thread

前面的代码里,我们经常写:new Thread(task).start();

但是生产环境不建议频繁这么写。

原因是:

线程创建销毁有成本
线程数量不可控
高并发下可能创建大量线程,压垮服务器
不方便统一管理线程

所以实际开发中更推荐使用线程池。

1. ExecutorService 执行 Runnable

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExecutorRunnableDemo { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(3); pool.execute(new Runnable() { @Override public void run() { System.out.println("执行 Runnable 任务:" + Thread.currentThread().getName()); } }); pool.shutdown(); } }

execute()用于执行没有返回值的任务。

2. ExecutorService 执行 Callable

import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorCallableDemo { public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(3); Future<String> future = pool.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("执行 Callable 任务:" + Thread.currentThread().getName()); Thread.sleep(1000); return "订单查询结果"; } }); String result = future.get(); System.out.println("获取结果:" + result); pool.shutdown(); } }

这里:pool.submit(callable);

返回的是: Future<String>

然后通过:future.get(); 获取结果。

3. execute 和 submit 的区别

异常表现也不同:

execute() 执行任务异常,通常会直接打印异常。
submit() 执行任务异常,异常会封装进 Future,调用 get() 时才抛出。

4. ThreadPoolExecutor:推荐的线程池创建方式

虽然下面写法简单:

ExecutorService pool = Executors.newFixedThreadPool(10);

但是生产环境更推荐手动创建ThreadPoolExecutor

原因是Executors一些方法底层使用无界队列,任务过多时可能造成内存压力。

推荐写法

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolConfigDemo { public static ThreadPoolExecutor buildExecutor() { return new ThreadPoolExecutor( 10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() ); } }

核心参数说明

new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler );

线程池执行流程

假设:

corePoolSize = 10
maximumPoolSize = 20
queue = 100

执行流程:

常见拒绝策略

十一、CompletableFuture + 线程池实战:商品详情页聚合查

1. 业务场景

商品详情页需要查询:

商品基本信息
库存信息
价格信息
优惠券信息
评价信息

这些查询之间没有强依赖,可以并行执行。

2. 线程池配置

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class AsyncExecutorConfig { public static ThreadPoolExecutor buildExecutor() { return new ThreadPoolExecutor( 10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(200), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() ); } }

3. DTO 示例

public class ProductDetailDTO { private ProductDTO product; private StockDTO stock; private PriceDTO price; private CouponDTO coupon; private CommentDTO comment; public ProductDTO getProduct() { return product; } public void setProduct(ProductDTO product) { this.product = product; } public StockDTO getStock() { return stock; } public void setStock(StockDTO stock) { this.stock = stock; } public PriceDTO getPrice() { return price; } public void setPrice(PriceDTO price) { this.price = price; } public CouponDTO getCoupon() { return coupon; } public void setCoupon(CouponDTO coupon) { this.coupon = coupon; } public CommentDTO getComment() { return comment; } public void setComment(CommentDTO comment) { this.comment = comment; } }

简单 DTO:

public class ProductDTO { private Long productId; private String productName; public ProductDTO(Long productId, String productName) { this.productId = productId; this.productName = productName; } }
public class StockDTO { private Integer stock; public StockDTO(Integer stock) { this.stock = stock; } }
public class StockDTO { private Integer stock; public StockDTO(Integer stock) { this.stock = stock; } }
public class CouponDTO { private String couponName; public CouponDTO(String couponName) { this.couponName = couponName; } }
public class CommentDTO { private Integer commentCount; public CommentDTO(Integer commentCount) { this.commentCount = commentCount; } }

4. 模拟 Service

public class ProductService { public ProductDTO queryProduct(Long productId) { sleep(200); return new ProductDTO(productId, "iPhone 15"); } private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
public class StockService { public StockDTO queryStock(Long productId) { sleep(300); return new StockDTO(100); } private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
public class PriceService { public PriceDTO queryPrice(Long productId) { sleep(200); return new PriceDTO(5999); } private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
public class CouponService { public CouponDTO queryCoupon(Long productId) { sleep(400); return new CouponDTO("满5000减300"); } private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
public class CouponService { public CouponDTO queryCoupon(Long productId) { sleep(400); return new CouponDTO("满5000减300"); } private void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }

5. 聚合查询实现

import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; public class ProductDetailService { private final ProductService productService = new ProductService(); private final StockService stockService = new StockService(); private final PriceService priceService = new PriceService(); private final CouponService couponService = new CouponService(); private final CommentService commentService = new CommentService(); private final ThreadPoolExecutor executor = AsyncExecutorConfig.buildExecutor(); public ProductDetailDTO queryProductDetail(Long productId) { CompletableFuture<ProductDTO> productFuture = CompletableFuture.supplyAsync(() -> { return productService.queryProduct(productId); }, executor); CompletableFuture<StockDTO> stockFuture = CompletableFuture.supplyAsync(() -> { return stockService.queryStock(productId); }, executor); CompletableFuture<PriceDTO> priceFuture = CompletableFuture.supplyAsync(() -> { return priceService.queryPrice(productId); }, executor); CompletableFuture<CouponDTO> couponFuture = CompletableFuture.supplyAsync(() -> { return couponService.queryCoupon(productId); }, executor).exceptionally(ex -> { System.out.println("优惠券查询失败:" + ex.getMessage()); return null; }); CompletableFuture<CommentDTO> commentFuture = CompletableFuture.supplyAsync(() -> { return commentService.queryComment(productId); }, executor).exceptionally(ex -> { System.out.println("评价查询失败:" + ex.getMessage()); return new CommentDTO(0); }); CompletableFuture.allOf( productFuture, stockFuture, priceFuture, couponFuture, commentFuture ).join(); ProductDetailDTO detailDTO = new ProductDetailDTO(); detailDTO.setProduct(productFuture.join()); detailDTO.setStock(stockFuture.join()); detailDTO.setPrice(priceFuture.join()); detailDTO.setCoupon(couponFuture.join()); detailDTO.setComment(commentFuture.join()); return detailDTO; } }

6. 测试代码

public class ProductDetailTest { public static void main(String[] args) { ProductDetailService productDetailService = new ProductDetailService(); long start = System.currentTimeMillis(); ProductDetailDTO detailDTO = productDetailService.queryProductDetail(1001L); long end = System.currentTimeMillis(); System.out.println("查询完成,耗时:" + (end - start) + "ms"); System.out.println(detailDTO); } }

串行查询大约需要:200 + 300 + 200 + 400 + 300 = 1400ms

并行查询大约需要:取最慢任务耗时,大约 400ms

这就是CompletableFuture + 线程池的典型应用场景。

十二、异步编程中的事务问题

异步编程一定要注意事务。

错误示例:

@Transactional public void createOrder(Long userId, Long productId) { orderMapper.insertOrder(userId, productId); CompletableFuture.runAsync(() -> { stockMapper.deductStock(productId); }, executor); accountMapper.deductBalance(userId); }

很多人以为:

createOrder() 方法加了 @Transactional,
所以里面所有数据库操作都在一个事务里。

但这是错误的。

因为:

Spring 事务默认绑定当前线程。
CompletableFuture 开启的是新线程。
新线程不会自动加入外层 @Transactional 事务。

也就是说:stockMapper.deductStock(productId); 通常不在外层事务中

如果外层事务回滚,异步线程里的库存扣减不一定回滚,可能造成数据不一致。

核心写流程建议同步事务执行

@Transactional public void createOrder(Long userId, Long productId) { orderMapper.insertOrder(userId, productId); stockMapper.deductStock(productId); accountMapper.deductBalance(userId); }

如果是跨服务场景,要考虑:

可靠消息
本地消息表
TCC
Saga
补偿机制
幂等控制
最终一致性

十三、异步编程最佳实践

1. 不要所有任务都异步

适合异步的任务:

耗时任务
非核心任务
可以延迟完成的任务
失败后可以补偿的任务
多个互不依赖的查询任务

2. 一定要使用自定义线程池

不要大量使用:CompletableFuture.supplyAsync(() -> queryData());

因为默认使用的是:ForkJoinPool.commonPool()

这是公共线程池,不适合承载大量业务任务。

推荐:

ompletableFuture.supplyAsync(() -> queryData(), executor);

3. 一定要处理异常

不要写成:

CompletableFuture.supplyAsync(() -> { return remoteService.query(); }, executor);

推荐加异常处理:

CompletableFuture.supplyAsync(() -> { return remoteService.query(); }, executor).exceptionally(ex -> { System.out.println("查询失败:" + ex.getMessage()); return defaultValue; });

4. join 也会阻塞

虽然CompletableFuture是异步的,但是future.join();

仍然会阻塞当前线程。

如果任务还没有完成,当前线程会等待。

所以:异步的是任务执行。 阻塞的是最终等待结果的 join。

5. 注意线程池隔离

不同业务最好使用不同线程池。

比如:

订单线程池 短信线程池 报表线程池 文件处理线程池

不要所有异步任务共用一个线程池。

否则某个慢任务堆积,可能拖垮其他业务。

十四、核心知识点总结

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/28 1:18:51

Scanpy实战:从10x Genomics原始数据到发表级图表,一篇就够了

Scanpy实战&#xff1a;从10x Genomics原始数据到发表级图表全流程解析 单细胞测序技术正在重塑我们对生命系统的理解方式。想象一下&#xff0c;你手中握着来自10x Genomics平台的原始数据&#xff0c;这些数据可能蕴含着疾病机制的关键线索或发育过程的未知规律。如何将这些看…

作者头像 李华
网站建设 2026/5/28 1:10:04

C64 BASIC 游戏地图“相机视角”实现:从初稿到优化,性能提升有妙招!

Retro Game Coders 社区介绍Retro Game Coders 是一个复古电脑/游戏机游戏 开发社区&#xff0c;提供了丰富的复古资源&#xff0c;如复古游戏时间线、在线复古 IDE、复古像素艺术编辑器等。C64 BASIC 游戏地图“相机视角”问题提出关于如何创建《创世纪》风格地图视图的解释&…

作者头像 李华