Step
은 개발자가 작성하기 나름이기 때문에 모호할 수 있다. 간단한 Step
은 데이터를 DB나 파일에서 로드한 뒤 간단한 코드만 실행시키기도 한다. 더 복잡한 Step
은 복잡한 비즈니스 규칙을 적용시키기도 한다.
ItemWriter
에 의해 기록된 다음 트랜잭션이 커밋된다.ItemProcessor
를 추가하여 ItemWriter가 데이터를 쓰기 전에 특정 처리를 할 수도 있다.Java configuration을 사용하면 Spring Batch builder를 사용하여 Step
을 설정할 수 있다.
// JobRepository는 일반적으로 빈 설정 되어 있어 주입 받을 수 있기에 외부 설정이 필요없다.
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
return new JobBuilder("sampleJob", jobRepository)
.start(sampleStep)
.build();
}
@Bean
public Step sampleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build();
}
Step
들이 비슷한 설정을 공유하는 경우 ‘parent’ Step
을 정의할 수 있다.Step
은 부모의 속성, 요소와 결합된다.
Step
이 아닌 추상 Step
을 정의해야할 수도 있다.
reader
, writer
, tasklet
속성이 Step 구성에서 제외되는 경우 초기화에 실패하지만 이 구성 없이 초기화시키고 싶을 수도 있다.abstract
Step
은 오직 확장될 수 있고 인스턴스화는 시키지 못한다.Step
은 PlatformTransactionManager
를 사용하여 주기적으로 아이템을 읽고 쓴 뒤 커밋한다.
commit-interval
이 1이면 개별 아이템을 쓴 후 커밋한다.아래 설정에선 commit-interval
이 10이기 때문에 10개씩 커밋을 하게 된다.
@Bean
public Job sampleJob(JobRepository jobRepository) {
return new JobBuilder("sampleJob", jobRepository)
.start(step1())
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return tnew StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build();
}
아래 설명은 모드 한 JobInstance
에 해당하는 Step
의 재시작 여부라고 생각하면 이해하기 쉽다.
Step
을 시작하는 횟수를 제어하는 시나리오는 여러가지가 있다.
Step
, 무한 실행 가능한 Step
아래 코드는 1번만 실행하게 하는 Step
설정이다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return tnew StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.startLimit(1)
.build();
}
StartLimitExceededException
이 발생한다.Integer.MAX_VALUE
이다.Step
이 있다.Step
을 처리할 때 COMOLETED
상태는 스킵된다.true
로 오버라이딩하면 step이 항상 실행된다.@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return tnew StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}
Step
을 실패로 처리하는게 아니라 무시하고 넘어가야할 때가 있다.
아래 코드에선 FlatFileParseException
이 발생하는 경우 실패 처리를 하지 않고 10번까지는 스킵하도록 설정했다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return tnew StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(FlatFileParseException.class)
.build();
}
아래 예제 코드는 모든 예외를 스킵하지만 FileNotFoundException
만 예외로 실패 처리를 한 코드다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return tnew StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(Exception.class)
.noSkip(FileNotFoundException.class)
.build();
}
하지만 DeadlockLoserDataAccessException
처럼 다시 시도해서 성공할 수도 있는 예외엔 재시도 처리를 할 수 있다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
ItemWriter
에서 발생하는 모든 예외는 Step에서
처리되는 트랜잭션을 롤백시킨다.
ItemReader
에서 발생한 예외는 롤백시키지 않는다.ItemWriter
에서 예외가 발생해도 롤백시키고 싶지 않을 때 대상 예외들을 지정할 수 있다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return tnew StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.noRollback(ValidationException.class)
.build();
}
ItemReader
는 데이터를 읽을 때 기본적으로 앞에서 뒤로만 읽는다.Step
은 데이터를 읽고 나면 버퍼에 넣어두기에 롤백 후 데이터를 다시 읽을 필요는 없다.@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return tnew StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.readerIsTransactionalQueue()
.build();
}
Step
에서도 트랜잭션 격리와 전파, 그리고 타임아웃 설정을 할 수 있다.@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.DEFAULT.value());
attribute.setTimeout(30);
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.transactionAttribute(attribute)
.build();
}
ItemStream
콜백을 처리해야 할 때가 있다.
ItemStream
은 step이 실패 후 재시작하는 경우에 각 실행 상태에 대한 정보를 얻을 수 있는 인터페이스를 제공한다.ItemStream
인터페이스를 ItemReader
, ItemProcessor
, ItemWriter
중 하나로 구현하면 자동으로 등록된다.Step
에 stream을 등록할 땐 아래와 같이 한다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return tnew StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(compositeItemWriter())
.stream(fileItemWriter1())
.stream(fileItemWriter2())
.build();
}
/**
* In Spring Batch 4, the CompositeItemWriter implements ItemStream so this isn't
* necessary, but used for an example.
*/
@Bean
public CompositeItemWriter compositeItemWriter() {
List<ItemWriter> writers = new ArrayList<>(2);
writers.add(fileItemWriter1());
writers.add(fileItemWriter2());
CompositeItemWriter itemWriter = new CompositeItemWriter();
itemWriter.setDelegates(writers);
return itemWriter;
}
Job
처럼 Step
도 실행 중 발생한 이벤트를 별도로 처리할 때 StepListener
를 사용할 수 있다.StepListener
를 확장한 구현체는 listener()
메서드로 step에 등록할 수 있다.
아래 예제는 청크 레벨로 리스너를 등록했다.
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(reader())
.writer(writer())
.listener(chunkListener())
.build();
}
StepListener
가 아니어도 어노테이션으로 같은 관심사를 처리할 수 있다.Step
성공 실패 여부와 상관 없이 시작 전과 완료 후에 작동한다.public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution);
ExitStatus afterStep(StepExecution stepExecution);
}
@BeforeStep
@AfterStep
ChunkListener
는 청크를 처리하기 전이나 후에 호출된다.public interface ChunkListener extends StepListener {
void beforeChunk(ChunkContext context); // 청크 처리 전 호출
void afterChunk(ChunkContext context); // 청크가 커밋된 후에만 호출
void afterChunkError(ChunkContext context);
}
@BeforeChunk
@AfterChunk
@AfterCunkError
ItemReadListener
로 로그를 남길 수 있다.public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
@BeforeRead
@AfterRead
@OnReadError
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);
}
@BeforeProcess
@AfterProcess
@OnProcessError
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);
}
@BeforeWrite
@AfterWrite
@OnWriteError
ItemReadListener
, ItemProcessListener
, ItemWriteListener
모두 스킵된 경우엔 알려주지 않는다.public interface SkipListener<T,S> extends StepListener {
void onSkipInRead(Throwable t); // 아이템을 읽는 동안 스킵될 대마다 호출
void onSkipInProcess(T item, Throwable t);
void onSkipInWrite(S item, Throwable t); // 쓰는 동안 스킵할 때 호출
}
@OnSkipInRead
@OnSkipInWrite
@OnSkipInProcess
SkipListener
는 항상 트랜잭션 커밋 직전에 호출된다.
ItemWriter
에서 에러가 발생해도 리스너에서 호출하는 트랜잭션까지는 롤백되지 않는다.Step
을 처리할 수 있다.
Step
이 stored procedure call로 구성될 때 ItemReader
로 call을 구현하면 null을 반환할 수 있다.ItemWriter
가 있어야 해서 부자연스럽다.Tasklet
인터페이스에는 execute
메서드가 하나 존재한다.
TaskletStep
에 의해 반복 호출되는데 RepeatStatus.FINISHED
가 반환되거나 예외가 발생할 때까지 호출한다.Tasklet
호출은 트랜잭션으로 감싸져 있다.Tasklet
구현자는 stored procedure, 스크립트, SQL 문을 호출할 수 있다.
TaskletStep
이StepListener
를 구현하는 경우 자동으로 tasklet이StepListener
로 등록된다.
ItemReader
및 ItemWriter
인터페이스의 다른 어댑터와 마찬가지로 Tasklet
인터페이스도 기존의 모든 클래스에 맞게 조정할 수 있는 TaskletAdapter
가 존재한다.@Bean
public MethodInvokingTaskletAdapter myTasklet() {
MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();
adapter.setTargetObject(fooDao());
adapter.setTargetMethod("updateFoo");
return adapter;
}
public class FileDeletingTasklet implements Tasklet, InitializingBean {
private Resource directory;
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
File dir = directory.getFile();
Assert.state(dir.isDirectory());
File[] files = dir.listFiles();
for (int i = 0; i < files.length; i++) {
boolean deleted = files[i].delete();
if (!deleted) {
throw new UnexpectedJobExecutionException("Could not delete file " +
files[i].getPath());
}
}
return RepeatStatus.FINISHED;
}
public void setDirectoryResource(Resource directory) {
this.directory = directory;
}
public void afterPropertiesSet() throws Exception {
Assert.state(directory != null, "directory must be set");
}
}
@Bean
public Job taskletJob(JobRepository jobRepository) {
return new JobBuilder("taskletJob", jobRepository)
.start(deleteFilesInDir())
.build();
}
@Bean
public Step deleteFilesInDir(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("deleteFilesInDir", jobRepository)
.tasklet(fileDeletingTasklet(), transactionManager)
.build();
}
@Bean
public FileDeletingTasklet fileDeletingTasklet() {
FileDeletingTasklet tasklet = new FileDeletingTasklet();
tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));
return tasklet;
}