spring框架中的@Async使用以及原理浅析

Spring framework 中有一个叫 @Async 的注解。顾名思义,它是一个能将方法异步化的注解。 @Async 可以作用在方法或者类上,只需加上这一注解,就可以轻松将本来同步调用的方法变为异步调用。在实际应用中,如果我们在一段逻辑中不关心某个方法的具体返回值,只是希望调用这个方法,便可以在这个方法加上这个注解,使主逻辑快速返回。

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
	/**
	 * 指定执行器名称。
	 * 可以是任意一个 java.util.concurrent.Executor 或者 org.springframework.core.task.TaskExecutor
	 */
	String value() default "";
}

使用方法

环境与配置

@Async 在spring3.0之后就支持了,这里我们测试的环境为:

JAVA 1.8
Spring Boot 1.5.8
Tomcat 7

pom 中的关键配置:

<groupId>org.hxuhao.spring.cloud.demo</groupId>
    <artifactId>user-service-provider</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.8.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Dalston.SR4</spring-cloud.version>
    </properties>


    <dependencies>
        <!-- web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--引入 servlet-->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
        </dependency>
    </dependencies>

主要类

首先,我们一个配置类来开启对 @Async 的支持,当然也可以通过 xml 的形式配置

配置类

@EnableAsync 	// 开启@Async
@Component
public class AsyncConfig {
}

业务类

然后,我们就可以在需要的方法上加上 @Async ,为了测试,我们会输出执行方法的具体线程。

/**
 * 用户控制器
 */
@RestController
@RequestMapping("/users")
public class UserController {

    @Autowired
    private UserService userService;

    @GetMapping(value = "/getUser")
    public String getUser() {
        System.out.println(String.format("UserController.getUser current thread %s", Thread.currentThread().getName()));
        String user = userService.getUser();
        userService.asyncFunction();
        return user;
    }
}

---

/**
 * 用户服务 (这里为篇幅省略,就不贴UserService这个接口了)
 */
@Service
public class UserServiceImpl implements UserService {

    @Override
    public String getUser() {
        System.out.println(String.format("UserServiceImpl.getUser current thread %s", Thread.currentThread().getName()));
        return "hxuhao233";
    }
	
	/**
	 * 可以异步执行的方法
	 */
    @Async
    @Override
    public void asyncFunction() {
        System.out.println(String.format("UserServiceImpl.asyncFunction current thread %s", Thread.currentThread().getName()));
        return;
    }
}

启动应用后通过调用 http://localhost:{port}/users/getUser ,可以看到控制台的输出

// 第一次调用
UserController.getUser current thread http-apr-8083-exec-4
UserServiceImpl.getUser current thread http-apr-8083-exec-4
UserServiceImpl.asyncFunction current thread SimpleAsyncTaskExecutor-2

// 第二次调用
UserController.getUser current thread http-apr-8083-exec-7
UserServiceImpl.getUser current thread http-apr-8083-exec-7
UserServiceImpl.asyncFunction current thread SimpleAsyncTaskExecutor-3

// 第三次调用
UserController.getUser current thread http-apr-8083-exec-10
UserServiceImpl.getUser current thread http-apr-8083-exec-10
UserServiceImpl.asyncFunction current thread SimpleAsyncTaskExecutor-4

通过这段输出,我们发现:加上 @AsyncasyncFunctionSimpleAsyncTaskExecutor 执行了,而不是被Tomcat本身的线程执行,从而实现了
方法的异步调用。那么这个 SimpleAsyncTaskExecutor 是什么呢?

SimpleAsyncTaskExecutor

通过查看源码,我们发现, SimpleAsyncTaskExecutor 其实是 @Async 默认的一种实现方式,因为异步执行肯定用到其他线程池,然而在我们的代码中并没有指定线程池,所以 Spring 默认使用了 SimpleAsyncTaskExecutor
需要注意的是: SimpleAsyncTaskExecutor 并不是标准意义上的线程池,因为它对每一次方法调用都是直接 新建一个线程 去执行的,没有复用线程。这是一个非常 危险 的行为,在实际应用中可能为引起巨大的 资源浪费

public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
		implements AsyncListenableTaskExecutor, Serializable {
	protected void doExecute(Runnable task) {
	    // 每次都new一个线程
		Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); 		
		thread.start();
	}
	
	public Thread createThread(Runnable runnable) {
		Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
		thread.setPriority(getThreadPriority());
		thread.setDaemon(isDaemon());
		return thread;
	}
}

@Async 指定线程池

单线程池

为了指定线程池,我们需要把配置类实现 AsyncConfigurer ,它有两个方法需要实现,一个方法指定线程池,另一个方法指定异常处理的方式。

@EnableAsync
@Component
public class AsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        // 实际线程池的具体参数等配置应跟实际业务修改。
        Executor executor = new ThreadPoolExecutor(2, 2, 1,
                TimeUnit.MINUTES, new LinkedBlockingDeque<>(1000),
                new CustomizableThreadFactory("async-pool-"));
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // 异常处理
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

修改之后再次调用 http://localhost:{port}/users/getUser ,可以看到控制台输出

// 第一次调用
UserController.getUser current thread http-apr-8083-exec-7
UserServiceImpl.getUser current thread http-apr-8083-exec-7
UserServiceImpl.asyncFunction current thread async-pool-1

// 第二次调用
UserController.getUser current thread http-apr-8083-exec-10
UserServiceImpl.getUser current thread http-apr-8083-exec-10
UserServiceImpl.asyncFunction current thread async-pool-2

// 第三次调用
UserController.getUser current thread http-apr-8083-exec-8
UserServiceImpl.getUser current thread http-apr-8083-exec-8
UserServiceImpl.asyncFunction current thread async-pool-1

这次就是我们自己指定的线程池执行 asyncFunction 了。

指定多线程池

如果我们需要方法的执行不受别的方法影响,或者不同方法在不同的线程中执行,就需要配置多个线程池。
并且使用 @Async 时必须指定执行 执行器的名称

@EnableAsync
@Component
public class AsyncConfig{
    @Bean("async-pool-bean-1")
    public Executor getExecutor1() {
        return new ThreadPoolExecutor(2, 2, 1,
                TimeUnit.MINUTES, new LinkedBlockingDeque<>(1000),
                new CustomizableThreadFactory("async-pool-1-"));
    }

    @Bean("async-pool-bean-2")
    public Executor getExecutor2() {
        return new ThreadPoolExecutor(2, 2, 1,
                TimeUnit.MINUTES, new LinkedBlockingDeque<>(1000),
                new CustomizableThreadFactory("async-pool-2-"));
    }
}

---


    @Async("async-pool-bean-1") // 指定由 async-pool-bean-1 执行
    @Override
    public void asyncFunction() {
        System.out.println(String.format("UserServiceImpl.asyncFunction current thread %s", Thread.currentThread().getName()));
        return;
    }

    @Async("async-pool-bean-2") // 指定由 async-pool-bean-2 执行
    @Override
    public void asyncFunction2() {
        System.out.println(String.format("UserServiceImpl.asyncFunction2 current thread %s", Thread.currentThread().getName()));
        return;
    }

原理

大致分为3块:

  • 应用启动时。
  • 构造业务类时。
  • 异步方法执行时。

应用启动

首先,要使用 @Async ,必须先用 @EnableAsyc

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

	/**
	 * 这里可以自定义使方法异步化的注解,一般我们直接用@Async就行
	 */
	Class<? extends Annotation> annotation() default Annotation.class;
	
	/**
	 * 实现异步代理的切面模式,一种是jdk动态代理(默认使用),一种是ASPECJ
	 */
	AdviceMode mode() default AdviceMode.PROXY;
}

其引入了 AsyncConfigurationSelector ,这是一个选择器,根据 @EnableAsync 中的 mode 来决定使用哪种异步配置。由于使用我们的配置是 AdviceMode.PROXY ,所以这里返回 ProxyAsyncConfiguration 的名称,表示后续会构建 ProxyAsyncConfiguration

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
			
	@Override
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] { ProxyAsyncConfiguration.class.getName() }; // 走的这
			case ASPECTJ:
				return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
			default:
				return null;
		}
	}
}
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
	protected Executor executor; // 继承自AbstractAsyncConfiguration

	protected AsyncUncaughtExceptionHandler exceptionHandler; // 继承自AbstractAsyncConfiguration

	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)   
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		if (this.executor != null) {
			bpp.setExecutor(this.executor);
		}
		if (this.exceptionHandler != null) {
			bpp.setExceptionHandler(this.exceptionHandler);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}
}

ProxyAsyncConfiguration 会构造一个 AsyncAnnotationBeanPostProcessor
AsyncAnnotationBeanPostProcessor 会被 spring 添加到bean的后置处理器列表中,可以在bean构造之后在执行某些操作,在后面构造业务类时也会出现。

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
	protected final Log logger = LogFactory.getLog(getClass());

	private Class<? extends Annotation> asyncAnnotationType; // 异步的注解,比如@Async

	private Executor executor; // 默认的线程池

	private AsyncUncaughtExceptionHandler exceptionHandler; // 异常处理器
	
	protected Advisor advisor; 	// (实在不知道怎么翻译。。。理解为一种切面的基础接口,可以用来获取切面) ,继承自AbstractBeanFactoryAwareAdvisingPostProcessor 
	
	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		super.setBeanFactory(beanFactory);

		AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
		if (this.asyncAnnotationType != null) {
			advisor.setAsyncAnnotationType(this.asyncAnnotationType);
		}
		advisor.setBeanFactory(beanFactory);
		this.advisor = advisor;
	}

setBeanFactory 中, AsyncAnnotationBeanPostProcessor 创建了一个 AsyncAnnotationAdvisor

public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

	private AsyncUncaughtExceptionHandler exceptionHandler;

	private Advice advice;

	private Pointcut pointcut;

	public AsyncAnnotationAdvisor(Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) {
		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(2);
		asyncAnnotationTypes.add(Async.class);
		
		this.advice = buildAdvice(executor, this.exceptionHandler);
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}
	
    /**
	 * 构造切面
	 */
	protected Advice buildAdvice(Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) {
		return new AnnotationAsyncExecutionInterceptor(executor, exceptionHandler);
	}

	/**
	 * 构造切点
	 */
	protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
		ComposablePointcut result = null;
		for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
			Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
			Pointcut mpc = AnnotationMatchingPointcut.forMethodAnnotation(asyncAnnotationType);
			if (result == null) {
				result = new ComposablePointcut(cpc);
			}
			else {
				result.union(cpc);
			}
			result = result.union(mpc);
		}
		return result;
	}
}

AsyncAnnotationAdvisor 构造了一个切点,就包括 @Async 还有我们自定义的异步化注解。
另外还构造了一个切面 AnnotationAsyncExecutionInterceptor ,这就是 @Async 能异步执行的关键,在后面执行业务代码时再介绍。

构造业务类

以上面的 UserServiceImpl 举例

@Service
public class UserServiceImpl implements UserService {

    @Override
    public String getUser() {
        System.out.println(String.format("UserServiceImpl.getUser current thread %s", Thread.currentThread().getName()));
        return "hxuhao233";
    }
	
	/**
	 * 可以异步执行的方法
	 */
    @Async
    @Override
    public void asyncFunction() {
        System.out.println(String.format("UserServiceImpl.asyncFunction current thread %s", Thread.currentThread().getName()));
        return;
    }
}

spring 在构造这个 bean 时,会执行所有 BeanPostProcessor 的postProcessAfterInitialization,这其中就包括上面提到的 AsyncAnnotationBeanPostProcessor

public abstract class AbstractAutowireCapableBeanFactory extends AbstractBeanFactory
		implements AutowireCapableBeanFactory {
	@Override
	public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName)
			throws BeansException {

		Object result = existingBean;
		// 依次调用所有注册到spring的PostProcessor的postProcessAfterInitialization方法
		for (BeanPostProcessor beanProcessor : getBeanPostProcessors()) { 
			result = beanProcessor.postProcessAfterInitialization(result, beanName);
			if (result == null) {
				return result;
			}
		}
		return result;
	}
}

postProcessAfterInitialization 方法中, UserSerivceImpl 被包起来,返回了一个新的代理类

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
	private ConfigurableListableBeanFactory beanFactory;

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		// ...
		// isEligible 是用来判断这个bean是否应该被代理起来,具体到这就是判断这个bean有没有加上@Async注解
		if (isEligible(bean, beanName)) {
			ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
			if (!proxyFactory.isProxyTargetClass()) {
				evaluateProxyInterfaces(bean.getClass(), proxyFactory);
			}
			// 工厂类设置`AsyncAnnotationAdvisor`
			proxyFactory.addAdvisor(this.advisor);
			customizeProxyFactory(proxyFactory);
			// 通过工厂创建代理类
			return proxyFactory.getProxy(getProxyClassLoader()); 	
		}
		// ...
	}	
}

最后 UserServiceImpl 的代理类长这样:

异步方法执行

当实际执行到 UserServiceImplasyncFunction 时, advisor 中的 AnnotationAsyncExecutionInterceptor 就发挥作用了

public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {
	@Override
	public Object invoke(final MethodInvocation invocation) throws Throwable {
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // 选择具体执行方法的线程池
		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
		if (executor == null) {
			throw new IllegalStateException(
					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
		}
		// 构造任务
		Callable<Object> task = new Callable<Object>() {
			@Override
			public Object call() throws Exception {
				try {
					Object result = invocation.proceed();
					if (result instanceof Future) {
						return ((Future<?>) result).get();
					}
				}
				catch (ExecutionException ex) {
					handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
				}
				catch (Throwable ex) {
					handleError(ex, userDeclaredMethod, invocation.getArguments());
				}
				return null;
			}
		};
		// 调用线程池执行具体的方法(asyncFunction)
		return doSubmit(task, executor, invocation.getMethod().getReturnType());
	}
}

因此, asyncFunction 就交给另外的线程执行,实现了方法异步化。


原文:https://blog.csdn.net/qq_19404533/article/details/108164837
原文:https://blog.csdn.net/qq_19404533/article/details/108177065
作者: Hxuhao2333