Spring Batch Processing

Javokhir Narzullayev
5 min read · Sep 21, 2022

Hi guys, it's me again, Javokhir Narzullayev. Today's topic is the Spring Batch framework. It is a huge subject and a lot could be said about it, but I'll cover the main parts of the framework. Let's start!

At a high level, the architecture of Spring Batch is simple. The key terms are: Job, Step, Reader, Processor, Writer, and a few others.

Why, and when, do you need this technology? Imagine you have a large amount of data (in a database, in files, or elsewhere) and you need to migrate it from point A to point B: that is exactly what this architecture helps with. There are other use cases too, but this is the one where it helped me. Okay, let's see how it works in practice.

Required technology: Java 8+, Maven, PostgreSQL

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactId>migration</artifactId>
        <groupId>uz.narzullayev.javohir</groupId>
        <version>0.0.1</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <artifactId>migration</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.flywaydb</groupId>
            <artifactId>flyway-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>migration</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.flywaydb</groupId>
                <artifactId>flyway-maven-plugin</artifactId>
                <version>8.0.0</version>
            </plugin>
        </plugins>
    </build>

</project>

Maven dependencies:

spring-boot-starter-batch - the Spring Batch starter
spring-boot-starter-data-jpa - Spring Data JPA for the repository (DAO) layer
flyway-core - versioned SQL migrations, so we keep full control over the schema DDL
spring-boot-configuration-processor - generates configuration metadata for IDE support

RESOURCE FOLDER

GENERATE SPRING BATCH TABLES

create table if not exists batch_job_instance
(
    job_instance_id bigint not null primary key,
    version         bigint,
    job_name        varchar(100) not null,
    job_key         varchar(32) not null,
    constraint job_inst_un unique (job_name, job_key)
);

alter table batch_job_instance
    owner to migration_user;

create table if not exists batch_job_execution
(
    job_execution_id           bigint not null primary key,
    version                    bigint,
    job_instance_id            bigint not null
        constraint job_inst_exec_fk references batch_job_instance,
    create_time                timestamp not null,
    start_time                 timestamp,
    end_time                   timestamp,
    status                     varchar(10),
    exit_code                  varchar(2500),
    exit_message               varchar(2500),
    last_updated               timestamp,
    job_configuration_location varchar(2500)
);

alter table batch_job_execution
    owner to migration_user;

create table batch_job_execution_params
(
    job_execution_id bigint not null
        constraint job_exec_params_fk references batch_job_execution,
    type_cd          varchar(6) not null,
    key_name         varchar(100) not null,
    string_val       varchar(250),
    date_val         timestamp,
    long_val         bigint,
    double_val       double precision,
    identifying      char not null
);

alter table batch_job_execution_params
    owner to migration_user;

create table batch_step_execution
(
    step_execution_id  bigint not null primary key,
    version            bigint not null,
    step_name          varchar(100) not null,
    job_execution_id   bigint not null
        constraint job_exec_step_fk references batch_job_execution,
    start_time         timestamp not null,
    end_time           timestamp,
    status             varchar(10),
    commit_count       bigint,
    read_count         bigint,
    filter_count       bigint,
    write_count        bigint,
    read_skip_count    bigint,
    write_skip_count   bigint,
    process_skip_count bigint,
    rollback_count     bigint,
    exit_code          varchar(2500),
    exit_message       varchar(2500),
    last_updated       timestamp
);

alter table batch_step_execution
    owner to migration_user;

create table batch_step_execution_context
(
    step_execution_id  bigint not null primary key
        constraint step_exec_ctx_fk references batch_step_execution,
    short_context      varchar(2500) not null,
    serialized_context text
);

alter table batch_step_execution_context
    owner to migration_user;

create table batch_job_execution_context
(
    job_execution_id   bigint not null primary key
        constraint job_exec_ctx_fk references batch_job_execution,
    short_context      varchar(2500) not null,
    serialized_context text
);

alter table batch_job_execution_context
    owner to migration_user;

/* Sequence generation */
create sequence if not exists batch_step_execution_seq maxvalue 9223372036854775807 no cycle;
alter sequence batch_step_execution_seq owner to migration_user;

create sequence if not exists batch_job_execution_seq maxvalue 9223372036854775807 no cycle;
alter sequence batch_job_execution_seq owner to migration_user;

create sequence if not exists batch_job_seq maxvalue 9223372036854775807 no cycle;
alter sequence batch_job_seq owner to migration_user;
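
These are the standard Spring Batch metadata tables; the framework ships equivalent DDL inside the spring-batch-core jar (for PostgreSQL, org/springframework/batch/core/schema-postgresql.sql). Since Flyway is on the classpath, the script above can live in a versioned migration; by Flyway's default convention that would be something like src/main/resources/db/migration/V1__create_batch_tables.sql (the exact file name here is my assumption).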

Spring Batch needs these metadata tables to manage jobs, steps, and their state. You could keep them in an in-memory database instead, but then we would lose the whole point: persisting the batch tables means that if the server shuts down, the network fails, or some other force-majeure situation interrupts a job, it can continue from the moment it stopped.
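
Note that the controller shown later adds a fresh dateTime parameter on every call, so each launch deliberately creates a new job instance. Restart-on-failure only applies when a failed execution is relaunched with the same identifying parameters; a minimal sketch (the inputFile parameter name is just an illustration):

// Relaunching with the SAME identifying parameters resumes the FAILED execution
JobParameters params = new JobParametersBuilder()
        .addString("inputFile", "soato.csv") // identifying parameter
        .toJobParameters();
jobLauncher.run(soatoJob, params); // suppose this run fails mid-way
jobLauncher.run(soatoJob, params); // restarts from the last committed chunk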

APPLICATION.YML

spring:
  application:
    name: MIGRATION
  security:
    user:
      name: user
      password: user
  datasource:
    url: jdbc:postgresql://localhost:5432/migration_db
    username: migration_user
    password: 123
    driverClassName: org.postgresql.Driver
    hikari:
      maximum-pool-size: 50
  lifecycle:
    timeout-per-shutdown-phase: 30s
  jpa:
    open-in-view: false
    show-sql: true
    database-platform: org.hibernate.dialect.PostgreSQLDialect
    hibernate:
      ddl-auto: update
    properties:
      hibernate:
        id.new_generator_mappings: true
        order_inserts: true
        order_updates: true
        jdbc:
          batch_size: 10
        dialect: org.hibernate.dialect.PostgreSQLDialect
        format_sql: true
  batch:
    job:
      # Prevent jobs from starting on their own at application startup
      enabled: false
    # Table creation is handled by Flyway, so the batch starter must not create the schema
    initialize-schema: never
  main:
    allow-bean-definition-overriding: true
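
With spring.batch.job.enabled=false, Spring Boot does not launch any job at startup; jobs run only when we trigger them through the JobLauncher in the controller below. And because initialize-schema is set to never, the metadata tables must already exist, which is exactly what the Flyway script above guarantees.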

SOURCE-CODE

@EnableAsync
@SpringBootApplication
@EnableBatchProcessing
public class MigrationApplication {
    public static void main(String[] args) {
        SpringApplication.run(MigrationApplication.class, args);
    }
}
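
Here @EnableBatchProcessing bootstraps the batch infrastructure: it registers a JobRepository, JobLauncher, JobExplorer and the JobBuilderFactory/StepBuilderFactory used below, so all of them can simply be injected.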
// Controller
@Slf4j
@RestController
@RequestMapping("/api/migration")
public class JobController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    @Qualifier(JobTemplate.SOATO)
    private Job soatoJob;

    @PersistenceContext
    private EntityManager entityManager;

    // Lists every step execution recorded in the batch metadata tables
    @GetMapping("/jobs")
    @SuppressWarnings("unchecked")
    public List<?> jobs() {
        var sql = "select * from batch_step_execution order by step_execution_id";

        var resultList = (List<Tuple>) entityManager.createNativeQuery(sql, Tuple.class).getResultList();
        return resultList.stream().map(tuple -> {
            var map = new HashMap<>();
            map.put("step_execution_id", tuple.get("step_execution_id"));
            map.put("step_name", tuple.get("step_name"));
            map.put("read_count", tuple.get("read_count"));
            map.put("write_count", tuple.get("write_count"));
            map.put("start_time", tuple.get("start_time"));
            map.put("end_time", tuple.get("end_time"));
            map.put("status", tuple.get("status"));
            map.put("commit_count", tuple.get("commit_count"));
            map.put("exit_message", tuple.get("exit_message"));
            map.put("exit_code", tuple.get("exit_code"));
            return map;
        }).collect(Collectors.toList());
    }

    @GetMapping("/soato")
    public String batchSoato() throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
        return runJob(JobTemplate.SOATO, soatoJob);
    }

    private String runJob(String jobName, Job processJob) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
        // Ask the JobExplorer whether an execution of this job is already running,
        // instead of creating a new execution just to inspect its state
        if (jobExplorer.findRunningJobExecutions(jobName).isEmpty()) {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("dateTime", System.currentTimeMillis())
                    .toJobParameters();
            jobLauncher.run(processJob, jobParameters);
            return ":) Job Started Successfully ! :) " + processJob.getName();
        }
        return ":( Sorry, the job is already running: " + processJob.getName();
    }

}
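
Once the application is running, GET /api/migration/soato launches the SOATO job (if Spring Security is on the classpath, use the user/user credentials from application.yml), and GET /api/migration/jobs reads batch_step_execution so you can watch the read/write counts and status while the job runs.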
@UtilityClass
public class JobTemplate {
    public static final String SOATO = "SOATO";
}
// Configuration: JobConfig
@Slf4j
@Configuration
@RequiredArgsConstructor
public class JobConfig {
    private final JobBuilderFactory jobs;
    private final StepBuilderFactory steps;
    private final SoatoRepository soatoRepository;

    @Bean(name = JobTemplate.SOATO)
    public Job reyestrJob() {
        return jobs.get(JobTemplate.SOATO)
                .incrementer(new RunIdIncrementer())
                .listener(new JobStatusListener())
                .start(soatoStep())
                .build();
    }

    @Bean
    public Step soatoStep() {
        return steps.get("soato-step-1")
                .<SoatoComparison, Soato>chunk(QueryTemplate.CHUNK)
                .reader(soatoReader())
                .processor(soatoProcessor())
                .writer(soatoRepository::saveAll)
                .listener(new JobWriterListener<>())
                .listener(new JobProcessListener())
                .build();
    }

    @Bean
    public ItemReader<SoatoComparison> soatoReader() {
        var reader = new FlatFileItemReader<SoatoComparison>();
        reader.setLinesToSkip(1); // skip the CSV header row
        reader.setResource(new ClassPathResource("soato.csv"));
        reader.setEncoding("UTF-8");
        reader.setLineMapper(soatoRowMapper());
        return reader;
    }

    @Bean
    public ItemProcessor<SoatoComparison, Soato> soatoProcessor() {
        return soatoComparison -> {
            var soatoId = soatoComparison.getSoato_id();
            var byId = soatoRepository.findById(soatoId);
            if (byId.isPresent()) {
                Soato soato = byId.get();
                soato.setOldDbId(soatoComparison.getId());
                return soato;
            }
            // Returning null filters the item out: it never reaches the writer
            return null;
        };
    }

    // Line mapper for reading the CSV file
    public DefaultLineMapper<SoatoComparison> soatoRowMapper() {
        var beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<SoatoComparison>();
        beanWrapperFieldSetMapper.setTargetType(SoatoComparison.class);
        var defaultLineMapper = new DefaultLineMapper<SoatoComparison>();
        var delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setNames("id", "par_id", "soato_id", "level");
        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);
        return defaultLineMapper;
    }

}
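
The SoatoComparison class itself isn't shown above; here is a minimal sketch, assuming Lombok accessors and fields named exactly after the CSV columns (BeanWrapperFieldSetMapper binds by property name, which is also why the processor calls getSoato_id()):

// Hypothetical DTO for one CSV row; the snake_case field names must match
// the tokenizer names ("id", "par_id", "soato_id", "level")
@Getter
@Setter
public class SoatoComparison {
    private Long id;       // id in the old database
    private Long par_id;   // parent row id
    private Long soato_id; // key used to look up the Soato entity
    private Integer level;
}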

The soato.csv file:

id,par_id,soato_id,level
0,,17,
1,0,1703,1
2,0,1706,1
3,0,1708,1
4,0,1712,1
5,0,1714,1
6,0,1718,1
7,0,1726,1
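
The first line is the header row, which the reader above skips via setLinesToSkip(1).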

So that is my source code. If you found this topic interesting, I recommend also looking into scheduling Spring Batch jobs; planning jobs on a schedule is just as interesting.
