Step
의 실패가 반드시 Job
의 실패를 의미하는 것은 아니다.Step
그룹이 어떻게 구성되는지에 따라 특정 Step
은 전혀 처리되지 않을 수도 있다.next()
로 Step
을 설정하면 된다.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(stepA())
.next(stepB())
.next(stepC())
.build();
}
Step
A, B, C 순으로 실행된다.Step
이 실패하면 전체 Job
은 실패하고 다음 Step
은 실행되지 않는다.Step
성공 후 다음 Step
으로 이어지거나 실패해서 다음 Step
이 실행되지 않는 두 시나리오만 있었다.Step
실패 후 다른 Step
을 트리거해야 하는 시나리오가 필요할 수도 있다.Java API를 사용해서 실패 시 수행할 Step
을 지정할 수 있다.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(stepA())
.on("*").to(stepB())
.from(stepA()).on("FAILED").to(stepC())
.end()
.build();
}
on()
메서드는 간단한 패턴 매칭 체계를 사용하여 Step
실행 결과인 ExitStatus
를 검사한다.
*
: 0개 이상의 문자와 일치?
: 정확히 한 문자와 일치c*t
는 cat
, count
와 매칭되지만 c?t
는 cat
이랑만 매칭된다.Step
실행 결과인 ExitStatus
가 요소에 매칭되지 않으면 예외가 발생하고 Job
이 실패한다.Step
A가 실패해도 Step
C가 실행된다.Conditional Flow를 구성할 때 BatchStatus
와 ExitStatus
차이점을 이해하는 것이 중요하다.
BatchStatus
는 JobExecution
과 StepExecution
의 속성인 Enum
이며 Job
또는 Step
상태를 기록하는 데 사용된다.
COMPLETED
, STARTING
, STARTED
, STOPPING
, STOPPED
, FAILED
, ABANDONED
, UNKNOWN
on
을 사용한 Java Configuration
...
.from(stepA()).on("FAILED").to(stepB())
...
BatchStatus
가 아닌 ExitStatus
를 참조한다.Step
A가 실패하는 경우 Step
B로 이동이라는 의미를 나타낸다.아래 step1
엔 세 가지 가능성이 존재한다.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1()).on("FAILED").end()
.from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1())
.from(step1()).on("*").to(step2())
.end()
.build();
}
Step
이 실패하여 Job
이 실패하는 경우Step
이 성공하는 경우Step
이 성공했지만 exit code가 COMPLETED WITH SKIPS
이기 때문에 오류를 처리하기 위해 다른 Step
을 실행하게 된다.적절한 exit code를 반환하도록 아래 코드를 작성할 수 있다.
public class SkipCheckingListener extends StepExecutionListenerSupport {
public ExitStatus afterStep(StepExecution stepExecution) {
String exitCode = stepExecution.getExitStatus().getExitCode();
if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
stepExecution.getSkipCount() > 0) {
return new ExitStatus("COMPLETED WITH SKIPS");
}
else {
return null;
}
}
}
Step
이 성공했는지 확인한 후 다음 Step
실행의 스킵 횟수가 0보다 큰지 확인하는 StepExecutionListener
코드다.COMPLETED WITH SKIPS
를 반환한다.BatchStatus
와 ExitStatus
는 실행되는 코드인 Step
에 의해 결정되는 반면 Job
의 상태는 설정에 따라 결정된다.Step
이 하나 이상 포함되어 있다.아래 예제는 Job
이 Step
실행 후 끝난다.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.build();
}
Step
에 대해 전환이 정의되지 않은 경우 Job
상태는 다음과 같이 정의된다.Step
이 ExistStatus
FAILED
로 끝나는 경우 BatchStatus
도 FAILED
가 된다.COMPLETED
가 된다.next()
를 포함하여 Job
을 멈추는 세 가지 전환 요소를 제공한다.
BatchStatus
와 함께 Job
을 죵료시킨다.Job
의 최종 상태에만 영향을 주고 어떤 Step
의 BatchStatus
또는 ExitStatus
에 영향을 미치지 않는다.Job
의 모든 Step
이 FAILED
라도 Job
상태는 COMPLETED
일 수 있다.BatchStatus
가 COMPLETED
가 되도록 Job
을 중지한다.
JobInstanceAlreadyCompleteException
을 던짐)end()
메서드를 사용해서 step을 끝낼 수 있고 선택적 exitStatus
매개 변수를 허용한다.
exiitStatus
가 제공되지 않으면 기본적으로 ExitStatus
가 BatchStatus
와 일치되어 COMPLETED
로 설정된다.@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(step2())
.on("FAILED").end()
.from(step2()).on("*").to(step3())
.end()
.build();
}
Job
의 BatchStatus
가 FAILED
가 된다.
Job의
재시작을 막지 않는다.@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(step2()).on("FAILED").fail()
.from(step2()).on("*").to(step3())
.end()
.build();
}
BatchStatus
가 STOPPED
로 중지되도록 할 수 있다.
Job
을 중지하면 처리가 일시적으로 중단되어 작업자가 작업을 다시 시작하기 전에 몇 가지 조치를 취할 수 있다.stopAndRestart
메서드로 작업이 다시 시작될 때 실행이 재개될 step을 지정하는 restart
속성을 필요로 한다.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1()).on("COMPLETED").stopAndRestart(step2())
.end()
.build();
}
step1()
이 COMPLETED
로 끝나면 작업이 중지된다.step2()
에서 실행이 시작된다.ExitStatus
보다 더 많은 정보가 필요할 수 있다.JobExecutionDecider
를 사용해서 결정을 지원할 수 있다.public class MyDecider implements JobExecutionDecider {
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String status;
if (someCondition()) {
status = "FAILED";
}
else {
status = "COMPLETED";
}
return new FlowExecutionStatus(status);
}
}
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(decider()).on("FAILED").to(step2())
.from(decider()).on("COMPLETED").to(step3())
.end()
.build();
}
Job
이었지만 병렬 흐름으로도 구성할 수 있다.split()
은 하나 이상의 flow 요소를 포함하며 전체 개별 흐름을 정의할 수 있다.split()
은 next
속성이나 next
, end
, 그리고 fail
속성과 같이 전환 요소도 포함될 수 있다.flow1
과 flow2
가 병렬 실행되고 마지막으로 step4
가 실행된다.@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public Job job(Flow flow1, Flow flow2) {
return this.jobBuilderFactory.get("job")
.start(flow1)
.split(new SimpleAsyncTaskExecutor())
.add(flow2)
.next(step4())
.end()
.build();
}
첫 번째 방법으로 flow를 다른 곳에 정의된 flow에 대한 참조로 선언하는 방법이 있다.
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(flow1())
.next(step3())
.end()
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
JobStep
을 사용하는 것이 있다.
FlowStep
과 유사하지만 실제로는 지정된 flow 단계에 대해 별도의 job execution을 생성하고 실행한다.
@Bean
public Job jobStepJob(JobRepository jobRepository) {
return new JobBuilder("jobStepJob", jobRepository)
.start(jobStepJobStep1(null))
.build();
}
@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher, JobRepository jobRepository) {
return new StepBuilder("jobStepJobStep1", jobRepository)
.job(job())
.launcher(jobLauncher)
.parametersExtractor(jobParametersExtractor())
.build();
}
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.build();
}
@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
extractor.setKeys(new String[]{"input.file"});
return extractor;
}
jobParametersExtractor
는 실행되는 Job
에 대한 ExecutionContext
가 JobParameters
로 변환되는 방법을 결정하는 전략이다.JobStep
은 큰 시스템을 더 작은 모듈로 나누고 작업 흐름을 제어할 수 있는 좋은 방법이다.@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Foo> reader = new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource("file://outputs/file.txt"))
...
}
Resource
는 지정된 파일 시스템 위치에서 파일을 로드한다.
//
로 시작해야 한다.-D
매개 변수를 통새 시스템 property를 읽으면 해결할 수 있다.
@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
Dinput.file.name="file://outputs/file.txt”
시스템 property를 통하지 않고 JobParameters
를 바인딩할 수 있다.
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
동일한 방식으로 JobExecution
및 StepExecution
수준의 ExecutionContext
에 모두 접근할 수 있다.
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input.file.name]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
Step
이 시작될 때까지 빈을 인스턴스화하지 않기 때문에 지연 바인딩을 사용하려면 @StepScope
를 사용해야 한다.@StepScope
로 빈 정의@EnableBatchProcessing
을 사용하여 범위를 명시적으로 추가@JobScope
는 @StepScope
와 유사하지만 Job
컨텍스트에 대한 범위이므로 실행 중인 작업 당 해당 빈의 인스턴스가 하나만 존재한다.
#{..}
표현을 사용하여 JobContext
에서 접근할 수 있는 참조의 지연 바인딩도 지원된다.@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@JobScope
에 대한 빈 정의 명시적 선언@EnableBatchProcessing
을 사용하여 범위를 명시적으로 추가ItemStream
빈을 job 또는 step scope에 정의하는 경우 반환 유형은 최소한 ItemStream
이어야 한다.open
, update
, close
메서드르 호출하여 계약을 준수하기 위해서다.@Bean
@StepScope
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.resource(new FileSystemResource(name))
// set other properties of the item reader
.build();
}