SpringBoot中的消息事件机制

SpringBoot中的消息事件机制

Spring本身提供了这种事件发布订阅的模型。基于同一个JVM进程,可以在一个地方发布消息,在另一个地方处理消息。并不依赖MQ

基于js的发布订阅模式的简单实现

class Publisher {
	constructor(){	
		this.listener = new Map();
	}
	listen(event, handler){
		const queue = this.listener.get(event);
		if (queue){
			queue.push(handler);
		} else {
			this.listener.set(event, [handler]);
		}
	}
	publish(event, attach){
		const queue = this.listener.get(event);
		if (queue){
			for (let handler of queue){
				handler(attach);
			}
		}
	}
}

const publisher = new Publisher();

publisher.listen('foo', (attach) => {
	console.log(`我是foo1,我处理了${attach}`);
});
publisher.listen('foo', (attach) => {
	console.log(`我是foo2,我处理了${attach}`);
});
publisher.listen('none', (attach) => {
	console.log(`我是none,我处理了${attach}`);
});

publisher.publish('foo', 'HelloWorld!');

//我是foo1,我处理了HelloWorld!
//我是foo2,我处理了HelloWorld!

事件发布接口 ApplicationEventPublisher

ApplicationContext 实现了该接口,可以用于发布事件

@FunctionalInterface
public interface ApplicationEventPublisher {
	default void publishEvent(ApplicationEvent event) {
		publishEvent((Object) event);
	}
	void publishEvent(Object event);
}

事件接口 ApplicationEvent

它继承自jdk的事件对象,在原来的基础上添加了一个时间戳属性。

public abstract class ApplicationEvent extends EventObject {
	private static final long serialVersionUID = 7099057708183571937L;
	private final long timestamp;
	public ApplicationEvent(Object source) {
		super(source);
		this.timestamp = System.currentTimeMillis();
	}
	public final long getTimestamp() {
		return this.timestamp;
	}
}

监听事件的接口

用于处理用户发布的事件。继承自jdk的事件监听接口(标记接口)

@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
	void onApplicationEvent(E event);
}

一个事件发布订阅流程

  1. 自定义事件对象, 继承类 ApplicationEvent
  2. 自定义事件监听器, 实现接口 ApplicationListener 泛型参数就是要监听的事件对象
  3. 使用 ApplicationEventPublisher 实现发布一个事件对象
  4. 对应的监听器会得到执行
    • 如果存在多个监听, 则多个监听都会执行
    • 默认执行的线程,就是当前的发布线程(非异步)

实战

定义事件发布类

本质上是使用了 ApplicationContext 方法进行事件对象的发布

@Component
public class Publisher {
	
	@Autowired
	private ApplicationContext applicationContext;
	
	public void publish(ApplicationEvent applicationEvent) {
		this.applicationContext.publishEvent(applicationEvent);
	}
}

定义一个事件对象

public class MyEvent extends ApplicationEvent {
	
	private static final long serialVersionUID = -8457920014906487987L;

	public MyEvent(Object source) {
		super(source);
	}
}

定义一个(或者多个)专门处理该事件对象的监听器

通过接口的发型,来决定要监听的事件对象

@Component
public class MyEventListener1 implements ApplicationListener<MyEvent>{

	private static final Logger LOGGER = LoggerFactory.getLogger(MyEventListener1.class);
	
	@Override
	public void onApplicationEvent(MyEvent event) {
		LOGGER.info("我是监听器1,我收到了事件:{}", event.getSource());
	}
}

@Component
public class MyEventListener2 implements ApplicationListener<MyEvent>{

	private static final Logger LOGGER = LoggerFactory.getLogger(MyEventListener2.class);
	
	@Override
	public void onApplicationEvent(MyEvent event) {
		LOGGER.info("我是监听器2,我收到了事件:{}", event.getSource());
	}
}

测试事件的发布

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ApplicationTest {
	
	@Autowired
	private Publisher publisher;
	
	@Test
	public void test() {
		publisher.publish(new MyEvent("Hello World!"));
	}
}

成功的监听到了事件的发布

2019-09-19 10:35:11.333  INFO 7672 --- [           main] io.springboot.event.MyEventListener1     : 我是监听器1,我收到了事件:Hello World!
2019-09-19 10:35:11.335  INFO 7672 --- [           main] io.springboot.event.MyEventListener2     : 我是监听器2,我收到了事件:Hello World!

使用注解驱动的方式来监听事件

对于一个事件,如果需要存在多个监听器(ApplicationListener)。通过定义多个ApplicationListener 就会显得很繁琐。可以通过注解的方式,来监听事件

@EventListener

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EventListener {
	@AliasFor("classes")
	Class<?>[] value() default {};

	@AliasFor("value")
	Class<?>[] classes() default {};

	String condition() default "";
}
  • value/classes 指定监听的事件对象。
  • condition 一个SpEL表达式,可以用于动态的条件判断,当前监听器是否要执行。

通过方法 + 注解定义监听器

@Component
public class Listeners {
	
	private static final Logger LOGGER = LoggerFactory.getLogger(Listeners.class);
	
	@EventListener(value = MyEvent.class)
	public void listener1(MyEvent myEvent) {
		LOGGER.info("我是注解监听器1,我处理了事件:{}", myEvent.getSource());
	}
	
	@EventListener(value = MyEvent.class)
	public void listener2(MyEvent myEvent) {
		LOGGER.info("我是注解监听器2,我处理了事件:{}", myEvent.getSource());
	}
	
	/**
	 * 只有在 source == 'Hello' 才会执行
	 * @param myEvent
	 */
	@EventListener(value = MyEvent.class, condition = "#myEvent.source eq 'Hello'")
	public void listener3(MyEvent myEvent) {
		LOGGER.info("我是注解监听器3,我处理了事件:{}", myEvent.getSource());
	}
}

测试

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ApplicationTest {
	
	@Autowired
	private Publisher publisher;
	
	@Test
	public void test() {
		// 监听器1,2都会执行
		publisher.publish(new MyEvent("Hello World!"));
		// 监听器1,2,3都会执行
		publisher.publish(new MyEvent("Hello"));
	}
}

2019-09-19 10:52:46.059  INFO 7916 --- [           main] io.springboot.event.Listeners            : 我是注解监听器1,我处理了事件:Hello World!
2019-09-19 10:52:46.061  INFO 7916 --- [           main] io.springboot.event.Listeners            : 我是注解监听器2,我处理了事件:Hello World!
2019-09-19 10:52:46.146  INFO 7916 --- [           main] io.springboot.event.Listeners            : 我是注解监听器1,我处理了事件:Hello
2019-09-19 10:52:46.146  INFO 7916 --- [           main] io.springboot.event.Listeners            : 我是注解监听器2,我处理了事件:Hello
2019-09-19 10:52:46.146  INFO 7916 --- [           main] io.springboot.event.Listeners            : 我是注解监听器3,我处理了事件:Hello

注解驱动的方式,还支持根据泛型参数来监听事件对象

系统发布的多个泛型事件, 只有与该监听方法想匹配的泛型事件对象才会执行

@EventListener(value = MyEvent.class)
public void listener2(MyEvent<User> myEvent) {
    // 只会处理MyEvent的泛型是User的事件对象
	LOGGER.info("我是注解监听器2,我处理了事件:{}", myEvent.getSource());
}

事件的传播

可以在事件方法内, 主动的调用方法来发送新的事件

applicationContext.publishEvent(new MyEvent("Hello"));

注解驱动的方法,也可以返回一个事件对象 ApplicationEvent 来广播新的事件

也可以返回一个事件对象数组, 来广播多个

@EventListener(value = MyEvent.class)
@Async
public MyEvent listener1(MyEvent myEvent) {
	LOGGER.info("我是注解监听器1,我处理了事件:{}", myEvent.getSource());
	
	// 返回一个事件对象,并且广播
	return new MyEvent("Hello");
}

千万要注意, 如果返回的事件对象, 还可以被当前的事件方法捕获的话, 那么就会产生一个不断发布, 出现死循环。

异步事件

事件的处理默认是同步,阻塞的。如果有需要可以配合 @Async 来实现异步的事件处理。
至于对 @Async 不了解的同学,可以先通过搜索引擎了解

直接在执行方法上添加 @Async 注解就可以了,不管是注解驱动,还是实现 ApplicationListener 都一样

@EventListener(value = MyEvent.class)
@Async
public void listener1(MyEvent myEvent) {
	LOGGER.info("我是注解监听器1,我处理了事件:{}", myEvent.getSource());
}