Creating a Batch Service - Spring Boot
通过 Spring Initializr 创建项目,并添加如下依赖:
- Spring Batch
- HyperSQL Database
Lightweight 100% Java SQL Database Engine.
示例代码
主要的代码在 BatchConfiguration.java 类中。
首先定义了 reader、processor 和 writer 三个 Bean,分别用来:
- 读取 csv 文件的内容
- 转换为大写
- 写入数据库
之后将上面的三个 bean 组合成一个 Step,然后构建成一个 Job。
另外,Job 还注册了一个监听器 JobCompletionNotificationListener。该监听器实现了 afterJob 方法,会在 Job 执行成功(状态为 COMPLETED)后读取数据库的内容并打印。
完整代码如下:
xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Batch example project, generated with Spring Initializr. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Spring Boot parent POM. The dependencies below declare no version, so their versions are managed here. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>me.liujiajia</groupId>
<artifactId>batch-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>batch-example</name>
<description>Demo project for Spring Boot</description>
<!-- Empty metadata placeholders (url, licenses, developers, scm) left as generated. -->
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<!-- Java language level the project is compiled for. -->
<java.version>17</java.version>
</properties>
<dependencies>
<!-- Spring Batch together with its Spring Boot starter/auto-configuration. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- HSQLDB database engine. Runtime scope: the code never references it directly, only through the DataSource. -->
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Test-only dependencies. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin; its configuration is inherited from the parent POM. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
java
package me.liujiajia.batch_example;
/**
 * Immutable pair of names. It is used as the item read from the CSV file, the item written
 * to the {@code people} table, and the row type read back from that table.
 *
 * @param firstName the person's first name
 * @param lastName  the person's last name
 */
public record Person(String firstName, String lastName) {}
java
package me.liujiajia.batch_example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
/**
 * Item processor that upper-cases both name fields of each {@link Person}.
 *
 * <p>Case conversion uses {@link java.util.Locale#ROOT} so the output does not depend on the
 * JVM's default locale. Under a Turkish default locale, for example, a plain
 * {@code toUpperCase()} would turn {@code "Jill"} into {@code "JİLL"} (dotted capital I).
 */
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

    /**
     * Returns a new {@link Person} whose first and last names are upper-cased.
     *
     * @param person the item to transform; it is not modified (records are immutable)
     * @return a new {@code Person} holding the upper-cased names
     */
    @Override
    public Person process(final Person person) {
        final String firstName = toUpperCase(person.firstName());
        final String lastName = toUpperCase(person.lastName());
        final Person transformedPerson = new Person(firstName, lastName);
        log.info("Converting ({}) into ({})", person, transformedPerson);
        return transformedPerson;
    }

    /** Upper-cases {@code value} with locale-independent (ROOT) case mapping. */
    private static String toUpperCase(final String value) {
        return value.toUpperCase(java.util.Locale.ROOT);
    }
}
java
package me.liujiajia.batch_example;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
/**
 * Spring Batch wiring for the CSV-to-database import.
 *
 * <p>Defines the following beans:
 * <ul>
 *   <li>a reader that turns lines of {@code sample-data.csv} into {@link Person} items;</li>
 *   <li>a processor that upper-cases the names;</li>
 *   <li>a writer that inserts the items into the {@code people} table;</li>
 *   <li>a chunk-oriented step combining the three;</li>
 *   <li>the {@code importUserJob} job, which runs that step with a completion listener attached.</li>
 * </ul>
 */
@Configuration
public class BatchConfiguration {

    /** Number of items read and processed before they are handed to the writer in one transaction. */
    private static final int CHUNK_SIZE = 3;

    /**
     * Reads {@code sample-data.csv} from the classpath. Each delimited line is split into the
     * fields {@code firstName} and {@code lastName} and mapped to a {@link Person}.
     */
    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("personItemReader")
                .resource(new ClassPathResource("sample-data.csv"))
                .delimited()
                .names("firstName", "lastName")
                .targetType(Person.class)
                .build();
    }

    /** Upper-cases the names of each item. */
    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    /**
     * Inserts each {@link Person} into the {@code people} table. The {@code :firstName} and
     * {@code :lastName} named parameters are filled from the item's properties ({@code beanMapped()}).
     *
     * @param dataSource the database to write to
     */
    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
                .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    /**
     * The import job. It starts with {@code step1} and notifies {@code listener} when it finishes.
     */
    @Bean
    public Job importUserJob(JobRepository jobRepository, Step step1, JobCompletionNotificationListener listener) {
        return new JobBuilder("importUserJob", jobRepository)
                .listener(listener)
                .start(step1)
                .build();
    }

    /**
     * Chunk-oriented step: read, then process, then write, committing every {@link #CHUNK_SIZE} items.
     *
     * @param transactionManager transaction manager used for each chunk. It is declared as the
     *     {@link PlatformTransactionManager} interface rather than the concrete
     *     {@code DataSourceTransactionManager}. That way the step still starts when the
     *     auto-configured transaction manager is of another type (e.g. when JPA is added).
     */
    @Bean
    public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
            FlatFileItemReader<Person> reader, PersonItemProcessor processor, JdbcBatchItemWriter<Person> writer) {
        return new StepBuilder("step1", jobRepository)
                .<Person, Person>chunk(CHUNK_SIZE, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
java
package me.liujiajia.batch_example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
/**
 * Job listener that verifies the import. After a job finishes with
 * {@link BatchStatus#COMPLETED}, it reads every row of the {@code people} table back and logs it.
 */
@Component
public class JobCompletionNotificationListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    private final JdbcTemplate jdbcTemplate;

    /**
     * @param jdbcTemplate template used to query the {@code people} table
     */
    public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Logs every imported person when the job completed successfully. Any other final status is ignored.
     *
     * @param jobExecution the finished execution whose status is inspected
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        // Nothing to verify unless the job ended successfully.
        if (jobExecution.getStatus() != BatchStatus.COMPLETED) {
            return;
        }
        log.info("!!! JOB FINISHED! Time to verify the results");
        // DataClassRowMapper maps the first_name/last_name columns onto the record components.
        var people = jdbcTemplate.query("SELECT first_name, last_name FROM people",
                new DataClassRowMapper<>(Person.class));
        for (var person : people) {
            log.info("Found <{}> in the database.", person);
        }
    }
}
csv
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
sql
-- Schema for the people table that the batch job writes to.
-- Drop any previous copy first so the script can be executed again without failing.
DROP TABLE people IF EXISTS;
CREATE TABLE people (
-- Surrogate key generated by the database (IDENTITY); the writer's INSERT therefore lists only the name columns.
person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
-- NOTE(review): names longer than 20 characters will not fit; widen if the input may contain them.
first_name VARCHAR(20),
last_name VARCHAR(20)
);
properties
spring.application.name=batch-example
java
package me.liujiajia.batch_example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point.
 *
 * <p>Starts the application context; the batch job is launched during startup. It then closes
 * the context via {@link SpringApplication#exit} and passes the resulting exit code to
 * {@link System#exit}, so the process ends once the job is done.
 */
@SpringBootApplication
public class BatchExampleApplication {

    public static void main(String[] args) {
        var context = SpringApplication.run(BatchExampleApplication.class, args);
        int exitCode = SpringApplication.exit(context);
        System.exit(exitCode);
    }
}
txt
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v3.4.4)
2025-04-01T16:47:50.280+08:00 INFO 24108 --- [batch-example] [ main] m.l.b.BatchExampleApplication : Starting BatchExampleApplication using Java 22.0.2 with PID 24108 (D:\projects\gitee\ryukaka\example\batch-example\target\classes started by 佳佳 in D:\projects\gitee\ryukaka\example\batch-example)
2025-04-01T16:47:50.284+08:00 INFO 24108 --- [batch-example] [ main] m.l.b.BatchExampleApplication : No active profile set, falling back to 1 default profile: "default"
2025-04-01T16:47:51.781+08:00 INFO 24108 --- [batch-example] [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2025-04-01T16:47:52.354+08:00 INFO 24108 --- [batch-example] [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Driver does not support get/set network timeout for connections. (feature not supported)
2025-04-01T16:47:52.357+08:00 INFO 24108 --- [batch-example] [ main] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Added connection org.hsqldb.jdbc.JDBCConnection@3a22bad6
2025-04-01T16:47:52.360+08:00 INFO 24108 --- [batch-example] [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2025-04-01T16:47:52.908+08:00 INFO 24108 --- [batch-example] [ main] m.l.b.BatchExampleApplication : Started BatchExampleApplication in 3.585 seconds (process running for 4.904)
2025-04-01T16:47:52.913+08:00 INFO 24108 --- [batch-example] [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
2025-04-01T16:47:52.972+08:00 INFO 24108 --- [batch-example] [ main] o.s.b.c.l.s.TaskExecutorJobLauncher : Job: [SimpleJob: [name=importUserJob]] launched with the following parameters: [{}]
2025-04-01T16:47:53.000+08:00 INFO 24108 --- [batch-example] [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
2025-04-01T16:47:53.023+08:00 INFO 24108 --- [batch-example] [ main] m.l.batch_example.PersonItemProcessor : Converting (Person[firstName=Jill, lastName=Doe]) into (Person[firstName=JILL, lastName=DOE])
2025-04-01T16:47:53.027+08:00 INFO 24108 --- [batch-example] [ main] m.l.batch_example.PersonItemProcessor : Converting (Person[firstName=Joe, lastName=Doe]) into (Person[firstName=JOE, lastName=DOE])
2025-04-01T16:47:53.027+08:00 INFO 24108 --- [batch-example] [ main] m.l.batch_example.PersonItemProcessor : Converting (Person[firstName=Justin, lastName=Doe]) into (Person[firstName=JUSTIN, lastName=DOE])
2025-04-01T16:47:53.039+08:00 INFO 24108 --- [batch-example] [ main] m.l.batch_example.PersonItemProcessor : Converting (Person[firstName=Jane, lastName=Doe]) into (Person[firstName=JANE, lastName=DOE])
2025-04-01T16:47:53.039+08:00 INFO 24108 --- [batch-example] [ main] m.l.batch_example.PersonItemProcessor : Converting (Person[firstName=John, lastName=Doe]) into (Person[firstName=JOHN, lastName=DOE])
2025-04-01T16:47:53.043+08:00 INFO 24108 --- [batch-example] [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 42ms
2025-04-01T16:47:53.048+08:00 INFO 24108 --- [batch-example] [ main] m.l.b.JobCompletionNotificationListener : !!! JOB FINISHED! Time to verify the results
2025-04-01T16:47:53.052+08:00 INFO 24108 --- [batch-example] [ main] m.l.b.JobCompletionNotificationListener : Found <Person[firstName=JILL, lastName=DOE]> in the database.
2025-04-01T16:47:53.052+08:00 INFO 24108 --- [batch-example] [ main] m.l.b.JobCompletionNotificationListener : Found <Person[firstName=JOE, lastName=DOE]> in the database.
2025-04-01T16:47:53.052+08:00 INFO 24108 --- [batch-example] [ main] m.l.b.JobCompletionNotificationListener : Found <Person[firstName=JUSTIN, lastName=DOE]> in the database.
2025-04-01T16:47:53.052+08:00 INFO 24108 --- [batch-example] [ main] m.l.b.JobCompletionNotificationListener : Found <Person[firstName=JANE, lastName=DOE]> in the database.
2025-04-01T16:47:53.052+08:00 INFO 24108 --- [batch-example] [ main] m.l.b.JobCompletionNotificationListener : Found <Person[firstName=JOHN, lastName=DOE]> in the database.
2025-04-01T16:47:53.055+08:00 INFO 24108 --- [batch-example] [ main] o.s.b.c.l.s.TaskExecutorJobLauncher : Job: [SimpleJob: [name=importUserJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 60ms
2025-04-01T16:47:53.062+08:00 INFO 24108 --- [batch-example] [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2025-04-01T16:47:53.078+08:00 INFO 24108 --- [batch-example] [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
已与地址为 ''127.0.0.1:51027',传输: '套接字'' 的目标虚拟机断开连接
进程已结束,退出代码为 0
关于 schema-all.sql
示例代码中并没有任何地方显式引用 schema-all.sql 这个文件,它是由 Spring Boot 在启动时自动执行的。
官方文档中的说明:
Spring Boot runs schema-@@platform@@.sql automatically during startup. -all is the default for all platforms.
具体的代码在 spring-boot-autoconfigure 包中。
- 在 DataSourceInitializationConfiguration 中定义了 SqlDataSourceScriptDatabaseInitializer Bean。
- SqlDataSourceScriptDatabaseInitializer 的 getSettings 方法(委托给 SettingsCreator.createFrom)负责加载配置。
- 根据 platform(默认为 all)加载不同的 sql 文件。另外,也可以通过配置项 spring.sql.init.schema-locations 修改 schema.sql 文件的路径;一旦配置了该项,就不会再按 platform 回退查找 schema-all.sql 和 schema.sql。

还需要注意,初始化模式 spring.sql.init.mode 的默认值为 EMBEDDED,即只会对内嵌数据库执行这些脚本。本例使用的 HSQLDB 正属于这种情况;如果换成其他数据库,需要设置 spring.sql.init.mode=always 才会执行。
java
@ImportRuntimeHints(SqlInitializationScriptsRuntimeHints.class) public class SqlDataSourceScriptDatabaseInitializer extends DataSourceScriptDatabaseInitializer { public SqlDataSourceScriptDatabaseInitializer(DataSource dataSource, SqlInitializationProperties properties) { this(dataSource, getSettings(properties)); } public SqlDataSourceScriptDatabaseInitializer(DataSource dataSource, DatabaseInitializationSettings settings) { super(dataSource, settings); } public static DatabaseInitializationSettings getSettings(SqlInitializationProperties properties) { return SettingsCreator.createFrom(properties); } }
java
/**
 * Excerpt from spring-boot-autoconfigure. Translates the spring.sql.init.* properties
 * (SqlInitializationProperties) into a DatabaseInitializationSettings object.
 */
final class SettingsCreator {

    private SettingsCreator() {
    }

    /** Copies every SQL-initialization property into a new settings object. */
    static DatabaseInitializationSettings createFrom(SqlInitializationProperties properties) {
        DatabaseInitializationSettings settings = new DatabaseInitializationSettings();
        settings
            .setSchemaLocations(scriptLocations(properties.getSchemaLocations(), "schema", properties.getPlatform()));
        settings.setDataLocations(scriptLocations(properties.getDataLocations(), "data", properties.getPlatform()));
        settings.setContinueOnError(properties.isContinueOnError());
        settings.setSeparator(properties.getSeparator());
        settings.setEncoding(properties.getEncoding());
        settings.setMode(properties.getMode());
        return settings;
    }

    /**
     * Resolves the script locations for one script type ("schema" or "data").
     *
     * <p>If locations were configured explicitly they are returned unchanged. Otherwise two
     * optional locations are searched across the whole classpath (classpath*:), in order:
     * {@code <fallback>-<platform>.sql}, then {@code <fallback>.sql}. With the platform value
     * "all" and fallback "schema", these are schema-all.sql and schema.sql.
     */
    private static List<String> scriptLocations(List<String> locations, String fallback, String platform) {
        if (locations != null) {
            // Explicitly configured locations replace the platform-based fallback entirely.
            return locations;
        }
        List<String> fallbackLocations = new ArrayList<>();
        fallbackLocations.add("optional:classpath*:" + fallback + "-" + platform + ".sql");
        fallbackLocations.add("optional:classpath*:" + fallback + ".sql");
        return fallbackLocations;
    }
}
java
/**
 * Excerpt from spring-boot-autoconfigure (only fields shown). Binds the spring.sql.init.*
 * configuration properties.
 */
@ConfigurationProperties("spring.sql.init")
public class SqlInitializationProperties {

    // Explicit schema script locations (spring.sql.init.schema-locations); null means "use the platform-based fallback".
    private List<String> schemaLocations;

    // Explicit data script locations; null means "use the platform-based fallback".
    private List<String> dataLocations;

    // Platform suffix used in the fallback file names; the default "all" is why schema-all.sql is picked up.
    private String platform = "all";

    private String username;

    private String password;

    private boolean continueOnError = false;

    // Statement separator used when splitting the scripts.
    private String separator = ";";

    private Charset encoding;

    // Default EMBEDDED: scripts are applied only to an embedded database.
    private DatabaseInitializationMode mode = DatabaseInitializationMode.EMBEDDED;

    // ...
}
java
/**
 * Excerpt from Spring Boot. Decides when SQL initialization scripts are applied; it is the
 * type of the spring.sql.init "mode" property.
 */
public enum DatabaseInitializationMode {

    /**
     * Always initialize the database.
     */
    ALWAYS,

    /**
     * Only initialize an embedded database.
     */
    EMBEDDED,

    /**
     * Never initialize the database.
     */
    NEVER

}
- 由于 SqlDataSourceScriptDatabaseInitializer 实现了 InitializingBean 接口,在 Bean 初始化(属性设置完成)时会执行 afterPropertiesSet 方法。
  实际的实现在 AbstractScriptDatabaseInitializer 类中:先执行 schema 脚本,再执行 data 脚本,并且只有在找到脚本且 isEnabled() 为真时才会真正执行。最后调用的 runScripts 方法的实现在 DataSourceScriptDatabaseInitializer 类中。
java
@Override public void afterPropertiesSet() throws Exception { initializeDatabase(); } public boolean initializeDatabase() { ScriptLocationResolver locationResolver = new ScriptLocationResolver(this.resourceLoader); boolean initialized = applySchemaScripts(locationResolver); return applyDataScripts(locationResolver) || initialized; } private boolean applySchemaScripts(ScriptLocationResolver locationResolver) { return applyScripts(this.settings.getSchemaLocations(), "schema", locationResolver); } private boolean applyScripts(List<String> locations, String type, ScriptLocationResolver locationResolver) { List<Resource> scripts = getScripts(locations, type, locationResolver); if (!scripts.isEmpty() && isEnabled()) { runScripts(scripts); return true; } return false; } private void runScripts(List<Resource> resources) { runScripts(new Scripts(resources).continueOnError(this.settings.isContinueOnError()) .separator(this.settings.getSeparator()) .encoding(this.settings.getEncoding())); }
java
/**
 * Excerpt from DataSourceScriptDatabaseInitializer. Executes the resolved scripts against the
 * DataSource through a ResourceDatabasePopulator. It applies the configured continue-on-error
 * flag and statement separator, plus the script encoding when one is set.
 */
protected void runScripts(Scripts scripts) {
    ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
    populator.setContinueOnError(scripts.isContinueOnError());
    populator.setSeparator(scripts.getSeparator());
    if (scripts.getEncoding() != null) {
        // Only override the populator's script encoding when one was configured.
        populator.setSqlScriptEncoding(scripts.getEncoding().name());
    }
    for (Resource resource : scripts) {
        populator.addScript(resource);
    }
    // NOTE(review): presumably a hook letting subclasses adjust the populator; its body is not shown here.
    customize(populator);
    DatabasePopulatorUtils.execute(populator, this.dataSource);
}