SpringBoot定时任务以及全局业务 异步 双线程池实践

springboot 定时任务

@EnableScheduling @Scheduled

使用

注解1放在启动类上

注解2放在定时任务的方法上

经验:

@Scheduled 注解可以写很多的方式使用最多的是使用 cron 表达式来定时处理时间eg.

@Scheduled(cron = "0 0 0 * * ?")

五个参数说说明:

对于周这个参数: 是从1-7 周末到周六计算的

[秒] [分] [时] [日] [月] [周] [年]

cron.qqe2.com/ 可以自动生成限定时间的cron表达式工具

0-59 , - * /
0-59 , - * /
小时 0-23 , - * /
日期 1-31 , - * ? / L W C
月份 1-12 或者 JAN-DEC , - * /
星期 1-7 或者 SUN-SAT , - * ? / L C #
年(可选) 留空, 1970-2099 , - * /

@Scheduled(cron = "10 * * * * ?") //一个分钟的十秒处理一次 /* @Scheduled(fixedRate = 5000) :上一次开始执行时间点之后5秒再执行 @Scheduled(fixedDelay = 5000) :上一次执行完毕时间点之后5秒再执行 @Scheduled(initialDelay=1000, fixedRate=5000) :第一次延迟1秒后执行,之后按fixedRate的规则每5秒执行一次 @Scheduled(cron="0/5 * * * * ?") :通过cron表达式定义规则 */

总结: 对于cron表达式中一般不写七位的年份,一般以六位为主

两个注解的使用应该就能直接实现springboot项目中的定时任务但是这样实现的定时任务如果加一些配置文件的话,如果存在多个定时任务的话就会出现可能在同一时间内只能执行一个定时任务的情况。

异步

@EnableAsync @Async

使用

注解1放在启动类上,注解2放在定时任务的方法上面

@Async("asyncScheduleExecutor")

@Async 注解里面的参数可以指定该异步方法使用的线程池是哪一个。

线程池的创建

阿里Java开发者手册里面明确要求创建线程池方法

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

使用new ThreadPoolExecutor 这种方式创建线程池是因为可以让使用者更了解具体每一个参数的作用以及目的:

参数说明:

  1. corePoolSize 核心线程池大小
  2. maximumPoolSize 最大线程池大小
  3. keepAliveTime 空闲线程的存活时间
  4. unit 空闲线程保持存活的时间的单位。
  5. workQueue 阻塞任务队列,当要执行的任务超出corePoolSize ,那么此时将把任务放入队列中。
  6. handler 提交任务超出maxmumPoolSize+workQueue时的拒绝策略

任务队列:

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按 FIFO (先进先出) 排序元素,吞吐量通常要高于 ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool() 使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool 使用了这个队列。
  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

拒绝策略:

  • AbortPolicy 中止策略 :默认的饱和策略,该策略将抛出为检查的RejectedExecutionException.调用者可以捕获这个异常,然后根据需求编写自己的代码。
  • DiscardPolicy 抛弃策略: 当新提交的任务无法保存到队列中等待执行时,抛弃策略会悄悄抛弃该任务
  • DiscardOldestPolicy 抛弃最旧的策略: 抛弃下一个将被执行的任务,然后添加新提交的任务
  • CallerRunsPolicy 调用者运行策略: 该策略实现了一种调用机制,该策略既不抛弃任务,也不抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务(一般该调用者是main线程)

实践

目的

系统中即存在多定时任务需要异步执行,又存在需要单独执行的异步方法.

Let’s do it!

1,项目启动类中加入相应的注解:

package com.smile.ssm;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* @author Smile
*/
@SpringBootApplication
@MapperScan("com.smile.ssm.dao")
@EnableScheduling
@EnableAsync
public class SsmApplication {

   public static void main(String[] args) {
       SpringApplication.run(SsmApplication.class, args);
       System.out.println("******************SSM project start success !!!***********************");
   }

}

2,可视化线程池执行情况

日志中可视化线程池执行情况,执行了多少了,队列中还剩下多少等等 image.png 相应的代码

package com.smile.ssm.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 可视化线程池管理构造器
 *
 * @author Smile
 */
@Slf4j
public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}

3,创建两个线程池并且分别指定线程池的名称

image.png 这里注意我自己写了一个可视化的ThreadPoolExcutor的方便便于我们在日志中区分使用了两个线程池执行不一样的内容

image.png

package com.smile.ssm.config;

import cn.hutool.core.thread.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

/**
 * 全局业务线程池
 *
 * @author smile
 */
@Configuration
@EnableAsync
public class ExecutorsConfig {
    @Bean
    public Executor asyncServiceExecutor() {
        //获取当前机器的核数
        int cpuNum = Runtime.getRuntime().availableProcessors();
        ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(cpuNum);
        //配置最大线程数
        executor.setMaxPoolSize(cpuNum * 2);
        //配置队列大小
        executor.setQueueCapacity(150);
        //线程存活时间
        executor.setKeepAliveSeconds(60);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("PUSH_DATA_THREAD");
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }

    public static void main(String[] args) {
        //规范标准版创建线程池 局部
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNamePrefix("demo-pool-%d").build();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    }


    @Bean
    public Executor asyncScheduleExecutor() {
        //获取当前机器的核数
        int cpuNum = Runtime.getRuntime().availableProcessors();
        ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(cpuNum);
        //配置最大线程数
        executor.setMaxPoolSize(cpuNum * 2);
        //配置队列大小
        executor.setQueueCapacity(150);
        //线程存活时间
        executor.setKeepAliveSeconds(60);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("Schedule_THREAD");
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }


}

4,需要执行的异步方法上面使用异步注解实现异步

注意 @Async(“asyncServiceExecutor”) 这个里面的值代表着我们线程配置文件中的方法亦或beanName

image.png

package com.smile.ssm.timer;

![image.png](https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d6361a8ee52d4c8084fac8b96f9b2e39~tplv-k3u1fbpfcp-watermark.image?)
import com.smile.ssm.entity.SysUserPO;
import com.smile.ssm.service.SysUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

/**
 * @author Smile
 */
@Component
@Slf4j
public class AsyncTask {
    @Resource
    private SysUserService service;

    @Async("asyncServiceExecutor")
    public void doTask1() throws InterruptedException {
        List<SysUserPO> list = service.list();
        System.out.println(list);
        log.info("Task1 started.");
        long start = System.currentTimeMillis();
        Thread.sleep(5000);
        long end = System.currentTimeMillis();

        log.info("Task1 finished, time elapsed: {} ms.", end - start);
    }
}

5,需要多定时任务异步执行的使用类

image.png

package com.smile.ssm.timer;

import com.smile.ssm.entity.SysUserPO;
import com.smile.ssm.service.SysUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

/**
 * 定时任务测试类
 *
 * @author Smile
 */
@Component
@Slf4j
public class ScheduleTimeTest {

    @Resource
    private SysUserService service;

    @Async("asyncScheduleExecutor")
    @Scheduled(cron = "0/10 * * * * ?")
    public void test1() {
        System.out.println("定时任务1开始");
        List<SysUserPO> list = service.list();
        System.out.println(list);

        System.out.println("定时任务1结束");
    }

    @Async("asyncScheduleExecutor")
    @Scheduled(cron = "0/10 * * * * ?")
    public void test2() {
        System.out.println("定时任务2开始");
        List<SysUserPO> list = service.list();
        System.out.println(list);
        System.out.println("定时任务2结束");
    }

    @Async("asyncScheduleExecutor")
    @Scheduled(cron = "0/10 * * * * ?")
    public void test3() {
        System.out.println("定时任务3");
        List<SysUserPO> list = service.list();
        System.out.println(list);
        System.out.println("定时任务3结束");
    }

    @Async("asyncScheduleExecutor")
    @Scheduled(cron = "0/10 * * * * ?")
    public void test4() {
        System.out.println("定时任务4");
        List<SysUserPO> list = service.list();
        System.out.println(list);
        System.out.println("定时任务4结束");
    }


}

测试

单纯定时任务异步线程池

image.png

双线程池使用

image.png

源码地址


作者:Smile_X
链接:SpringBoot定时任务以及全局业务 异步 双线程池实践 - 掘金