SpringBoot中的消息事件机制
Spring本身提供了这种事件发布订阅的模型。基于同一个JVM进程,可以在一个地方发布消息,在另一个地方处理消息。并不依赖MQ。
基于js的发布订阅模式的简单实现
class Publisher {
constructor(){
this.listener = new Map();
}
listen(event, handler){
const queue = this.listener.get(event);
if (queue){
queue.push(handler);
} else {
this.listener.set(event, [handler]);
}
}
publish(event, attach){
const queue = this.listener.get(event);
if (queue){
for (let handler of queue){
handler(attach);
}
}
}
}
const publisher = new Publisher();
publisher.listen('foo', (attach) => {
console.log(`我是foo1,我处理了${attach}`);
});
publisher.listen('foo', (attach) => {
console.log(`我是foo2,我处理了${attach}`);
});
publisher.listen('none', (attach) => {
console.log(`我是none,我处理了${attach}`);
});
publisher.publish('foo', 'HelloWorld!');
//我是foo1,我处理了HelloWorld!
//我是foo2,我处理了HelloWorld!
事件发布接口 ApplicationEventPublisher
ApplicationContext 实现了该接口,可以用于发布事件
@FunctionalInterface
public interface ApplicationEventPublisher {
default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}
void publishEvent(Object event);
}
事件接口 ApplicationEvent
它继承自jdk的事件对象,在原来的基础上添加了一个时间戳属性。
public abstract class ApplicationEvent extends EventObject {
private static final long serialVersionUID = 7099057708183571937L;
private final long timestamp;
public ApplicationEvent(Object source) {
super(source);
this.timestamp = System.currentTimeMillis();
}
public final long getTimestamp() {
return this.timestamp;
}
}
监听事件的接口
用于处理用户发布的事件。继承自jdk的事件监听接口(标记接口)
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E event);
}
一个事件发布订阅流程
- 自定义事件对象, 继承类 ApplicationEvent
- 自定义事件监听器, 实现接口 ApplicationListener 泛型参数就是要监听的事件对象
- 使用 ApplicationEventPublisher 实现发布一个事件对象
- 对应的监听器会得到执行
- 如果存在多个监听, 则多个监听都会执行
- 默认执行的线程,就是当前的发布线程(非异步)
实战
定义事件发布类
本质上是使用了 ApplicationContext 方法进行事件对象的发布
@Component
public class Publisher {
@Autowired
private ApplicationContext applicationContext;
public void publish(ApplicationEvent applicationEvent) {
this.applicationContext.publishEvent(applicationEvent);
}
}
定义一个事件对象
public class MyEvent extends ApplicationEvent {
private static final long serialVersionUID = -8457920014906487987L;
public MyEvent(Object source) {
super(source);
}
}
定义一个(或者多个)专门处理该事件对象的监听器
通过接口的发型,来决定要监听的事件对象
@Component
public class MyEventListener1 implements ApplicationListener<MyEvent>{
private static final Logger LOGGER = LoggerFactory.getLogger(MyEventListener1.class);
@Override
public void onApplicationEvent(MyEvent event) {
LOGGER.info("我是监听器1,我收到了事件:{}", event.getSource());
}
}
@Component
public class MyEventListener2 implements ApplicationListener<MyEvent>{
private static final Logger LOGGER = LoggerFactory.getLogger(MyEventListener2.class);
@Override
public void onApplicationEvent(MyEvent event) {
LOGGER.info("我是监听器2,我收到了事件:{}", event.getSource());
}
}
测试事件的发布
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ApplicationTest {
@Autowired
private Publisher publisher;
@Test
public void test() {
publisher.publish(new MyEvent("Hello World!"));
}
}
成功的监听到了事件的发布
2019-09-19 10:35:11.333 INFO 7672 --- [ main] io.springboot.event.MyEventListener1 : 我是监听器1,我收到了事件:Hello World!
2019-09-19 10:35:11.335 INFO 7672 --- [ main] io.springboot.event.MyEventListener2 : 我是监听器2,我收到了事件:Hello World!
使用注解驱动的方式来监听事件
对于一个事件,如果需要存在多个监听器(ApplicationListener)。通过定义多个ApplicationListener 就会显得很繁琐。可以通过注解的方式,来监听事件
@EventListener
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EventListener {
@AliasFor("classes")
Class<?>[] value() default {};
@AliasFor("value")
Class<?>[] classes() default {};
String condition() default "";
}
- value/classes 指定监听的事件对象。
- condition 一个
SpEL
表达式,可以用于动态的条件判断,当前监听器是否要执行。
通过方法 + 注解定义监听器
@Component
public class Listeners {
private static final Logger LOGGER = LoggerFactory.getLogger(Listeners.class);
@EventListener(value = MyEvent.class)
public void listener1(MyEvent myEvent) {
LOGGER.info("我是注解监听器1,我处理了事件:{}", myEvent.getSource());
}
@EventListener(value = MyEvent.class)
public void listener2(MyEvent myEvent) {
LOGGER.info("我是注解监听器2,我处理了事件:{}", myEvent.getSource());
}
/**
* 只有在 source == 'Hello' 才会执行
* @param myEvent
*/
@EventListener(value = MyEvent.class, condition = "#myEvent.source eq 'Hello'")
public void listener3(MyEvent myEvent) {
LOGGER.info("我是注解监听器3,我处理了事件:{}", myEvent.getSource());
}
}
测试
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ApplicationTest {
@Autowired
private Publisher publisher;
@Test
public void test() {
// 监听器1,2都会执行
publisher.publish(new MyEvent("Hello World!"));
// 监听器1,2,3都会执行
publisher.publish(new MyEvent("Hello"));
}
}
2019-09-19 10:52:46.059 INFO 7916 --- [ main] io.springboot.event.Listeners : 我是注解监听器1,我处理了事件:Hello World!
2019-09-19 10:52:46.061 INFO 7916 --- [ main] io.springboot.event.Listeners : 我是注解监听器2,我处理了事件:Hello World!
2019-09-19 10:52:46.146 INFO 7916 --- [ main] io.springboot.event.Listeners : 我是注解监听器1,我处理了事件:Hello
2019-09-19 10:52:46.146 INFO 7916 --- [ main] io.springboot.event.Listeners : 我是注解监听器2,我处理了事件:Hello
2019-09-19 10:52:46.146 INFO 7916 --- [ main] io.springboot.event.Listeners : 我是注解监听器3,我处理了事件:Hello
注解驱动的方式,还支持根据泛型参数来监听事件对象
系统发布的多个泛型事件, 只有与该监听方法想匹配的泛型事件对象才会执行
@EventListener(value = MyEvent.class)
public void listener2(MyEvent<User> myEvent) {
// 只会处理MyEvent的泛型是User的事件对象
LOGGER.info("我是注解监听器2,我处理了事件:{}", myEvent.getSource());
}
事件的传播
可以在事件方法内, 主动的调用方法来发送新的事件
applicationContext.publishEvent(new MyEvent("Hello"));
注解驱动的方法,也可以返回一个事件对象 ApplicationEvent 来广播新的事件
也可以返回一个事件对象数组, 来广播多个
@EventListener(value = MyEvent.class)
@Async
public MyEvent listener1(MyEvent myEvent) {
LOGGER.info("我是注解监听器1,我处理了事件:{}", myEvent.getSource());
// 返回一个事件对象,并且广播
return new MyEvent("Hello");
}
千万要注意, 如果返回的事件对象, 还可以被当前的事件方法捕获的话, 那么就会产生一个不断发布, 出现死循环。
异步事件
事件的处理默认是同步,阻塞的。如果有需要可以配合 @Async 来实现异步的事件处理。
至于对 @Async 不了解的同学,可以先通过搜索引擎了解
直接在执行方法上添加 @Async 注解就可以了,不管是注解驱动,还是实现 ApplicationListener 都一样
@EventListener(value = MyEvent.class)
@Async
public void listener1(MyEvent myEvent) {
LOGGER.info("我是注解监听器1,我处理了事件:{}", myEvent.getSource());
}