Step의 실패가 반드시 Job의 실패를 의미하는 것은 아니다.Step 그룹이 어떻게 구성되는지에 따라 특정 Step은 전혀 처리되지 않을 수도 있다.next()로 Step을 설정하면 된다.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(stepA())
.next(stepB())
.next(stepC())
.build();
}
Step A, B, C 순으로 실행된다.Step이 실패하면 전체 Job은 실패하고 다음 Step은 실행되지 않는다.Step 성공 후 다음 Step으로 이어지거나 실패해서 다음 Step이 실행되지 않는 두 시나리오만 있었다.Step 실패 후 다른 Step을 트리거해야 하는 시나리오가 필요할 수도 있다.Java API를 사용해서 실패 시 수행할 Step을 지정할 수 있다.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(stepA())
.on("*").to(stepB())
.from(stepA()).on("FAILED").to(stepC())
.end()
.build();
}
on() 메서드는 간단한 패턴 매칭 체계를 사용하여 Step 실행 결과인 ExitStatus를 검사한다.
*: 0개 이상의 문자와 일치?: 정확히 한 문자와 일치c*t는 cat, count와 매칭되지만 c?t는 cat이랑만 매칭된다.Step 실행 결과인 ExitStatus가 요소에 매칭되지 않으면 예외가 발생하고 Job이 실패한다.Step A가 실패해도 Step C가 실행된다.Conditional Flow를 구성할 때 BatchStatus와 ExitStatus 차이점을 이해하는 것이 중요하다.
BatchStatus는 JobExecution과 StepExecution의 속성인 Enum이며 Job 또는 Step 상태를 기록하는 데 사용된다.
COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWNon을 사용한 Java Configuration
...
.from(stepA()).on("FAILED").to(stepB())
...
BatchStatus가 아닌 ExitStatus를 참조한다.Step A가 실패하는 경우 Step B로 이동이라는 의미를 나타낸다.아래 step1엔 세 가지 가능성이 존재한다.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1()).on("FAILED").end()
.from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1())
.from(step1()).on("*").to(step2())
.end()
.build();
}
Step이 실패하여 Job이 실패하는 경우Step이 성공하는 경우Step이 성공했지만 exit code가 COMPLETED WITH SKIPS이기 때문에 오류를 처리하기 위해 다른 Step을 실행하게 된다.적절한 exit code를 반환하도록 아래 코드를 작성할 수 있다.
public class SkipCheckingListener extends StepExecutionListenerSupport {
public ExitStatus afterStep(StepExecution stepExecution) {
String exitCode = stepExecution.getExitStatus().getExitCode();
if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
stepExecution.getSkipCount() > 0) {
return new ExitStatus("COMPLETED WITH SKIPS");
}
else {
return null;
}
}
}
Step이 성공했는지 확인한 후 다음 Step 실행의 스킵 횟수가 0보다 큰지 확인하는 StepExecutionListener 코드다.COMPLETED WITH SKIPS를 반환한다.BatchStatus와 ExitStatus는 실행되는 코드인 Step에 의해 결정되는 반면 Job의 상태는 설정에 따라 결정된다.Step이 하나 이상 포함되어 있다.아래 예제는 Job이 Step 실행 후 끝난다.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.build();
}
Step에 대해 전환이 정의되지 않은 경우 Job 상태는 다음과 같이 정의된다.Step이 ExistStatus FAILED로 끝나는 경우 BatchStatus도 FAILED가 된다.COMPLETED가 된다.next()를 포함하여 Job을 멈추는 세 가지 전환 요소를 제공한다.
BatchStatus와 함께 Job을 죵료시킨다.Job의 최종 상태에만 영향을 주고 어떤 Step의 BatchStatus 또는 ExitStatus에 영향을 미치지 않는다.Job의 모든 Step이 FAILED라도 Job 상태는 COMPLETED일 수 있다.BatchStatus가 COMPLETED가 되도록 Job을 중지한다.
JobInstanceAlreadyCompleteException을 던짐)end() 메서드를 사용해서 step을 끝낼 수 있고 선택적 exitStatus 매개 변수를 허용한다.
exiitStatus가 제공되지 않으면 기본적으로 ExitStatus가 BatchStatus와 일치되어 COMPLETED로 설정된다.@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(step2())
.on("FAILED").end()
.from(step2()).on("*").to(step3())
.end()
.build();
}
Job의 BatchStatus가 FAILED가 된다.
Job의 재시작을 막지 않는다.@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(step2()).on("FAILED").fail()
.from(step2()).on("*").to(step3())
.end()
.build();
}
BatchStatus가 STOPPED로 중지되도록 할 수 있다.
Job을 중지하면 처리가 일시적으로 중단되어 작업자가 작업을 다시 시작하기 전에 몇 가지 조치를 취할 수 있다.stopAndRestart 메서드로 작업이 다시 시작될 때 실행이 재개될 step을 지정하는 restart 속성을 필요로 한다.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1()).on("COMPLETED").stopAndRestart(step2())
.end()
.build();
}
step1()이 COMPLETED로 끝나면 작업이 중지된다.step2()에서 실행이 시작된다.ExitStatus보다 더 많은 정보가 필요할 수 있다.JobExecutionDecider를 사용해서 결정을 지원할 수 있다.public class MyDecider implements JobExecutionDecider {
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String status;
if (someCondition()) {
status = "FAILED";
}
else {
status = "COMPLETED";
}
return new FlowExecutionStatus(status);
}
}
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(decider()).on("FAILED").to(step2())
.from(decider()).on("COMPLETED").to(step3())
.end()
.build();
}
Job이었지만 병렬 흐름으로도 구성할 수 있다.split()은 하나 이상의 flow 요소를 포함하며 전체 개별 흐름을 정의할 수 있다.split()은 next 속성이나 next, end, 그리고 fail 속성과 같이 전환 요소도 포함될 수 있다.flow1과 flow2가 병렬 실행되고 마지막으로 step4가 실행된다.@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public Job job(Flow flow1, Flow flow2) {
return this.jobBuilderFactory.get("job")
.start(flow1)
.split(new SimpleAsyncTaskExecutor())
.add(flow2)
.next(step4())
.end()
.build();
}
첫 번째 방법으로 flow를 다른 곳에 정의된 flow에 대한 참조로 선언하는 방법이 있다.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(flow1())
.next(step3())
.end()
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
JobStep을 사용하는 것이 있다.
FlowStep과 유사하지만 실제로는 지정된 flow 단계에 대해 별도의 job execution을 생성하고 실행한다.
@Bean
public Job jobStepJob(JobRepository jobRepository) {
return new JobBuilder("jobStepJob", jobRepository)
.start(jobStepJobStep1(null))
.build();
}
@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher, JobRepository jobRepository) {
return new StepBuilder("jobStepJobStep1", jobRepository)
.job(job())
.launcher(jobLauncher)
.parametersExtractor(jobParametersExtractor())
.build();
}
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.build();
}
@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
extractor.setKeys(new String[]{"input.file"});
return extractor;
}
jobParametersExtractor는 실행되는 Job에 대한 ExecutionContext가 JobParameters로 변환되는 방법을 결정하는 전략이다.JobStep은 큰 시스템을 더 작은 모듈로 나누고 작업 흐름을 제어할 수 있는 좋은 방법이다.@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Foo> reader = new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource("file://outputs/file.txt"))
...
}
Resource는 지정된 파일 시스템 위치에서 파일을 로드한다.
//로 시작해야 한다.-D 매개 변수를 통새 시스템 property를 읽으면 해결할 수 있다.
@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
Dinput.file.name="file://outputs/file.txt”시스템 property를 통하지 않고 JobParameters를 바인딩할 수 있다.
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
동일한 방식으로 JobExecution 및 StepExecution 수준의 ExecutionContext에 모두 접근할 수 있다.
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input.file.name]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
Step이 시작될 때까지 빈을 인스턴스화하지 않기 때문에 지연 바인딩을 사용하려면 @StepScope를 사용해야 한다.@StepScope로 빈 정의@EnableBatchProcessing을 사용하여 범위를 명시적으로 추가@JobScope는 @StepScope와 유사하지만 Job 컨텍스트에 대한 범위이므로 실행 중인 작업 당 해당 빈의 인스턴스가 하나만 존재한다.
#{..} 표현을 사용하여 JobContext에서 접근할 수 있는 참조의 지연 바인딩도 지원된다.@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@JobScope에 대한 빈 정의 명시적 선언@EnableBatchProcessing을 사용하여 범위를 명시적으로 추가ItemStream 빈을 job 또는 step scope에 정의하는 경우 반환 유형은 최소한 ItemStream이어야 한다.open, update, close 메서드르 호출하여 계약을 준수하기 위해서다.@Bean
@StepScope
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.resource(new FileSystemResource(name))
// set other properties of the item reader
.build();
}