Skip to content

Spring Batch 作业监听器(Job Listener)

🏷️ Spring Batch

每个作业都有生命周期。Spring Batch 允许在生命周期的不同阶段通过提供的钩子( Hook )来添加额外的逻辑。

JobExecutionListener 接口提供了两个回调方法:beforeJob()afterJob()

创建 Spring Batch 监听器有两种方式:

  1. 实现上面的 JobExecutionListener 接口;
  2. 使用 @BeforeJob@AfterJob 注解,并通过 JobListenerFactoryBean 提供的 getListener() 将其代理为 JobExecutionListener

1. 实现 JobExecutionListener 接口

继承 JobExecutionListener 并实现其 beforeJob()afterJob() 方法即可。

参数均为 JobExecution 实例,可以通过其获取作业的执行信息。

示例代码如下:

java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;

/**
 * @author 佳佳
 */
@Component
public class JobLoggerListener implements JobExecutionListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(JobLoggerListener.class);

    @Override
    public void beforeJob(JobExecution jobExecution) {
        LOGGER.info("{} is beginning execution", 
                jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        LOGGER.info("{} has completed with the status {}",
                jobExecution.getJobInstance().getJobName(),
                jobExecution.getStatus());
    }
}

调用 JobBuilderlistener() 方法添加监听器(可以调用多次以添加多个监听器)到作业:

java
@Bean
public Job listenerJob(@Qualifier("step1") Step step1,
                       JobLoggerListener jobLoggerListener) {
    return this.jobBuilderFactory.get("JobListenerJob")
            .start(step1)
            .listener(jobLoggerListener)
            .incrementer(new RunIdIncrementer())
            .build();
}

运行日志如下:

java
2022-11-30 11:45:17.436  INFO 14896 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=JobListenerJob]] launched with the following parameters: [{run.id=1}]
2022-11-30 11:45:17.563  INFO 14896 --- [           main] m.l.b.c.job.listener.JobLoggerListener   : JobListenerJob is beginning execution
2022-11-30 11:45:17.709  INFO 14896 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
Hello, World!
2022-11-30 11:45:17.893  INFO 14896 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 184ms
2022-11-30 11:45:17.980  INFO 14896 --- [           main] m.l.b.c.job.listener.JobLoggerListener   : JobListenerJob has completed with the status COMPLETED
2022-11-30 11:45:18.051  INFO 14896 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=JobListenerJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 486ms

可以看到 JobLoggerListener 在作业执行前后分别打印了一条日志。

2. 使用 @BeforeJob@AfterJob 注解

使用注解方式的代码示例如下:

java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.annotation.AfterJob;
import org.springframework.batch.core.annotation.BeforeJob;
import org.springframework.stereotype.Component;

/**
 * @author 佳佳
 */
@Component
public class JobLoggerAnnotationListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(JobLoggerAnnotationListener.class);

    @BeforeJob
    public void beforeJob(JobExecution jobExecution) {
        LOGGER.info("{} is beginning execution",
                jobExecution.getJobInstance().getJobName());
    }

    @AfterJob
    public void afterJob(JobExecution jobExecution) {
        LOGGER.info("{} has completed with the status {}",
                jobExecution.getJobInstance().getJobName(),
                jobExecution.getStatus());
    }
}

此时添加监听器的方式有两种:

  1. 通过 JobListenerFactoryBean.getListener() 方法将其代理为 JobExecutionListener

    java
    @Bean
    public Job listenerJob(@Qualifier("step1") Step step1,
                           JobLoggerAnnotationListener jobLoggerAnnotationListener) {
        return this.jobBuilderFactory.get("JobListenerJob")
                .start(step1)
                .listener(JobListenerFactoryBean.getListener(jobLoggerAnnotationListener))
                .incrementer(new RunIdIncrementer())
                .build();
    }
  2. 直接将监听器实例传给 listener(Object listener) 重载方法。

    实际上是将 JobListenerFactoryBean.getListener() 处理封装到了这个 listener() 方法里。

    相对于上一种,这种方式比较简单,而且这种方式添加了是否包含 @BeforeJob@AfterJob 注解的校验 -- 至少要有一个注解才会添加。

    java
    @Bean
    public Job listenerJob(@Qualifier("step1") Step step1,
                           JobLoggerAnnotationListener jobLoggerAnnotationListener) {
        return this.jobBuilderFactory.get("JobListenerJob")
                .start(step1)
                .listener(jobLoggerAnnotationListener)
                .incrementer(new RunIdIncrementer())
                .build();
    }

运行结果如下:

java
2022-11-30 16:01:14.819  INFO 22460 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=JobListenerJob]] launched with the following parameters: [{run.id=7}]
2022-11-30 16:01:14.937  INFO 22460 --- [           main] m.l.b.c.j.l.JobLoggerAnnotationListener  : JobListenerJob is beginning execution
2022-11-30 16:01:15.076  INFO 22460 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
Hello, World!
2022-11-30 16:01:15.221  INFO 22460 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 144ms
2022-11-30 16:01:15.271  INFO 22460 --- [           main] m.l.b.c.j.l.JobLoggerAnnotationListener  : JobListenerJob has completed with the status COMPLETED
2022-11-30 16:01:15.310  INFO 22460 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=JobListenerJob]] completed with the following parameters: [{run.id=7}] and the following status: [COMPLETED] in 389ms

执行顺序

可以通过继承 org.springframework.core.Ordered 接口或添加 @Order 注解来指定监听器执行的顺序。

  • 同时使用 Ordered 接口和 @Order 注解时,Ordered 接口的优先级高。
  • 所有未指定顺序的监听器始终排在最后执行。
  • 使用上面 @BeforeJob@AfterJob 注解的方式添加监听器时,@Order 注解的排序不生效( Ordered 接口的方式是可以的)。原因在于由于使用了代理,获取不到被代理类的注解信息。
  • afterJob() 方法的执行顺序和 beforeJob() 方法是相反的。beforeJob() 按照指定的顺序执行,afterJob() 按照相反的顺序执行(类似于堆栈 LIFO )。

附 1. JobExecutionListener 接口源码

下面是在 spring-batch-core:4.3.7JobExecutionListener 接口的源码:

java
/*
 * Copyright 2006-2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.core;

/**
 * Provide callbacks at specific points in the lifecycle of a {@link Job}.
 * Implementations can be stateful if they are careful to either ensure thread
 * safety, or to use one instance of a listener per job, assuming that job
 * instances themselves are not used by more than one thread.
 *
 * @author Dave Syer
 *
 */
public interface JobExecutionListener {

    /**
     * Callback before a job executes.
     *
     * @param jobExecution the current {@link JobExecution}
     */
    void beforeJob(JobExecution jobExecution);

    /**
     * Callback after completion of a job. Called after both both successful and
     * failed executions. To perform logic on a particular status, use
     * "if (jobExecution.getStatus() == BatchStatus.X)".
     *
     * @param jobExecution the current {@link JobExecution}
     */
    void afterJob(JobExecution jobExecution);

}

参考

[1]:《Spring Batch 权威指南》 -- [美] 迈克尔·T.米内拉(Michael,T.,Minella)著