Skip to content
欢迎扫码关注公众号

Creating a Batch Service - Spring Boot

通过 Spring Initializr 创建项目,并添加如下依赖:

  • Spring Batch
  • HyperSQL Database
    Lightweight 100% Java SQL Database Engine.

示例代码

主要的代码在 BatchConfiguration.java 类中。

首先定义了 reader、processor 和 writer 三个 Bean,分别用来:

  • 读取 csv 文件的内容
  • 转换为大写
  • 写入数据库

之后将上面的三个 Bean 组合成一个 Step,再用该 Step 构建一个 Job。

另外,Job 还注册了一个监听器 JobCompletionNotificationListener。该监听器实现了 afterJob 方法,在 Job 执行结束且状态为 COMPLETED 时读取数据库中的内容并打印。

完整代码如下:

xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<!-- Spring Boot 3.4.4 parent: the dependencies below declare no <version>, so their versions are managed by this parent. -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.4.4</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>me.liujiajia</groupId>
	<artifactId>batch-example</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>batch-example</name>
	<description>Demo project for Spring Boot</description>
	<!-- Empty metadata placeholders as generated by Spring Initializr; fill in or remove for a real project. -->
	<url/>
	<licenses>
		<license/>
	</licenses>
	<developers>
		<developer/>
	</developers>
	<scm>
		<connection/>
		<developerConnection/>
		<tag/>
		<url/>
	</scm>
	<properties>
		<java.version>17</java.version>
	</properties>
	<dependencies>
		<!-- Spring Batch support (Job/Step builders, readers, writers). -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>

		<!-- HSQLDB database engine; only needed at runtime, the code compiles against JDBC/DataSource interfaces. -->
		<dependency>
			<groupId>org.hsqldb</groupId>
			<artifactId>hsqldb</artifactId>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>
java
package me.liujiajia.batch_example;

/**
 * Immutable value holding one person's name, as read from {@code sample-data.csv}.
 *
 * @param firstName the given name (first column of each CSV line)
 * @param lastName  the family name (second column of each CSV line)
 */
public record Person(String firstName, String lastName) {}
java
package me.liujiajia.batch_example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

/**
 * Item processor that turns each {@link Person} into a new {@link Person} whose first and last
 * names are upper-cased.
 *
 * <p>Upper-casing is done with {@link java.util.Locale#ROOT} so the result does not depend on the
 * JVM's default locale: under a Turkish locale, for example, a plain {@code toUpperCase()} maps
 * {@code "i"} to the dotted capital {@code "İ"} instead of {@code "I"}.
 */
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

  private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

  /**
   * Returns an upper-cased copy of {@code person}; the input record itself is immutable and is
   * left unchanged.
   *
   * @param person the item produced by the reader
   * @return a new {@link Person} with both names upper-cased (locale-independent)
   */
  @Override
  public Person process(final Person person) {
    // Locale.ROOT: deterministic, locale-neutral case mapping regardless of the host's settings.
    final String firstName = person.firstName().toUpperCase(java.util.Locale.ROOT);
    final String lastName = person.lastName().toUpperCase(java.util.Locale.ROOT);

    final Person transformedPerson = new Person(firstName, lastName);

    log.info("Converting ({}) into ({})", person, transformedPerson);

    return transformedPerson;
  }

}
java
package me.liujiajia.batch_example;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;

@Configuration
public class BatchConfiguration {

    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("personItemReader")
                .resource(new ClassPathResource("sample-data.csv"))
                .delimited()
                .names("firstName", "lastName")
                .targetType(Person.class)
                .build();
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
                .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    @Bean
    public Job importUserJob(JobRepository jobRepository, Step step1, JobCompletionNotificationListener listener) {
        return new JobBuilder("importUserJob", jobRepository)
                .listener(listener)
                .start(step1)
                .build();
    }

    @Bean
    public Step step1(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
                      FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
        return new StepBuilder("step1", jobRepository)
                .<Person, Person>chunk(3, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
java
package me.liujiajia.batch_example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

/**
 * Job listener that, after a successfully completed job, reads every row of the {@code people}
 * table back and logs it so the import result can be checked by eye.
 */
@Component
public class JobCompletionNotificationListener implements JobExecutionListener {

  private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

  private final JdbcTemplate jdbcTemplate;

  /**
   * @param jdbcTemplate template used to query the imported rows
   */
  public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  /**
   * Logs the contents of {@code people} once the job has ended. Does nothing unless the job's
   * status is {@link BatchStatus#COMPLETED}.
   */
  @Override
  public void afterJob(JobExecution jobExecution) {
    if (jobExecution.getStatus() != BatchStatus.COMPLETED) {
      return;
    }

    log.info("!!! JOB FINISHED! Time to verify the results");

    DataClassRowMapper<Person> toPerson = new DataClassRowMapper<>(Person.class);
    for (Person stored : jdbcTemplate.query("SELECT first_name, last_name FROM people", toPerson)) {
      log.info("Found <{}> in the database.", stored);
    }
  }
}
csv
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
sql
-- Drop any previous copy so the script can be run repeatedly against the same database.
DROP TABLE people IF EXISTS;

-- Target table of the batch writer. person_id is generated by the database (IDENTITY),
-- which is why the writer's INSERT supplies only first_name and last_name.
-- NOTE(review): values longer than 20 characters presumably get rejected by VARCHAR(20) — confirm limits fit real data.
CREATE TABLE people  (
    person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);
properties
spring.application.name=batch-example
java
package me.liujiajia.batch_example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the batch example. Starting the Spring context runs the batch job; the context
 * is then shut down and the JVM exits with the code reported by Spring Boot.
 */
@SpringBootApplication
public class BatchExampleApplication {

	public static void main(String[] args) {
		// Starting the context launches the job (JobLauncherApplicationRunner runs on startup).
		var context = SpringApplication.run(BatchExampleApplication.class, args);
		// Shut the context down and obtain an exit code reflecting the outcome.
		int exitCode = SpringApplication.exit(context);
		System.exit(exitCode);
	}

}
txt
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::                (v3.4.4)

2025-04-01T16:47:50.280+08:00  INFO 24108 --- [batch-example] [           main] m.l.b.BatchExampleApplication            : Starting BatchExampleApplication using Java 22.0.2 with PID 24108 (D:\projects\gitee\ryukaka\example\batch-example\target\classes started by 佳佳 in D:\projects\gitee\ryukaka\example\batch-example)
2025-04-01T16:47:50.284+08:00  INFO 24108 --- [batch-example] [           main] m.l.b.BatchExampleApplication            : No active profile set, falling back to 1 default profile: "default"
2025-04-01T16:47:51.781+08:00  INFO 24108 --- [batch-example] [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2025-04-01T16:47:52.354+08:00  INFO 24108 --- [batch-example] [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Driver does not support get/set network timeout for connections. (feature not supported)
2025-04-01T16:47:52.357+08:00  INFO 24108 --- [batch-example] [           main] com.zaxxer.hikari.pool.HikariPool        : HikariPool-1 - Added connection org.hsqldb.jdbc.JDBCConnection@3a22bad6
2025-04-01T16:47:52.360+08:00  INFO 24108 --- [batch-example] [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2025-04-01T16:47:52.908+08:00  INFO 24108 --- [batch-example] [           main] m.l.b.BatchExampleApplication            : Started BatchExampleApplication in 3.585 seconds (process running for 4.904)
2025-04-01T16:47:52.913+08:00  INFO 24108 --- [batch-example] [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2025-04-01T16:47:52.972+08:00  INFO 24108 --- [batch-example] [           main] o.s.b.c.l.s.TaskExecutorJobLauncher      : Job: [SimpleJob: [name=importUserJob]] launched with the following parameters: [{}]
2025-04-01T16:47:53.000+08:00  INFO 24108 --- [batch-example] [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
2025-04-01T16:47:53.023+08:00  INFO 24108 --- [batch-example] [           main] m.l.batch_example.PersonItemProcessor    : Converting (Person[firstName=Jill, lastName=Doe]) into (Person[firstName=JILL, lastName=DOE])
2025-04-01T16:47:53.027+08:00  INFO 24108 --- [batch-example] [           main] m.l.batch_example.PersonItemProcessor    : Converting (Person[firstName=Joe, lastName=Doe]) into (Person[firstName=JOE, lastName=DOE])
2025-04-01T16:47:53.027+08:00  INFO 24108 --- [batch-example] [           main] m.l.batch_example.PersonItemProcessor    : Converting (Person[firstName=Justin, lastName=Doe]) into (Person[firstName=JUSTIN, lastName=DOE])
2025-04-01T16:47:53.039+08:00  INFO 24108 --- [batch-example] [           main] m.l.batch_example.PersonItemProcessor    : Converting (Person[firstName=Jane, lastName=Doe]) into (Person[firstName=JANE, lastName=DOE])
2025-04-01T16:47:53.039+08:00  INFO 24108 --- [batch-example] [           main] m.l.batch_example.PersonItemProcessor    : Converting (Person[firstName=John, lastName=Doe]) into (Person[firstName=JOHN, lastName=DOE])
2025-04-01T16:47:53.043+08:00  INFO 24108 --- [batch-example] [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 42ms
2025-04-01T16:47:53.048+08:00  INFO 24108 --- [batch-example] [           main] m.l.b.JobCompletionNotificationListener  : !!! JOB FINISHED! Time to verify the results
2025-04-01T16:47:53.052+08:00  INFO 24108 --- [batch-example] [           main] m.l.b.JobCompletionNotificationListener  : Found <Person[firstName=JILL, lastName=DOE]> in the database.
2025-04-01T16:47:53.052+08:00  INFO 24108 --- [batch-example] [           main] m.l.b.JobCompletionNotificationListener  : Found <Person[firstName=JOE, lastName=DOE]> in the database.
2025-04-01T16:47:53.052+08:00  INFO 24108 --- [batch-example] [           main] m.l.b.JobCompletionNotificationListener  : Found <Person[firstName=JUSTIN, lastName=DOE]> in the database.
2025-04-01T16:47:53.052+08:00  INFO 24108 --- [batch-example] [           main] m.l.b.JobCompletionNotificationListener  : Found <Person[firstName=JANE, lastName=DOE]> in the database.
2025-04-01T16:47:53.052+08:00  INFO 24108 --- [batch-example] [           main] m.l.b.JobCompletionNotificationListener  : Found <Person[firstName=JOHN, lastName=DOE]> in the database.
2025-04-01T16:47:53.055+08:00  INFO 24108 --- [batch-example] [           main] o.s.b.c.l.s.TaskExecutorJobLauncher      : Job: [SimpleJob: [name=importUserJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 60ms
2025-04-01T16:47:53.062+08:00  INFO 24108 --- [batch-example] [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2025-04-01T16:47:53.078+08:00  INFO 24108 --- [batch-example] [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.
已与地址为 ''127.0.0.1:51027',传输: '套接字'' 的目标虚拟机断开连接

进程已结束,退出代码为 0

关于 schema-all.sql

示例代码中没有任何地方显式引用 schema-all.sql 这个文件,它是由 Spring Boot 在启动时自动执行的。

官方文档中的说明:

Spring Boot runs schema-@@platform@@.sql automatically during startup. -all is the default for all platforms.

具体的代码在 spring-boot-autoconfigure 包中。

  1. DataSourceInitializationConfiguration 定义 SqlDataSourceScriptDatabaseInitializer Bean
  2. SqlDataSourceScriptDatabaseInitializer 的 getSettings 方法中加载配置
    根据 platform 加载不同的 sql 文件:platform 的默认值为 all,因此默认会加载 schema-all.sql 和 schema.sql(两者都是可选的,存在时才执行)。另外,也可以通过配置项 spring.sql.init.schema-locations 修改 schema.sql 文件的路径。
    java
    /**
     * Excerpt from spring-boot-autoconfigure: a DataSourceScriptDatabaseInitializer whose settings
     * are derived from the {@code spring.sql.init.*} configuration properties.
     */
    @ImportRuntimeHints(SqlInitializationScriptsRuntimeHints.class)
    public class SqlDataSourceScriptDatabaseInitializer extends DataSourceScriptDatabaseInitializer {
        // Convenience constructor: converts the bound properties into settings via getSettings.
        public SqlDataSourceScriptDatabaseInitializer(DataSource dataSource, SqlInitializationProperties properties) {
            this(dataSource, getSettings(properties));
        }
        // Hands the data source and ready-made settings to the superclass.
        public SqlDataSourceScriptDatabaseInitializer(DataSource dataSource, DatabaseInitializationSettings settings) {
            super(dataSource, settings);
        }
        // Maps SqlInitializationProperties onto DatabaseInitializationSettings (delegates to SettingsCreator).
        public static DatabaseInitializationSettings getSettings(SqlInitializationProperties properties) {
            return SettingsCreator.createFrom(properties);
        }
    }
    java
    /**
     * Builds {@link DatabaseInitializationSettings} from {@link SqlInitializationProperties}.
     */
    final class SettingsCreator {
    
        // Static-only helper; not meant to be instantiated.
        private SettingsCreator() {
        }
    
        /**
         * Copies schema/data script locations, the continue-on-error flag, statement separator,
         * encoding and initialization mode from {@code properties} into a new settings object.
         */
        static DatabaseInitializationSettings createFrom(SqlInitializationProperties properties) {
            DatabaseInitializationSettings settings = new DatabaseInitializationSettings();
            settings
                .setSchemaLocations(scriptLocations(properties.getSchemaLocations(), "schema", properties.getPlatform()));
            settings.setDataLocations(scriptLocations(properties.getDataLocations(), "data", properties.getPlatform()));
            settings.setContinueOnError(properties.isContinueOnError());
            settings.setSeparator(properties.getSeparator());
            settings.setEncoding(properties.getEncoding());
            settings.setMode(properties.getMode());
            return settings;
        }
    
        /**
         * Returns {@code locations} unchanged when explicitly configured; otherwise the fallback pair
         * {@code optional:classpath*:<fallback>-<platform>.sql} and {@code optional:classpath*:<fallback>.sql}.
         * With fallback "schema" and platform "all" this yields schema-all.sql and schema.sql.
         * NOTE(review): the "optional:" prefix presumably means a missing file is skipped rather than an error — confirm.
         */
        private static List<String> scriptLocations(List<String> locations, String fallback, String platform) {
            if (locations != null) {
                return locations;
            }
            List<String> fallbackLocations = new ArrayList<>();
            fallbackLocations.add("optional:classpath*:" + fallback + "-" + platform + ".sql");
            fallbackLocations.add("optional:classpath*:" + fallback + ".sql");
            return fallbackLocations;
        }
    }
    java
    /**
     * Properties bound from the {@code spring.sql.init} prefix (excerpt; accessors omitted).
     */
    @ConfigurationProperties("spring.sql.init")
    public class SqlInitializationProperties {
        // Explicit schema script locations; null unless configured.
        private List<String> schemaLocations;
        // Explicit data script locations; null unless configured.
        private List<String> dataLocations;
        // Platform name used in the default script names (schema-<platform>.sql); defaults to "all".
        private String platform = "all";
        // NOTE(review): credentials presumably used for the initialization connection — not used in the code shown.
        private String username;
        private String password;
        // Whether to keep going when a statement fails; defaults to false.
        private boolean continueOnError = false;
        // Statement separator inside the scripts; defaults to ";".
        private String separator = ";";
        // Script encoding; null by default.
        private Charset encoding;
        // When to run the scripts; defaults to EMBEDDED (only for an embedded database).
        private DatabaseInitializationMode mode = DatabaseInitializationMode.EMBEDDED;
        // ...
    }
    java
    /**
     * Controls for which databases the SQL initialization scripts are applied.
     */
    public enum DatabaseInitializationMode {
        /**
         * Always initialize the database.
         */
        ALWAYS,
        /**
         * Only initialize an embedded database.
         */
        EMBEDDED,
        /**
         * Never initialize the database.
         */
        NEVER
    }
  3. 由于 SqlDataSourceScriptDatabaseInitializer(通过其父类 AbstractScriptDatabaseInitializer)实现了 InitializingBean 接口,Spring 在 Bean 创建完成后会调用其 afterPropertiesSet 方法
    实际的实现在 AbstractScriptDatabaseInitializer 类中,最后调用的 runScripts 方法的实现在 DataSourceScriptDatabaseInitializer 类中。
    java
    // InitializingBean callback: runs the initialization scripts once the bean is set up.
    @Override
    public void afterPropertiesSet() throws Exception {
        initializeDatabase();
    }
    /**
     * Applies the schema scripts first, then the data scripts.
     *
     * @return true if the schema step or the data step reported that it ran scripts
     */
    public boolean initializeDatabase() {
        ScriptLocationResolver locationResolver = new ScriptLocationResolver(this.resourceLoader);
        boolean initialized = applySchemaScripts(locationResolver);
        // applyDataScripts sits on the left of || so it is evaluated even when the schema step returned true.
        return applyDataScripts(locationResolver) || initialized;
    }
    // Runs the configured schema locations, labelled with type "schema"; returns whether scripts were run.
    private boolean applySchemaScripts(ScriptLocationResolver locationResolver) {
        return applyScripts(this.settings.getSchemaLocations(), "schema", locationResolver);
    }
    /**
     * Resolves {@code locations} to script resources and runs them, but only when at least one
     * script was found and {@code isEnabled()} returns true.
     * NOTE(review): isEnabled is not shown here; presumably it checks the configured DatabaseInitializationMode.
     *
     * @return true if scripts were run
     */
    private boolean applyScripts(List<String> locations, String type, ScriptLocationResolver locationResolver) {
        List<Resource> scripts = getScripts(locations, type, locationResolver);
        if (!scripts.isEmpty() && isEnabled()) {
            runScripts(scripts);
            return true;
        }
        return false;
    }
    // Bundles the resources with the continue-on-error, separator and encoding settings and
    // hands them to the runScripts(Scripts) overload.
    private void runScripts(List<Resource> resources) {
        runScripts(new Scripts(resources).continueOnError(this.settings.isContinueOnError())
            .separator(this.settings.getSeparator())
            .encoding(this.settings.getEncoding()));
    }
    java
    /**
     * Executes {@code scripts} against {@code dataSource} through a ResourceDatabasePopulator
     * configured with the continue-on-error flag, separator and (when set) encoding.
     */
    protected void runScripts(Scripts scripts) {
        ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.setContinueOnError(scripts.isContinueOnError());
        populator.setSeparator(scripts.getSeparator());
        // Only override the populator's encoding when one was configured.
        if (scripts.getEncoding() != null) {
            populator.setSqlScriptEncoding(scripts.getEncoding().name());
        }
        for (Resource resource : scripts) {
            populator.addScript(resource);
        }
        // Hook for further populator customization (customize is not shown in this excerpt).
        customize(populator);
        DatabasePopulatorUtils.execute(populator, this.dataSource);
    }