TIL

Configuring a Step

Step은 개발자가 작성하기 나름이기 때문에 모호할 수 있다. 간단한 Step은 데이터를 DB나 파일에서 로드한 뒤 간단한 코드만 실행시키기도 한다. 더 복잡한 Step은 복잡한 비즈니스 규칙을 적용시키기도 한다.

Chunk-oriented Processing

img.png

Configuring a Step

Java configuration을 사용하면 Spring Batch builder를 사용하여 Step을 설정할 수 있다.

/**
 * Declares the {@code sampleJob} bean, a job whose first step is {@code sampleStep}.
 *
 * <p>The JobRepository is normally already configured as a bean, so it can simply be
 * injected here; no extra configuration is needed for it.
 *
 * @param jobRepository repository handed to the {@link JobBuilder}
 * @param sampleStep    step the job starts with
 * @return the built job
 */
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
        JobBuilder jobBuilder = new JobBuilder("sampleJob", jobRepository);
        return jobBuilder.start(sampleStep).build();
}

/**
 * Declares the {@code sampleStep} bean: a chunk-oriented step with a chunk size of 10
 * that reads with {@code itemReader()} and writes with {@code itemWriter()}.
 *
 * @param jobRepository      repository handed to the {@link StepBuilder}
 * @param transactionManager transaction manager passed to the chunk configuration
 * @return the built step
 */
@Bean
public Step sampleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        StepBuilder stepBuilder = new StepBuilder("sampleStep", jobRepository);
        return stepBuilder
                .<String, String>chunk(10, transactionManager)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
}

Inheriting from a Parent Step

Abstract Step

The Commit Interval

Configuring a Step for Restart

아래 설명은 모두 한 JobInstance에 해당하는 Step의 재시작 여부라고 생각하면 이해하기 쉽다.

Setting a Start Limit

Restarting a Completed Step

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return tnew StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.allowStartIfComplete(true)
				.build();
}

Step Restart Configuration Example

Configuring Skip Logic

Configuring Retry Logic

Controlling Rollback

Transactional Readers

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return tnew StepBuilder("step1", jobRepository)
				.<String, String>chunk(2, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.readerIsTransactionalQueue()
				.build();
}

Transaction Attributes

/**
 * Declares {@code step1} as a chunk-oriented step (chunk size 2) that runs with an
 * explicit transaction attribute: REQUIRED propagation, DEFAULT isolation and a
 * timeout value of 30.
 *
 * @param jobRepository      repository handed to the {@link StepBuilder}
 * @param transactionManager transaction manager passed to the chunk configuration
 * @return the built step
 */
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	// Assemble the transaction settings first, then hand them to the builder chain.
	DefaultTransactionAttribute txAttribute = new DefaultTransactionAttribute();
	txAttribute.setPropagationBehavior(Propagation.REQUIRED.value());
	txAttribute.setIsolationLevel(Isolation.DEFAULT.value());
	txAttribute.setTimeout(30);

	StepBuilder stepBuilder = new StepBuilder("step1", jobRepository);
	return stepBuilder
			.<String, String>chunk(2, transactionManager)
			.reader(itemReader())
			.writer(itemWriter())
			.transactionAttribute(txAttribute)
			.build();
}

Registering ItemStream with a Step

Intercepting Step Execution

StepExecutionListener

/**
 * Step-level listener: one callback that receives the {@link StepExecution} before
 * the step and one after it that returns an {@link ExitStatus}.
 *
 * <p>NOTE(review): the exact invocation timing and how the returned status is used
 * are defined by the framework, not by this snippet; confirm against the docs.
 */
public interface StepExecutionListener extends StepListener {

    /** Callback taking the step's execution; by its name, invoked before the step. */
    void beforeStep(StepExecution stepExecution);

    /** Callback taking the step's execution; by its name, invoked after the step. Returns an exit status. */
    ExitStatus afterStep(StepExecution stepExecution);

}

ChunkListener

/**
 * Chunk-level listener with callbacks around chunk processing, each receiving the
 * {@link ChunkContext}.
 */
public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context); // called before the chunk is processed
    void afterChunk(ChunkContext context); // called only after the chunk has been committed
    // NOTE(review): by its name, the error-path counterpart of afterChunk; confirm exact timing.
    void afterChunkError(ChunkContext context);

}

ItemReadListener

/**
 * Listener for item reads.
 *
 * @param <T> type of the item passed to {@link #afterRead}
 */
public interface ItemReadListener<T> extends StepListener {

    // By name: before a read, after a read (with the item), and on a read error (with the exception).
    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

ItemProcessListener

/**
 * Listener for item processing.
 *
 * @param <T> type of the item handed to the callbacks
 * @param <S> type of the result passed to {@link #afterProcess}
 */
public interface ItemProcessListener<T, S> extends StepListener {

    // By name: before processing an item, after processing (item plus result),
    // and on a processing error (item plus exception).
    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

ItemWriteListener

/**
 * Listener for item writes; every callback receives the list of items involved.
 *
 * <p>NOTE(review): the parameter type shown here is {@code List}; verify it against
 * the listener signature of the Spring Batch version actually in use.
 *
 * @param <S> element type of the item lists
 */
public interface ItemWriteListener<S> extends StepListener {

    // By name: before a write, after a write, and on a write error (exception plus items).
    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

SkipListener

/**
 * Listener for skipped items, with one callback per phase (read, process, write).
 *
 * @param <T> type of the item passed to {@link #onSkipInProcess}
 * @param <S> type of the item passed to {@link #onSkipInWrite}
 */
public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t); // called each time an item is skipped while reading
    // By name, the processing-phase counterpart: receives the item and the cause.
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t); // called when an item is skipped while writing

}

TaskletStep

tasklet이 StepListener를 구현하는 경우, TaskletStep은 해당 tasklet을 자동으로 StepListener로 등록한다.

TaskletAdapter

/**
 * Declares a {@link MethodInvokingTaskletAdapter} bean whose target object is
 * {@code fooDao()} and whose target method is {@code "updateFoo"}.
 *
 * @return the configured adapter
 */
@Bean
public MethodInvokingTaskletAdapter myTasklet() {
	MethodInvokingTaskletAdapter taskletAdapter = new MethodInvokingTaskletAdapter();

	// Point the adapter at the existing DAO method instead of writing a dedicated Tasklet.
	taskletAdapter.setTargetObject(fooDao());
	taskletAdapter.setTargetMethod("updateFoo");

	return taskletAdapter;
}

Example Tasklet Implementation

/**
 * Tasklet that deletes every entry directly inside a configured directory.
 *
 * <p>The directory is supplied through {@link #setDirectoryResource(Resource)} and
 * its presence is verified in {@link #afterPropertiesSet()}.
 */
public class FileDeletingTasklet implements Tasklet, InitializingBean {

  private Resource directory;

  /**
   * Deletes all entries returned by listing the configured directory.
   *
   * @param contribution not used by this implementation
   * @param chunkContext not used by this implementation
   * @return {@link RepeatStatus#FINISHED} once every entry has been deleted
   * @throws IllegalStateException if the configured resource is not a directory
   * @throws UnexpectedJobExecutionException if the directory cannot be listed or an
   *         entry cannot be deleted
   * @throws Exception if the resource cannot be resolved to a file
   */
  public RepeatStatus execute(StepContribution contribution,
                              ChunkContext chunkContext) throws Exception {
    File dir = directory.getFile();
    Assert.state(dir.isDirectory(), "The resource must be a directory");

    // File.listFiles() returns null (not an empty array) when an I/O error occurs,
    // so fail explicitly instead of hitting a NullPointerException in the loop.
    File[] files = dir.listFiles();
    if (files == null) {
      throw new UnexpectedJobExecutionException("Could not list files in " +
              dir.getPath());
    }

    for (File file : files) {
      boolean deleted = file.delete();
      if (!deleted) {
        throw new UnexpectedJobExecutionException("Could not delete file " +
                file.getPath());
      }
    }
    return RepeatStatus.FINISHED;
  }

  /**
   * Sets the directory whose contents are deleted.
   *
   * @param directory resource that must resolve to a directory
   */
  public void setDirectoryResource(Resource directory) {
    this.directory = directory;
  }

  /**
   * Verifies that the directory resource has been set.
   *
   * @throws IllegalStateException if no directory was configured
   */
  public void afterPropertiesSet() throws Exception {
    Assert.state(directory != null, "directory must be set");
  }
}
/**
 * Declares the {@code taskletJob} bean, a job that starts with the
 * {@code deleteFilesInDir} step.
 *
 * <p>The step is injected as a parameter rather than obtained by calling
 * {@code deleteFilesInDir()} directly: that bean method requires a JobRepository and a
 * PlatformTransactionManager, so a no-argument call does not compile.
 *
 * @param jobRepository    repository handed to the {@link JobBuilder}
 * @param deleteFilesInDir step the job starts with; the parameter name matches the
 *                         step's bean method name
 * @return the built job
 */
@Bean
public Job taskletJob(JobRepository jobRepository, Step deleteFilesInDir) {
        return new JobBuilder("taskletJob", jobRepository)
                .start(deleteFilesInDir)
                .build();
}

/**
 * Declares the {@code deleteFilesInDir} step, a tasklet step that runs
 * {@code fileDeletingTasklet()}.
 *
 * @param jobRepository      repository handed to the {@link StepBuilder}
 * @param transactionManager transaction manager passed along with the tasklet
 * @return the built step
 */
@Bean
public Step deleteFilesInDir(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        StepBuilder stepBuilder = new StepBuilder("deleteFilesInDir", jobRepository);
        return stepBuilder
                .tasklet(fileDeletingTasklet(), transactionManager)
                .build();
}

/**
 * Declares the {@link FileDeletingTasklet} bean, pointed at the file-system
 * directory {@code target/test-outputs/test-dir}.
 *
 * @return the configured tasklet
 */
@Bean
public FileDeletingTasklet fileDeletingTasklet() {
        FileDeletingTasklet deleter = new FileDeletingTasklet();

        FileSystemResource targetDir = new FileSystemResource("target/test-outputs/test-dir");
        deleter.setDirectoryResource(targetDir);

        return deleter;
}