Spring Transactions - Async Source Code Analysis

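Enabling async

How to enable asynchronous tasks

1). Add the @Async annotation to the method

2). Add @EnableAsync to the startup class or a configuration class

@EnableAsync enables asynchronous method execution for Spring beans. The definition of the EnableAsync annotation is shown below.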

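@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

    /**
     * By default only methods annotated with @Async or @javax.ejb.Asynchronous
     * are made asynchronous; set this attribute to use a different annotation type.
     */
    Class<? extends Annotation> annotation() default Annotation.class;

    /**
     * Whether the proxy for async methods is a CGLIB subclass proxy
     * instead of an interface-based JDK dynamic proxy.
     */
    boolean proxyTargetClass() default false;

    /**
     * Proxy-based advice by default (the alternative is AspectJ weaving).
     */
    AdviceMode mode() default AdviceMode.PROXY;

    /**
     * Lowest precedence by default.
     */
    int order() default Ordered.LOWEST_PRECEDENCE;

}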

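The key piece is @Import(AsyncConfigurationSelector.class), which follows the familiar pattern of the selectImports() method of the ImportSelector interface. The source is walked through below.

Registering AsyncAnnotationBeanPostProcessor

By default EnableAsync#mode() is AdviceMode.PROXY, so AsyncConfigurationSelector's selectImports() method returns the @Configuration class ProxyAsyncConfiguration. That class registers an AsyncAnnotationBeanPostProcessor, a BeanPostProcessor that creates a proxy capable of asynchronous method invocation for every bean whose class or methods carry @Async.

AdviceModeImportSelector#selectImports

AsyncConfigurationSelector.selectImports() is called from the parent class AdviceModeImportSelector, inside its implementation of the ImportSelector interface method selectImports().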

public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {

    public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";

    protected String getAdviceModeAttributeName() {
        return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
    }

    @Override
    public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
        // Resolves to @EnableAsync for AsyncConfigurationSelector
        Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
        Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");

        AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
        if (attributes == null) {
            throw new IllegalArgumentException(String.format(
                    "@%s is not present on importing class '%s' as expected",
                    annType.getSimpleName(), importingClassMetadata.getClassName()));
        }

        AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
        // Delegate to the subclass implementation
        String[] imports = selectImports(adviceMode);
        if (imports == null) {
            throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
        }
        return imports;
    }

    /**
     * Abstract method implemented by subclasses.
     */
    @Nullable
    protected abstract String[] selectImports(AdviceMode adviceMode);
}

AsyncConfigurationSelector

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        // Each mode uses a different mechanism; the default is PROXY (proxy-based advice)
        switch (adviceMode) {
            // Proxy-based: JDK dynamic proxy, or CGLIB when proxyTargetClass is true
            case PROXY:
                return new String[]{ProxyAsyncConfiguration.class.getName()};
            // AspectJ weaving
            case ASPECTJ:
                return new String[]{ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }
}
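
The relationship between these two classes is a template method: the base class resolves the annotation named by its generic type parameter, reads the mode attribute, and leaves the final choice to the subclass. The following is a minimal plain-Java sketch of that idea, not Spring's implementation. Mode, EnableDemo, ModeSelector, DemoSelector and the returned class names are hypothetical stand-ins for AdviceMode, @EnableAsync, AdviceModeImportSelector, AsyncConfigurationSelector and the real configuration class names.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.ParameterizedType;

// Hypothetical stand-ins for AdviceMode and @EnableAsync (not Spring types).
enum Mode { PROXY, ASPECTJ }

@Retention(RetentionPolicy.RUNTIME)
@interface EnableDemo {
    Mode mode() default Mode.PROXY;
}

abstract class ModeSelector<A extends Annotation> {

    // Template method: find the annotation type from the generic superclass,
    // read its "mode" attribute on the importing class, then ask the subclass.
    final String[] selectImports(Class<?> importingClass) {
        // Simplification: only works for direct subclasses of ModeSelector.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        @SuppressWarnings("unchecked")
        Class<A> annType = (Class<A>) superType.getActualTypeArguments()[0];

        A annotation = importingClass.getAnnotation(annType);
        if (annotation == null) {
            throw new IllegalArgumentException(
                    "@" + annType.getSimpleName() + " is not present on " + importingClass.getName());
        }
        Mode mode;
        try {
            mode = (Mode) annType.getMethod("mode").invoke(annotation);
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("Cannot read 'mode' attribute", ex);
        }
        String[] imports = selectImports(mode);
        if (imports == null) {
            throw new IllegalArgumentException("Unknown mode: " + mode);
        }
        return imports;
    }

    // Subclasses map a mode to the configuration classes to import.
    protected abstract String[] selectImports(Mode mode);
}

class DemoSelector extends ModeSelector<EnableDemo> {
    @Override
    protected String[] selectImports(Mode mode) {
        switch (mode) {
            case PROXY:
                return new String[] {"demo.ProxyConfig"};    // hypothetical class name
            case ASPECTJ:
                return new String[] {"demo.AspectJConfig"};  // hypothetical class name
            default:
                return null;
        }
    }
}

@EnableDemo
class DemoApp {
    public static void main(String[] args) {
        System.out.println(String.join(",", new DemoSelector().selectImports(DemoApp.class)));
    }
}
```

Because the base method is final, a subclass can only influence which classes are imported, never how the annotation is located or validated.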

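Async configuration classes

ProxyAsyncConfiguration

Inside its @Bean method, ProxyAsyncConfiguration registers an AsyncAnnotationBeanPostProcessor and configures it with a Supplier<Executor> and a Supplier<AsyncUncaughtExceptionHandler>. Both fields are declared in the parent class AbstractAsyncConfiguration, where an @Autowired method takes the AsyncConfigurer bean from the container and assigns its executor and exception handler to executor and exceptionHandler. If the container has no AsyncConfigurer bean, both fields stay null. To supply your own Executor and AsyncUncaughtExceptionHandler, register a custom AsyncConfigurer bean in the container.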

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        // Create the AsyncAnnotationBeanPostProcessor
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        // executor: the task executor; exceptionHandler: the handler for uncaught exceptions
        bpp.configure(this.executor, this.exceptionHandler);
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        // If the user set the annotation attribute of @EnableAsync (a custom async annotation type), apply it
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        // Whether to use CGLIB subclass proxies; off by default
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        // Order of the post-processor; lowest precedence (runs last) by default
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }
}

ProxyAsyncConfiguration does just two things:

  1. It extends the AbstractAsyncConfiguration class

  2. It defines one bean: AsyncAnnotationBeanPostProcessor
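
The check against the default value of the annotation attribute in asyncAdvisor() relies on Annotation.class acting as a "not set" sentinel. The sketch below shows the same comparison with plain JDK reflection, as an illustration only. EnableDemoAsync, CustomAsync, DefaultConfig, CustomConfig and SentinelDemo are hypothetical names, and Method.getDefaultValue() is used here in place of Spring's AnnotationUtils.getDefaultValue.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Hypothetical stand-in for @EnableAsync: Annotation.class means "no custom type".
@Retention(RetentionPolicy.RUNTIME)
@interface EnableDemoAsync {
    Class<? extends Annotation> annotation() default Annotation.class;
}

// Hypothetical user-defined async marker annotation.
@Retention(RetentionPolicy.RUNTIME)
@interface CustomAsync {
}

@EnableDemoAsync
class DefaultConfig {
}

@EnableDemoAsync(annotation = CustomAsync.class)
class CustomConfig {
}

class SentinelDemo {

    // Returns the user-specified annotation type, or null when the default sentinel is still in place.
    static Class<? extends Annotation> customAnnotationOf(Class<?> configClass) {
        EnableDemoAsync enable = configClass.getAnnotation(EnableDemoAsync.class);
        if (enable == null) {
            throw new IllegalArgumentException("@EnableDemoAsync is not present on " + configClass.getName());
        }
        Object defaultValue;
        try {
            // The declared default of the "annotation" attribute, i.e. Annotation.class
            defaultValue = EnableDemoAsync.class.getMethod("annotation").getDefaultValue();
        } catch (NoSuchMethodException ex) {
            throw new IllegalStateException(ex);
        }
        Class<? extends Annotation> configured = enable.annotation();
        return (configured != defaultValue ? configured : null);
    }

    public static void main(String[] args) {
        System.out.println(customAnnotationOf(DefaultConfig.class));
        System.out.println(customAnnotationOf(CustomConfig.class));
    }
}
```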

AbstractAsyncConfiguration

@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

    // Attributes of the @EnableAsync annotation
    @Nullable
    protected AnnotationAttributes enableAsync;

    // Task executor (java.util.concurrent.Executor, designed by Doug Lea)
    @Nullable
    protected Supplier<Executor> executor;

    // Handler for uncaught exceptions
    @Nullable
    protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;


    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

    /**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer::getAsyncExecutor;
        this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
    }

}

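Fields:

1) Annotation attributes

2) Async task executor

3) Exception handler

Methods:

1) setImportMetadata sets the annotation attributes, i.e. field 1

2) setConfigurers sets the async task executor and the exception handler, i.e. fields 2 and 3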
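
One detail of setConfigurers worth illustrating is that it stores method references, so the configurer's getter is not called at wiring time but only when the Supplier is first read. A minimal sketch under that reading, using only the JDK: DemoConfigurer and DemoAsyncConfiguration are hypothetical stand-ins for AsyncConfigurer and AbstractAsyncConfiguration, and the exception handler is left out for brevity.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical stand-in for AsyncConfigurer (executor only).
interface DemoConfigurer {
    Executor getAsyncExecutor();
}

// Hypothetical stand-in for AbstractAsyncConfiguration.
class DemoAsyncConfiguration {

    // Stays null when no configurer is supplied, as in the original class.
    Supplier<Executor> executor;

    void setConfigurers(Collection<DemoConfigurer> configurers) {
        if (configurers == null || configurers.isEmpty()) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one configurer may exist");
        }
        DemoConfigurer configurer = configurers.iterator().next();
        // Method reference: getAsyncExecutor() is NOT invoked here, only on Supplier.get()
        this.executor = configurer::getAsyncExecutor;
    }
}

class ConfigurerDemo {

    static int calls = 0;

    public static void main(String[] args) {
        DemoConfigurer configurer = () -> {
            calls++;
            return Runnable::run;   // trivial executor that runs the task on the calling thread
        };
        DemoAsyncConfiguration config = new DemoAsyncConfiguration();
        config.setConfigurers(Collections.singletonList(configurer));
        System.out.println("getter calls after wiring: " + calls);

        config.executor.get().execute(() -> System.out.println("task ran"));
        System.out.println("getter calls after first use: " + calls);
    }
}
```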

AsyncAnnotationBeanPostProcessor

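After a target object passes through AsyncAnnotationBeanPostProcessor's postProcessAfterInitialization() method, a proxy object is returned in place of the original object. First, look at the inheritance hierarchy of AsyncAnnotationBeanPostProcessor: it extends AbstractBeanFactoryAwareAdvisingPostProcessor, which in turn extends AbstractAdvisingBeanPostProcessor.

AbstractBeanFactoryAwareAdvisingPostProcessor implements the setBeanFactory() method of the BeanFactoryAware interface, and AsyncAnnotationBeanPostProcessor overrides it. This method is called before the postProcessAfterInitialization() method of the BeanPostProcessor interface. AsyncAnnotationBeanPostProcessor.setBeanFactory() is shown below.

AOP - Advisor initialization

When the AsyncAnnotationBeanPostProcessor bean is initialized, the setBeanFactory method of the BeanFactoryAware interface constructs the AsyncAnnotationAdvisor, the advisor for the async annotations.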

setBeanFactory
@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);

    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}

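This method instantiates an AsyncAnnotationAdvisor and stores it in the advisor field of the parent class AbstractAdvisingBeanPostProcessor. AsyncAnnotationAdvisor extends AbstractPointcutAdvisor and implements getAdvice() and getPointcut(). The return values of both methods are determined when the AsyncAnnotationAdvisor is instantiated in the method above.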

AsyncAnnotationAdvisor

public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

    private Advice advice;

    private Pointcut pointcut;


    public AsyncAnnotationAdvisor() {
        this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);
    }


    @SuppressWarnings("unchecked")
    public AsyncAnnotationAdvisor(
            @Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {

        this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));
    }


    @SuppressWarnings("unchecked")
    public AsyncAnnotationAdvisor(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

        Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
        // @Async and @Asynchronous are supported by default
        asyncAnnotationTypes.add(Async.class);
        try {
            asyncAnnotationTypes.add((Class<? extends Annotation>)
                    ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
        } catch (ClassNotFoundException ex) {
            // If EJB 3.1 API not present, simply ignore.
        }
        // Build the advice
        this.advice = buildAdvice(executor, exceptionHandler);
        // Build the pointcut
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }


    /**
     * Replaces the default annotation types registered in the constructor
     * with the given type (for example the one set on @EnableAsync's annotation attribute).
     *
     * @param asyncAnnotationType the desired annotation type
     */
    public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
        Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
        Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
        asyncAnnotationTypes.add(asyncAnnotationType);
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }

    /**
     * Set the {@code BeanFactory} to be used when looking up executors by qualifier.
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        if (this.advice instanceof BeanFactoryAware) {
            ((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
        }
    }


    @Override
    public Advice getAdvice() {
        return this.advice;
    }

    @Override
    public Pointcut getPointcut() {
        return this.pointcut;
    }

    /**
     * Build the advice.
     */
    protected Advice buildAdvice(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
        // Create the async interceptor
        AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
        // Configure the interceptor's executor and exception handler
        interceptor.configure(executor, exceptionHandler);
        // Return the interceptor
        return interceptor;
    }

    /**
     * Calculate a pointcut for the given async annotation types, if any.
     *
     * @param asyncAnnotationTypes the async annotation types to introspect
     * @return the applicable Pointcut object, or {@code null} if none
     */
    protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
        ComposablePointcut result = null;
        for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
            Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
            Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
            if (result == null) {
                result = new ComposablePointcut(cpc);
            } else {
                result.union(cpc);
            }
            result = result.union(mpc);
        }
        return (result != null ? result : Pointcut.TRUE);
    }
}

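buildAdvice() returns an AnnotationAsyncExecutionInterceptor, configured with the executor and exceptionHandler of AsyncAnnotationBeanPostProcessor. Next, look at what this advice actually does. Its inheritance structure: AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor, which extends AsyncExecutionAspectSupport and implements MethodInterceptor.

So AnnotationAsyncExecutionInterceptor is an implementation of the MethodInterceptor interface. Its invoke() method is inherited from the parent class AsyncExecutionInterceptor, shown below:

AsyncExecutionInterceptor (parent class of AnnotationAsyncExecutionInterceptor)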
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {


    public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
        super(defaultExecutor);
    }

    public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
        // Calls the parent constructor. The AsyncExecutionAspectSupport constructor with the same
        // signature falls back to a default when defaultExecutor is null, as the commented lines show:
        //this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
        //this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
        super(defaultExecutor, exceptionHandler);
    }


    /**
     * Invoked for proxy methods that match the pointcut returned by buildPointcut().
     */
    @Override
    @Nullable
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        // The method as originally declared by the user
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // Decide which AsyncTaskExecutor to use
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        // Wrap the method call in a Callable so it can be submitted to the executor and run asynchronously
        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            } catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            } catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };

        // Method of the parent class AsyncExecutionAspectSupport: schedule the task on the executor
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

    @Override
    @Nullable
    protected String getExecutorQualifier(Method method) {
        return null;
    }

    /**
     * Overrides the method of the parent class AsyncExecutionAspectSupport.
     */
    @Override
    @Nullable
    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
        Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
        return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
AsyncExecutionAspectSupport#determineAsyncExecutor
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
    AsyncTaskExecutor executor = this.executors.get(method);
    if (executor == null) {
        Executor targetExecutor;
        // Read the value attribute of @Async on the method or class, then look up
        // the Executor bean with that qualifier in the container
        String qualifier = getExecutorQualifier(method);
        if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
        }
        // No qualifier: use the default executor
        else {
            targetExecutor = this.defaultExecutor.get();
        }
        if (targetExecutor == null) {
            return null;
        }
        // The Executor from the container may not be a TaskExecutor, so wrap it in a TaskExecutorAdapter if needed
        executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
        this.executors.put(method, executor);
    }
    return executor;
}
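
The lookup order in determineAsyncExecutor (per-method cache first, then a qualified executor, then the default) can be sketched in plain Java as follows. This is an illustration under simplifying assumptions, not the Spring code: ExecutorResolver is a hypothetical class, a Map stands in for the bean factory, a Function stands in for getExecutorQualifier, and the TaskExecutorAdapter wrapping step is omitted.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

class ExecutorResolver {

    // Per-method cache, so resolution happens once per method.
    private final Map<Method, Executor> executors = new ConcurrentHashMap<>(16);

    private final Map<String, Executor> namedExecutors;      // stands in for the bean factory
    private final Supplier<Executor> defaultExecutor;        // lazily supplies the default executor
    private final Function<Method, String> qualifierLookup;  // stands in for getExecutorQualifier

    ExecutorResolver(Map<String, Executor> namedExecutors,
            Supplier<Executor> defaultExecutor,
            Function<Method, String> qualifierLookup) {
        this.namedExecutors = namedExecutors;
        this.defaultExecutor = defaultExecutor;
        this.qualifierLookup = qualifierLookup;
    }

    Executor determineExecutor(Method method) {
        Executor executor = this.executors.get(method);
        if (executor == null) {
            String qualifier = this.qualifierLookup.apply(method);
            Executor target;
            if (qualifier != null && !qualifier.isEmpty()) {
                // A qualifier was given: look the executor up by name
                target = this.namedExecutors.get(qualifier);
            } else {
                // No qualifier: fall back to the default executor
                target = this.defaultExecutor.get();
            }
            if (target == null) {
                return null;
            }
            executor = target;
            this.executors.put(method, executor);
        }
        return executor;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Executor> named = new HashMap<>();
        named.put("ioPool", Runnable::run);   // "ioPool" is a made-up qualifier
        Method method = Runnable.class.getMethod("run");

        ExecutorResolver resolver = new ExecutorResolver(named, () -> Runnable::run, m -> "ioPool");
        System.out.println(resolver.determineExecutor(method) == named.get("ioPool"));
    }
}
```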
AnnotationMatchingPointcut

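buildPointcut() combines a class-level and a method-level pointcut for each asyncAnnotationType (by default @Async and @Asynchronous). The advice is applied as soon as the async annotation appears on either the class or the method.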

public class AnnotationMatchingPointcut implements Pointcut {

    private final ClassFilter classFilter;

    private final MethodMatcher methodMatcher;


    public AnnotationMatchingPointcut(Class<? extends Annotation> classAnnotationType) {
        this(classAnnotationType, false);
    }

    public AnnotationMatchingPointcut(Class<? extends Annotation> classAnnotationType, boolean checkInherited) {
        this.classFilter = new AnnotationClassFilter(classAnnotationType, checkInherited);
        this.methodMatcher = MethodMatcher.TRUE;
    }


    public AnnotationMatchingPointcut(@Nullable Class<? extends Annotation> classAnnotationType,
            @Nullable Class<? extends Annotation> methodAnnotationType) {

        this(classAnnotationType, methodAnnotationType, false);
    }


    public AnnotationMatchingPointcut(@Nullable Class<? extends Annotation> classAnnotationType,
            @Nullable Class<? extends Annotation> methodAnnotationType, boolean checkInherited) {

        Assert.isTrue((classAnnotationType != null || methodAnnotationType != null),
                "Either Class annotation type or Method annotation type needs to be specified (or both)");

        if (classAnnotationType != null) {
            // Matches when the class carries @Async; inherited annotations count when checkInherited is true
            this.classFilter = new AnnotationClassFilter(classAnnotationType, checkInherited);
        }
        else {
            this.classFilter = new AnnotationCandidateClassFilter(methodAnnotationType);
        }

        if (methodAnnotationType != null) {
            // Matches when the method carries @Async
            this.methodMatcher = new AnnotationMethodMatcher(methodAnnotationType, checkInherited);
        }
        else {
            this.methodMatcher = MethodMatcher.TRUE;
        }
    }


    @Override
    public ClassFilter getClassFilter() {
        return this.classFilter;
    }

    @Override
    public MethodMatcher getMethodMatcher() {
        return this.methodMatcher;
    }

    @Override
    public boolean equals(@Nullable Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof AnnotationMatchingPointcut)) {
            return false;
        }
        AnnotationMatchingPointcut otherPointcut = (AnnotationMatchingPointcut) other;
        return (this.classFilter.equals(otherPointcut.classFilter) &&
                this.methodMatcher.equals(otherPointcut.methodMatcher));
    }

    @Override
    public int hashCode() {
        return this.classFilter.hashCode() * 37 + this.methodMatcher.hashCode();
    }

    @Override
    public String toString() {
        return "AnnotationMatchingPointcut: " + this.classFilter + ", " + this.methodMatcher;
    }


    public static AnnotationMatchingPointcut forClassAnnotation(Class<? extends Annotation> annotationType) {
        Assert.notNull(annotationType, "Annotation type must not be null");
        return new AnnotationMatchingPointcut(annotationType);
    }

    public static AnnotationMatchingPointcut forMethodAnnotation(Class<? extends Annotation> annotationType) {
        Assert.notNull(annotationType, "Annotation type must not be null");
        return new AnnotationMatchingPointcut(null, annotationType);
    }


    private static class AnnotationCandidateClassFilter implements ClassFilter {

        private final Class<? extends Annotation> annotationType;

        AnnotationCandidateClassFilter(Class<? extends Annotation> annotationType) {
            this.annotationType = annotationType;
        }

        @Override
        public boolean matches(Class<?> clazz) {
            return AnnotationUtils.isCandidateClass(clazz, this.annotationType);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof AnnotationCandidateClassFilter)) {
                return false;
            }
            AnnotationCandidateClassFilter that = (AnnotationCandidateClassFilter) obj;
            return this.annotationType.equals(that.annotationType);
        }

        @Override
        public int hashCode() {
            return this.annotationType.hashCode();
        }

        @Override
        public String toString() {
            return getClass().getName() + ": " + this.annotationType;
        }
    }
}
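
The effect of uniting the class-level and the method-level pointcut can be shown with plain reflection. The sketch below is a simplification and not the Spring matching logic: DemoAsync, MethodLevel, ClassLevel, Unannotated and AsyncMatcher are hypothetical names, and inherited or meta-annotations (the checkInherited flag) are deliberately ignored.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical async marker annotation, allowed on classes and methods.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface DemoAsync {
}

class MethodLevel {
    @DemoAsync
    public void send() {
    }

    public void plain() {
    }
}

@DemoAsync
class ClassLevel {
    public void anyMethod() {
    }
}

class Unannotated {
    public void plain() {
    }
}

class AsyncMatcher {

    // Class-level pointcut: an annotation on the class matches every method of it.
    static boolean matchesClassLevel(Class<?> type) {
        return type.isAnnotationPresent(DemoAsync.class);
    }

    // Method-level pointcut: the annotation sits on the method itself.
    static boolean matchesMethodLevel(Method method) {
        return method.isAnnotationPresent(DemoAsync.class);
    }

    // Union of both pointcuts: either one is enough.
    static boolean matches(Method method, Class<?> targetClass) {
        return matchesClassLevel(targetClass) || matchesMethodLevel(method);
    }

    // A class is eligible for proxying if at least one of its public methods matches.
    static boolean canApply(Class<?> targetClass) {
        for (Method method : targetClass.getMethods()) {
            if (matches(method, targetClass)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(matches(MethodLevel.class.getMethod("send"), MethodLevel.class));
        System.out.println(matches(MethodLevel.class.getMethod("plain"), MethodLevel.class));
        System.out.println(canApply(ClassLevel.class));
        System.out.println(canApply(Unannotated.class));
    }
}
```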

AOP - Creating the proxy (AopProxy)

(AsyncAnnotationBeanPostProcessor -> postProcessAfterInitialization())

The actual post-processing: AsyncAnnotationBeanPostProcessor post-processes beans through its parent class AbstractAdvisingBeanPostProcessor, which implements the BeanPostProcessor interface and overrides postProcessAfterInitialization as shown below:

AbstractAdvisingBeanPostProcessor#postProcessAfterInitialization

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (this.advisor == null || bean instanceof AopInfrastructureBean) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    // If the bean is already a Spring AOP proxy, add this advisor directly to its advisor chain
    if (bean instanceof Advised) {
        Advised advised = (Advised) bean;
        if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
            // Add our local Advisor to the existing proxy's Advisor chain...
            if (this.beforeExistingAdvisors) {
                advised.addAdvisor(0, this.advisor);
            }
            else {
                advised.addAdvisor(this.advisor);
            }
            return bean;
        }
    }

    // The bean qualifies for proxying:
    // build a ProxyFactory, add the interfaces to proxy, set the advisor, and return the proxy
    if (isEligible(bean, beanName)) {
        // Create a proxy factory through the (overridable) prepareProxyFactory method
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        // Create the proxy object
        return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No proxy needed.
    return bean;
}

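isEligible checks whether the class, or any method of the class, carries the annotation. AsyncAnnotationAdvisor implements the PointcutAdvisor interface, so the decision is made by its pointcut.

AbstractAdvisingBeanPostProcessor#isEligible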
protected boolean isEligible(Object bean, String beanName) {
    return isEligible(bean.getClass());
}

protected boolean isEligible(Class<?> targetClass) {
    Boolean eligible = this.eligibleBeans.get(targetClass);
    if (eligible != null) {
        return eligible;
    }
    if (this.advisor == null) {
        return false;
    }
    // Decided by the advisor's pointcut
    eligible = AopUtils.canApply(this.advisor, targetClass);
    this.eligibleBeans.put(targetClass, eligible);
    return eligible;
}

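Once the isEligible check passes, a ProxyFactory is built, the interfaces to proxy are added, the advisor is set, and finally the proxy is returned; it is created by an implementation of the AopProxy interface.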

canApply
public static boolean canApply(Advisor advisor, Class<?> targetClass) {
    return canApply(advisor, targetClass, false);
}

public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
    if (advisor instanceof IntroductionAdvisor) {
        return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
    // AsyncAnnotationAdvisor takes this second branch (PointcutAdvisor)
    } else if (advisor instanceof PointcutAdvisor) {
        PointcutAdvisor pca = (PointcutAdvisor) advisor;
        return canApply(pca.getPointcut(), targetClass, hasIntroductions);
    } else {
        // It doesn't have a pointcut so we assume it applies.
        return true;
    }
}

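AOP - Executing the advice

The proxy from the previous step is produced by an AopProxy. In this case the concrete implementation is JdkDynamicAopProxy, the JDK dynamic proxy class, which implements both AopProxy and InvocationHandler.

What finally runs is the invoke method of the InvocationHandler interface. The core code is excerpted below:

invoke

This brings us back to the core of the JDK dynamic proxy, JdkDynamicAopProxy#invoke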

@Override
@Nullable
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Object oldProxy = null;
    boolean setProxyContext = false;

    // 1. advised is the ProxyFactory; its TargetSource wraps the proxied bean instance
    TargetSource targetSource = this.advised.targetSource;
    Object target = null;

    try {
        // equals and hashCode are not advised unless the proxied interfaces declare them
        if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
            // The target does not implement the equals(Object) method itself.
            return equals(args[0]);
        } else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
            // The target does not implement the hashCode() method itself.
            return hashCode();
        } else if (method.getDeclaringClass() == DecoratingProxy.class) {
            // There is only getDecoratedClass() declared -> dispatch to proxy config.
            return AopProxyUtils.ultimateTargetClass(this.advised);
        } else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
                method.getDeclaringClass().isAssignableFrom(Advised.class)) {
            // Service invocations on ProxyConfig with the proxy config...
            return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
        }

        Object retVal;

        // Self-invocation inside the target bypasses the advice; exposing the proxy
        // through this property is the workaround
        if (this.advised.exposeProxy) {
            // Make invocation available if necessary.
            oldProxy = AopContext.setCurrentProxy(proxy);
            setProxyContext = true;
        }

        // Get as late as possible to minimize the time we "own" the target,
        // in case it comes from a pool.
        // 2. Obtain the proxied target instance
        target = targetSource.getTarget();
        Class<?> targetClass = (target != null ? target.getClass() : null);

        // Get the interception chain for this method.
        // 3. For example, with @Around this finds AspectJAroundAdvice as well as ExposeInvocationInterceptor.
        // Each element is a MethodInterceptor, i.e. an advice object
        List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

        // Check whether we have any advice. If we don't, we can fallback on direct
        // reflective invocation of the target, and avoid creating a MethodInvocation.
        // 4. An empty chain means the method does not need to be intercepted
        if (chain.isEmpty()) {
            // We can skip creating a MethodInvocation: just invoke the target directly
            // Note that the final invoker must be an InvokerInterceptor so we know it does
            // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
            Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
            // 5. No interceptor chain: invoke the target directly through reflection
            retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
        } else {
            // 6. Interceptors exist: create a ReflectiveMethodInvocation from the proxy, the target,
            // the method, the arguments, the target class and the interceptor chain
            // We need to create a method invocation...
            MethodInvocation invocation =
                    new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
            // Proceed to the joinpoint through the interceptor chain.
            // 7. Trigger ReflectiveMethodInvocation.proceed()
            retVal = invocation.proceed();
        }

        // Massage return value if necessary.
        // 8. Convert the return value when required
        Class<?> returnType = method.getReturnType();
        if (retVal != null && retVal == target &&
                returnType != Object.class && returnType.isInstance(proxy) &&
                !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
            // Special case: it returned "this" and the return type of the method
            // is type-compatible. Note that we can't help if the target sets
            // a reference to itself in another returned object.
            retVal = proxy;
        } else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
            throw new AopInvocationException(
                    "Null return value from advice does not match primitive return type for: " + method);
        }
        return retVal;
    } finally {
        if (target != null && !targetSource.isStatic()) {
            // Must have come from TargetSource.
            targetSource.releaseTarget(target);
        }
        if (setProxyContext) {
            // Restore old proxy.
            AopContext.setCurrentProxy(oldProxy);
        }
    }
}

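The interceptor for the @Async annotation is AsyncExecutionInterceptor, which implements the MethodInterceptor interface. MethodInterceptor is an Advice in AOP Alliance terms (the handler applied at a pointcut).

Because chain is not empty, the second branch runs: a ReflectiveMethodInvocation is constructed and its proceed() method is called.

The rest of that code was covered in the AOP article and is not repeated here.

The core call inside proceed() is the interceptor's invoke(this), which here ends up in AsyncExecutionInterceptor.invoke. Let's keep following it!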
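
To make the "proxy, then interceptor chain, then target" flow concrete, here is a small sketch built only on java.lang.reflect.Proxy. It is a toy model under stated assumptions, not JdkDynamicAopProxy: Greeter, Invocation, Interceptor, ChainInvocation and ChainProxyDemo are hypothetical names that loosely play the roles of the target interface, MethodInvocation, MethodInterceptor, ReflectiveMethodInvocation and the proxy's InvocationHandler.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

interface Greeter {
    String greet(String name);
}

// Simplified, hypothetical counterparts of MethodInvocation and MethodInterceptor.
interface Invocation {
    Object proceed() throws Throwable;
}

interface Interceptor {
    Object invoke(Invocation invocation) throws Throwable;
}

// Walks the chain one interceptor at a time; the target is called when the chain is exhausted.
class ChainInvocation implements Invocation {

    private final Object target;
    private final Method method;
    private final Object[] args;
    private final List<Interceptor> chain;
    private int index = -1;

    ChainInvocation(Object target, Method method, Object[] args, List<Interceptor> chain) {
        this.target = target;
        this.method = method;
        this.args = args;
        this.chain = chain;
    }

    @Override
    public Object proceed() throws Throwable {
        if (this.index == this.chain.size() - 1) {
            return ChainProxyDemo.invokeTarget(this.target, this.method, this.args);
        }
        return this.chain.get(++this.index).invoke(this);
    }
}

class ChainProxyDemo {

    // Reflective call on the target, unwrapping the exception thrown by the target method.
    static Object invokeTarget(Object target, Method method, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException ex) {
            throw ex.getCause();
        }
    }

    static <T> T proxy(Class<T> iface, T target, List<Interceptor> chain) {
        InvocationHandler handler = (proxyObject, method, methodArgs) -> {
            if (chain.isEmpty()) {
                // No interceptors: call the target directly
                return invokeTarget(target, method, methodArgs);
            }
            // Interceptors exist: run them in order, ending at the target
            return new ChainInvocation(target, method, methodArgs, chain).proceed();
        };
        return iface.cast(Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }

    public static void main(String[] args) {
        List<Interceptor> chain = new ArrayList<>();
        chain.add(invocation -> {
            System.out.println("before");
            Object result = invocation.proceed();
            System.out.println("after");
            return result;
        });
        Greeter greeter = proxy(Greeter.class, name -> "hello " + name, chain);
        System.out.println(greeter.greet("spring"));
    }
}
```

An async interceptor is simply a chain element that, instead of calling proceed() on the caller's thread, hands that call to an executor.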

AsyncExecutionInterceptor#invoke

@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    // The method as originally declared by the user
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    // Decide which AsyncTaskExecutor to use
    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        // Thrown only when no executor can be resolved at all. Without a custom executor this is
        // normally not reached, because getDefaultExecutor() falls back to SimpleAsyncTaskExecutor
        throw new IllegalStateException(
                "No executor specified and no default executor set on AsyncExecutionInterceptor either");
    }

    // Wrap the method call in a Callable so it can be submitted to the executor and run asynchronously
    Callable<Object> task = () -> {
        try {
            // Invoke the actual method
            Object result = invocation.proceed();
            if (result instanceof Future) {
                // Block until the returned Future completes and take its result
                return ((Future<?>) result).get();
            }
        } catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
        } catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
        }
        return null;
    };

    // Method of the parent class AsyncExecutionAspectSupport:
    // submit the task to the executor
    return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
doSubmit
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // The return type is CompletableFuture
    if (CompletableFuture.class.isAssignableFrom(returnType)) {
        // Run the task asynchronously through CompletableFuture on the given executor
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Throwable ex) {
                throw new CompletionException(ex);
            }
        }, executor);
    // The return type is ListenableFuture
    } else if (ListenableFuture.class.isAssignableFrom(returnType)) {
        return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
    // The return type is Future
    } else if (Future.class.isAssignableFrom(returnType)) {
        // Submit to the executor and return its Future
        return executor.submit(task);
    } else {
        // Any other return type: submit to the executor and return null to the caller
        executor.submit(task);
        return null;
    }
}
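
The combination of the Callable built in invoke() and the return-type dispatch in doSubmit can be tried out with the JDK alone. The following is a rough sketch rather than the Spring code: SubmitDemo and asTask are hypothetical names, a plain ExecutorService replaces AsyncTaskExecutor, the ListenableFuture branch is left out because it is a Spring type, and the handleError calls are omitted.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {

    // Mirrors the Callable built in invoke(): run the call and unwrap a returned Future.
    static Callable<Object> asTask(Callable<Object> invocation) {
        return () -> {
            Object result = invocation.call();
            if (result instanceof Future) {
                return ((Future<?>) result).get();
            }
            // Non-Future results are dropped, as in the original
            return null;
        };
    }

    // Chooses how to hand the task to the executor based on the declared return type.
    static Object doSubmit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        } else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        } else {
            // Fire and forget: the caller gets null immediately
            executor.submit(task);
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Object> task = asTask(() -> CompletableFuture.completedFuture("done"));
            Object future = doSubmit(task, pool, CompletableFuture.class);
            System.out.println(((CompletableFuture<?>) future).get());
            System.out.println(doSubmit(task, pool, void.class));
        } finally {
            pool.shutdown();
        }
    }
}
```

This also shows why an async method should declare void or a Future type: for any other return type the caller receives null, since the real result is only produced later on another thread.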

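Summary

The overall flow can be traced along two lines:

  1. Starting from the annotation: @EnableAsync -> the ProxyAsyncConfiguration class defines a bean (type: AsyncAnnotationBeanPostProcessor)

  2. Following the lifecycle of the AsyncAnnotationBeanPostProcessor bean: AOP advisor initialization (setBeanFactory()) -> AOP proxy creation via AopProxy (postProcessAfterInitialization()) -> AOP advice execution (InvocationHandler.invoke)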
