前言
在Spring中我们经常会用到异步操作,注解中使用@EnableAsync和@Async就可以使用它了。但是最近发现在异步中线程号使用的是我们项目中自定义的线程池ThreadPoolTaskExecutor而不是之前熟悉的SimpleAsyncTaskExecutor
那么来看一下他的执行过程吧。
正文
- 首先要使异步生效,我们得在启动类中加入
@EnableAsync那么就点开它看看。它会使用@Import注入一个AsyncConfigurationSelector类,启动是通过父类可以决定它使用的是配置类ProxyAsyncConfiguration。
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; public AsyncConfigurationSelector() { } @Nullable public String[] selectImports(AdviceMode adviceMode) { switch(adviceMode) { case PROXY: return new String[]{ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[]{"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"}; default: return null; } } }- 点开能够看到注入一个
AsyncAnnotationBeanPostProcessor。它实现了BeanPostProcessor接口,因此它是一个后处理器,用于将Spring AOP的Advisor应用于给定的bean。从而该bean上通过异步注解所定义的方法在调用时会被真正地异步调用起来。
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { public ProxyAsyncConfiguration() { } @Bean( name = {"org.springframework.context.annotation.internalAsyncAnnotationProcessor"} ) @Role(2) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); bpp.configure(this.executor, this.exceptionHandler); Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); bpp.setOrder((Integer)this.enableAsync.getNumber("order")); return bpp; } }AsyncAnnotationBeanPostProcessor的父类实现了BeanFactoryAware,那么会在AsyncAnnotationBeanPostProcessor实例化之后回调setBeanFactory()来实例化切面AsyncAnnotationAdvisor。
public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); //定义一个切面 AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; }AsyncAnnotationAdvisor构造和声明切入的目标(切点)和代码增强(通知)。
public AsyncAnnotationAdvisor( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2); asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { // If EJB 3.1 API not present, simply ignore. } //通知 this.advice = buildAdvice(executor, exceptionHandler); //切入点 this.pointcut = buildPointcut(asyncAnnotationTypes); }- 通知就是最终要执行的。
buildAdvice用于构建通知,主要是创建一个AnnotationAsyncExecutionInterceptor类型的拦截器,并且配置好使用的执行器和异常处理器。真正的异步执行的代码在AsyncExecutionAspectSupport中!
protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null); //配置拦截器 interceptor.configure(executor, exceptionHandler); return interceptor; }- 配置拦截器,通过参数配置自定义的执行器和异常处理器或者使用默认的执行器和异常处理器。
public void configure(@Nullable Supplier<Executor> defaultExecutor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { //默认执行器 this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory)); this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new); }getDefaultExecutor()方法,用来查找默认的执行器,父类AsyncExecutionAspectSupport首先寻找唯一一个类型为TaskExecutor的执行器并返回,若存在多个则寻找默认的执行器taskExecutor,若无法找到则直接返回null。子类AsyncExecutionInterceptor重写getDefaultExecutor方法,首先调用父类逻辑,返回null则配置一个名为SimpleAsyncTaskExecutor的执行器
/** * 父类 * 获取或构建此通知实例的默认执行器 * 这里返回的执行器将被缓存以供后续使用 * 默认实现搜索唯一的TaskExecutor的bean * 在上下文中,用于名为“taskExecutor”的Executor bean。 * 如果两者都不是可解析的,这个实现将返回 null */ @Nullable protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { if (beanFactory != null) { try { // 搜索唯一的一个TaskExecutor类型的bean并返回 return beanFactory.getBean(TaskExecutor.class); } catch (NoUniqueBeanDefinitionException ex) { //找不到唯一一个bean异常后,搜索一个TaskExecutor类型的“taskExecutor”的bean并返回 logger.debug("Could not find unique TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskExecutor bean found within the context, and none is named " + "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " + "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { //未找到异常时搜索一个TaskExecutor类型的“taskExecutor”的bean并返回 logger.debug("Could not find default TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { logger.info("No task executor bean found for async processing: " + "no bean of type TaskExecutor and no bean named 'taskExecutor' either"); } // Giving up -> either using local default executor or none at all... } } return null; } /** * 子类 * 如父类为null则重新实例化一个名为SimpleAsyncTaskExecutor的执行器 */ @Override @Nullable protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); }所以,到了这一步就可以理解为什么异步线程名默认叫SimpleAsyncTaskExecutor-xx,为什么有了自己的线程池有可能异步用到了自己的线程池配置。
我们有这个切入点之后,每次请求接口执行异步方法前都会执行AsyncExecutionInterceptor#invoke(),determineAsyncExecutor用来决策使用哪个执行器
@Nullable protected AsyncTaskExecutor determineAsyncExecutor(Method method) { //在缓存的执行器中选择一个对应方法的执行器 AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method); if (executor == null) { //获取@Async注解中的value(指定的执行器) String qualifier = this.getExecutorQualifier(method); Executor targetExecutor; if (StringUtils.hasLength(qualifier)) { //获取指定执行器的bean targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier); } else { //选择默认的执行器 targetExecutor = (Executor)this.defaultExecutor.get(); } if (targetExecutor == null) { return null; } executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor); //将执行器进行缓存 this.executors.put(method, executor); } return (AsyncTaskExecutor)executor; }当有了执行器调用doSubmit方法将任务加入到执行器中。
异步任务,默认将采用SimpleAsyncTaskExecutor作为执行器!它有如下特点:
不复用线程,也就是说为每个任务新起一个线程。但是可以通过
concurrencyLimit属性来控制并发线程数量,但是默认情况下不做限制(concurrencyLimit取值为-1)。
因此,如果我们使用异步任务,一定不能采用默认执行器的配置,以防OOM异常!最好的方式是指定执行器!
总结
本文主要以看源码的方式来了解异步注解@Async是如何在项目中选择线程以及使用线程的,尽量给异步任务指定一个独有线程池,这样会的避免不与其他业务共用线程池而造成影响。
AI大模型学习福利
作为一名热心肠的互联网老兵,我决定把宝贵的AI知识分享给大家。 至于能学习到多少就看你的学习毅力和能力了 。我已将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。
一、全套AGI大模型学习路线
AI大模型时代的学习之旅:从基础到前沿,掌握人工智能的核心技能!
因篇幅有限,仅展示部分资料,需要点击文章最下方名片即可前往获取
二、640套AI大模型报告合集
这套包含640份报告的合集,涵盖了AI大模型的理论研究、技术实现、行业应用等多个方面。无论您是科研人员、工程师,还是对AI大模型感兴趣的爱好者,这套报告合集都将为您提供宝贵的信息和启示。
因篇幅有限,仅展示部分资料,需要点击文章最下方名片即可前往获
三、AI大模型经典PDF籍
随着人工智能技术的飞速发展,AI大模型已经成为了当今科技领域的一大热点。这些大型预训练模型,如GPT-3、BERT、XLNet等,以其强大的语言理解和生成能力,正在改变我们对人工智能的认识。 那以下这些PDF籍就是非常不错的学习资源。
因篇幅有限,仅展示部分资料,需要点击文章最下方名片即可前往获
四、AI大模型商业化落地方案
因篇幅有限,仅展示部分资料,需要点击文章最下方名片即可前往获
作为普通人,入局大模型时代需要持续学习和实践,不断提高自己的技能和认知水平,同时也需要有责任感和伦理意识,为人工智能的健康发展贡献力量