Spring Event使用示例(业务解耦)

mtain 2024年02月29日 60次浏览

一、说明

Spring Event的应用场景主要是在实际业务开发过程中,当业务逻辑变得复杂,核心业务和多个子业务交织在一起时。为了避免代码过长和耦合度过高,Spring Event提供了一种观察者设计模式,允许一个Bean在完成任务后通知其他Bean,或者一个Bean观察并监听另一个Bean的行为。这种模式适用于需要在一次请求中同步完成多个业务场景,例如邮件发送、短信发送等。

业务场景示例:
主流程中,需要推送数据给外围平台或者发送短信等,为了降低代码耦合度,减少调用时间(异步),可以该技术来实现。

二、代码示例

1. 自定义事件

import org.springframework.context.ApplicationEvent;

/**
 * @desc 业务创建事件
 */
public class BusiCreateEvent extends ApplicationEvent {
    private String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public BusiCreateEvent(Object source, String message) {
        super(source);
        this.message = message;
    }
}

2. 事件监听器

通常根据业务动作会创建多个监听器,每个监听器完成一个动作,彼此互不干扰

事件监听类1:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
* 事件监听类
*/
@Component
public class BusiCreateSmsEventListener implements ApplicationListener<BusiCreateEvent> {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onApplicationEvent(BusiCreateEvent event) {
        System.out.println("\n\n\n---------------------------------------");
        System.out.println("Listener One Thread:"+Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Listener One Result:"+event.getMessage());
        System.out.println("=========================================\n\n\n");


    }
}

事件监听类2:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
* 事件监听类2
*/
@Component
public class BusiCreateApiEventListener implements ApplicationListener<BusiCreateEvent> {
    private Logger logger = LoggerFactory.getLogger(getClass());


    @Override
    public void onApplicationEvent(BusiCreateEvent event) {
        System.out.println("\n\n\n---------------------------------------");
        System.out.println("Listener Two Thread:"+Thread.currentThread());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Listener Two Result:"+event.getMessage());
        System.out.println("=========================================\n\n\n");

    }
}

3. 事件发布

/**
* 事件发布类
*/
@Component
public class BusiCreateEventPublish {
 
    @Autowired
    private ApplicationEventPublisher eventPublisher;
 
    public void publish(String message){
        BusiCreateEvent event = new BusiCreateEvent(this, message);
        eventPublisher.publishEvent(event);
    }
 
}

可以在适当的位置调用事件发布,例如在controller中,发布事件触发后,监听都会收到该事件

import cn.hutool.core.lang.UUID;
import fastboot.core.comn.event.BusiCreateEventPublish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/event/publish")
public class EventPublishController {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    BusiCreateEventPublish customEventPublish;

    @GetMapping("/busiCreate")
    public Boolean busiCreate(){
        customEventPublish.publish(UUID.fastUUID().toString());
        System.out.println("Publish OK");
        return true;
    }


}

事件发布后,监听器默认使用主线程,进行同步执行,在实际场景中几乎都需要异步执行,可通过代码设置实现

4. 事件全局异步设置(所有监听器异步)

配置异步线程,并将事件设置为异步

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/**
 * @desc Spring Event事件通知支持多线程
 */
@Configuration
public class EventAsyncConfig {
    /**
     * 配置多线程
     * @return
     */
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10); //线程池活跃的线程数
        taskExecutor.setMaxPoolSize(30); //线程池最大活跃的线程数
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setThreadNamePrefix("event-");
        taskExecutor.initialize();
        return taskExecutor;
    }

    /**
     * 设置事件使用异步
     * @return
     */
    @Bean
    public SimpleApplicationEventMulticaster applicationEventMulticaster() {
        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
        simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());
        return simpleApplicationEventMulticaster;
    }
}

5. 事件指定监听器异步设置(指定监听器异步)

不设置所有事件为异步,只将部分事件监听器使用注解设置为异步

  • 线程配置
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

// EnableAsync注解开启异步
@EnableAsync
@Configuration
public class EventAsyncConfig implements AsyncConfigurer {

    /**
     * 给Async配置一个taskExecutor
     * @return
     */
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10); //线程池活跃的线程数
        taskExecutor.setMaxPoolSize(30); //线程池最大活跃的线程数
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setThreadNamePrefix("event-async");
        taskExecutor.initialize();
        return taskExecutor;
    }


}
  • 监听器使用注解开启异步
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
* 事件监听类
*/
@Component
public class BusiCreateSmsEventListener implements ApplicationListener<BusiCreateEvent> {
    private Logger logger = LoggerFactory.getLogger(getClass());

    // Async开启此监听器的异步
    @Async
    @Override
    public void onApplicationEvent(BusiCreateEvent event) {
        System.out.println("\n\n\n---------------------------------------");
        System.out.println("Listener One Thread:"+Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Listener One Result:"+event.getMessage());
        System.out.println("=========================================\n\n\n");


    }
}

三、事务问题

正常来说,使用了事件,监听器会异步化处理,不干扰主线程执行,不论事务被提交或回滚,监听器都能正常执行。
但是,有的场景中,监听器会去查询主线程的数据,但主线程未提交事务,所以监听器中查询不到该数据,解决该问题有以下方案:

  • 监听器设置,事务传播策略:总是新建、事务隔离策略:读未提交
  • 事务同步管理器TransactionSynchronizationManager
  • @TransactionalEventListener注解

主线程插入数据

方案1:监听器设置,事务传播策略:总是新建、事务隔离策略:读未提交

该方案属于较为暴力方案,直接读取主线程事务中未提交的数据

@Component
public class BusiCreateApiEventListener implements ApplicationListener<BusiCreateEvent> {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    JdbcTemplate jdbcTemplate;

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW,isolation = Isolation.READ_UNCOMMITTED)
    public void onApplicationEvent(BusiCreateEvent event) {
        System.out.println("\n\n\n---------------------------------------");
        System.out.println("Listener Two Thread:"+Thread.currentThread());
        String message = event.getMessage();
        List<String> result = jdbcTemplate.queryForList("select buis_time from busi_data where busi_id='" + message + "'", String.class);
        System.out.println("Listener Two query Result number:"+result.size());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("=========================================\n\n\n");

    }
}

方案2.事务同步管理器TransactionSynchronizationManager

将事件发布写在事务提交后

    @Transactional
    @GetMapping("/busiCreate")
    public Boolean busiCreate() throws InterruptedException {
        String busiId = UUID.fastUUID().toString();
        String sql = "INSERT INTO busi_data(busi_id, buis_time)VALUES('" +busiId+ "', '"+ DateUtil.now() +"')";
        jdbcTemplate.execute(sql);

        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit(){
                customEventPublish.publish(busiId);

            }
        });

        Thread.sleep(5000);
        String busiId2 = UUID.fastUUID().toString();
        sql = "INSERT INTO busi_data(busi_id, buis_time)VALUES('" +busiId2+ "', '"+ DateUtil.now() +"')";
        jdbcTemplate.execute(sql);
        System.out.println("createBusi successful");
        return true;
    }

方案3. @TransactionalEventListener注解(未能尝试成功)

在监听器中使用该注解,并进行配置

@Component
public class BusiCreateApiEventListener implements ApplicationListener<BusiCreateEvent> {
    private Logger logger = LoggerFactory.getLogger(getClass());


    @Override
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = BusiCreateEvent.class)
    public void onApplicationEvent(BusiCreateEvent event) {
        System.out.println("\n\n\n---------------------------------------");
        System.out.println("Listener Two Thread:"+Thread.currentThread());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Listener Two Result:"+event.getMessage());
        System.out.println("=========================================\n\n\n");

    }
}