一、说明
Spring Event的应用场景主要是在实际业务开发过程中,当业务逻辑变得复杂,核心业务和多个子业务交织在一起时。为了避免代码过长和耦合度过高,Spring Event提供了一种观察者设计模式,允许一个Bean在完成任务后通知其他Bean,或者一个Bean观察并监听另一个Bean的行为。这种模式适用于需要在一次请求中同步完成多个业务场景,例如邮件发送、短信发送等。
业务场景示例:
主流程中,需要推送数据给外围平台或者发送短信等,为了降低代码耦合度,减少调用时间(异步),可以该技术来实现。
二、代码示例
1. 自定义事件
import org.springframework.context.ApplicationEvent;
/**
* @desc 业务创建事件
*/
public class BusiCreateEvent extends ApplicationEvent {
private String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public BusiCreateEvent(Object source, String message) {
super(source);
this.message = message;
}
}
2. 事件监听器
通常根据业务动作会创建多个监听器,每个监听器完成一个动作,彼此互不干扰
事件监听类1:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* 事件监听类
*/
@Component
public class BusiCreateSmsEventListener implements ApplicationListener<BusiCreateEvent> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onApplicationEvent(BusiCreateEvent event) {
System.out.println("\n\n\n---------------------------------------");
System.out.println("Listener One Thread:"+Thread.currentThread());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Listener One Result:"+event.getMessage());
System.out.println("=========================================\n\n\n");
}
}
事件监听类2:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* 事件监听类2
*/
@Component
public class BusiCreateApiEventListener implements ApplicationListener<BusiCreateEvent> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onApplicationEvent(BusiCreateEvent event) {
System.out.println("\n\n\n---------------------------------------");
System.out.println("Listener Two Thread:"+Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Listener Two Result:"+event.getMessage());
System.out.println("=========================================\n\n\n");
}
}
3. 事件发布
/**
* 事件发布类
*/
@Component
public class BusiCreateEventPublish {
@Autowired
private ApplicationEventPublisher eventPublisher;
public void publish(String message){
BusiCreateEvent event = new BusiCreateEvent(this, message);
eventPublisher.publishEvent(event);
}
}
可以在适当的位置调用事件发布,例如在controller中,发布事件触发后,监听都会收到该事件
import cn.hutool.core.lang.UUID;
import fastboot.core.comn.event.BusiCreateEventPublish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/event/publish")
public class EventPublishController {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
BusiCreateEventPublish customEventPublish;
@GetMapping("/busiCreate")
public Boolean busiCreate(){
customEventPublish.publish(UUID.fastUUID().toString());
System.out.println("Publish OK");
return true;
}
}
事件发布后,监听器默认使用主线程,进行同步执行,在实际场景中几乎都需要异步执行,可通过代码设置实现
4. 事件全局异步设置(所有监听器异步)
配置异步线程,并将事件设置为异步
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* @desc Spring Event事件通知支持多线程
*/
@Configuration
public class EventAsyncConfig {
/**
* 配置多线程
* @return
*/
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10); //线程池活跃的线程数
taskExecutor.setMaxPoolSize(30); //线程池最大活跃的线程数
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setThreadNamePrefix("event-");
taskExecutor.initialize();
return taskExecutor;
}
/**
* 设置事件使用异步
* @return
*/
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());
return simpleApplicationEventMulticaster;
}
}
5. 事件指定监听器异步设置(指定监听器异步)
不设置所有事件为异步,只将部分事件监听器使用注解设置为异步
- 线程配置
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
// EnableAsync注解开启异步
@EnableAsync
@Configuration
public class EventAsyncConfig implements AsyncConfigurer {
/**
* 给Async配置一个taskExecutor
* @return
*/
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10); //线程池活跃的线程数
taskExecutor.setMaxPoolSize(30); //线程池最大活跃的线程数
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setThreadNamePrefix("event-async");
taskExecutor.initialize();
return taskExecutor;
}
}
- 监听器使用注解开启异步
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* 事件监听类
*/
@Component
public class BusiCreateSmsEventListener implements ApplicationListener<BusiCreateEvent> {
private Logger logger = LoggerFactory.getLogger(getClass());
// Async开启此监听器的异步
@Async
@Override
public void onApplicationEvent(BusiCreateEvent event) {
System.out.println("\n\n\n---------------------------------------");
System.out.println("Listener One Thread:"+Thread.currentThread());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Listener One Result:"+event.getMessage());
System.out.println("=========================================\n\n\n");
}
}
三、事务问题
正常来说,使用了事件,监听器会异步化处理,不干扰主线程执行,不论事务被提交或回滚,监听器都能正常执行。
但是,有的场景中,监听器会去查询主线程的数据,但主线程未提交事务,所以监听器中查询不到该数据,解决该问题有以下方案:
- 监听器设置,事务传播策略:总是新建、事务隔离策略:读未提交
- 事务同步管理器TransactionSynchronizationManager
- @TransactionalEventListener注解
主线程插入数据
方案1:监听器设置,事务传播策略:总是新建、事务隔离策略:读未提交
该方案属于较为暴力方案,直接读取主线程事务中未提交的数据
@Component
public class BusiCreateApiEventListener implements ApplicationListener<BusiCreateEvent> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
JdbcTemplate jdbcTemplate;
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW,isolation = Isolation.READ_UNCOMMITTED)
public void onApplicationEvent(BusiCreateEvent event) {
System.out.println("\n\n\n---------------------------------------");
System.out.println("Listener Two Thread:"+Thread.currentThread());
String message = event.getMessage();
List<String> result = jdbcTemplate.queryForList("select buis_time from busi_data where busi_id='" + message + "'", String.class);
System.out.println("Listener Two query Result number:"+result.size());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("=========================================\n\n\n");
}
}
方案2.事务同步管理器TransactionSynchronizationManager
将事件发布写在事务提交后
@Transactional
@GetMapping("/busiCreate")
public Boolean busiCreate() throws InterruptedException {
String busiId = UUID.fastUUID().toString();
String sql = "INSERT INTO busi_data(busi_id, buis_time)VALUES('" +busiId+ "', '"+ DateUtil.now() +"')";
jdbcTemplate.execute(sql);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit(){
customEventPublish.publish(busiId);
}
});
Thread.sleep(5000);
String busiId2 = UUID.fastUUID().toString();
sql = "INSERT INTO busi_data(busi_id, buis_time)VALUES('" +busiId2+ "', '"+ DateUtil.now() +"')";
jdbcTemplate.execute(sql);
System.out.println("createBusi successful");
return true;
}
方案3. @TransactionalEventListener注解(未能尝试成功)
在监听器中使用该注解,并进行配置
@Component
public class BusiCreateApiEventListener implements ApplicationListener<BusiCreateEvent> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = BusiCreateEvent.class)
public void onApplicationEvent(BusiCreateEvent event) {
System.out.println("\n\n\n---------------------------------------");
System.out.println("Listener Two Thread:"+Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Listener Two Result:"+event.getMessage());
System.out.println("=========================================\n\n\n");
}
}