Job
실행의 시작과 종료 시점에 호출되는 리스너 인터페이스afterJob
Job
실행 정보가 메타데이터에 저장되기 전에 호출된다.Job
실행 결과 상태를 완료(COMPLETED
)에서 실패(FAILED
)로 변경하거나 반대도 가능public interface JobExecutionListener {
default void beforeJob(JobExecution jobExecution) { }
default void afterJob(JobExecution jobExecution) { }
}
Step
실행의 시작과 종료 시점에 호출되는 리스너 인터페이스public interface StepExecutionListener extends StepListener {
default void beforeStep(StepExecution stepExecution) { }
@Nullable
default ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
}
afterStep
에서의 ExitStatus
반환을 통해 Step
실행 결과 상태를 직접 변경 가능하다.public interface ChunkListener extends StepListener {
default void beforeChunk(ChunkContext context) { }
default void afterChunk(ChunkContext context) { }
default void afterChunkError(ChunkContext context) { }
}
afterChunk
는 트랜잭션 커밋 후 호출된다.afterChunkErrror
는 트랜잭션 롤백 이후 호출된다.ItemReadListener
ItemProcessListener
ItemWriteListener
public interface ItemReadListener<T> extends StepListener {
default void beforeRead() { }
default void afterRead(T item) { }
default void onReadError(Exception ex) { }
}
public interface ItemProcessListener<T, S> extends StepListener {
default void beforeProcess(T item) { }
default void afterProcess(T item, @Nullable S result) { }
default void onProcessError(T item, Exception e) { }
}
public interface ItemWriteListener<S> extends StepListener {
default void beforeWrite(Chunk<? extends S> items) { }
default void afterWrite(Chunk<? extends S> items) { }
default void onWriteError(Exception exception, Chunk<? extends S> items) { }
}
ItemReadListener.afterRead()
ItemReader.read
호출 후에 호출되지만 read
가 null
을 반환하면 호출되지 않는다.ItemProcessListener.afterProcess()
process
메서드가 null
을 반환하더라도 호출된다.ItemWriteListener.afterWrite()
ChunkListener.afterChunk()
가 청크 커밋 후 호출되었다면 아이템 리스너는 커밋 전에 호출된다.Job
, Step
전후로 로깅Job
종료 상태를 확인하고 종료 상태에 따른 처리 가능@Component
class BigBrotherJobExecutionListener : JobExecutionListener {
override fun beforeJob(jobExecution: JobExecution) {
// ...
}
override fun afterJob(jobExecution: JobExecution) {
// ...
}
}
@Component
class BigBrotherStepExecutionListener : StepExecutionListener {
override fun beforeStep(stepExecution: StepExecution) {
// ...
}
override fun afterStep(stepExecution: StepExecution): ExitStatus {
// ...
return ExitStatus.COMPLETED
}
}
default
메서드이기에 필요한 메서드만 오버라이드하면 된다.JobBuilder
, StepBuilder
체이닝에 리스너를 등록할 수 있다. @Bean
fun systemTerminationSimulationJob(): Job =
JobBuilder("systemTerminationSimulationJob", jobRepository)
.listener(BigBrotherJobExecutionListener())
.start(enterWorldStep())
.build()
@AfterChunk
, @AfterChunkError
, @AfterJob
, @AfterProcess
, @AfterRead
, @AfterStep
, @AfterWrite
, @BeforeChunk
, @BeforeJob
, @BeforeProcess
, @BeforeRead
, @BeforeStep
, @BeforeWrite
, @OnProcessError
, @OnReadError
, @OnSkipInProcess
, @OnSkipInRead
, @OnSkipInWrite
@Component
class ServerRoomInfiltrationListener {
@BeforeJob
fun infiltrateServerRoom(jobExecution: JobExecution?) {
// ...
}
@AfterJob
fun escapeServerRoom(jobExecution: JobExecution) {
// ...
}
}
@Component
class ServerRackControlListener {
@BeforeStep
fun accessServerRack(stepExecution: StepExecution?) {
// ...
}
@AfterStep
fun leaveServerRack(stepExecution: StepExecution?): ExitStatus {
// ...
return ExitStatus("POWER_DOWN")
}
}
@AfterStep
어노테이션을 사용하는 메서드는 반드시 ExitStatus
를 반환해줘야 한다.XxxBuilder.listener()
메서드는 Object
타입을 받는 오버로딩 메서드를 제공하기 때문beforeJob
메서드를 활용할 수 있다.@Component
class InfiltrationPlanListener : JobExecutionListener {
override fun beforeJob(jobExecution: JobExecution) {
val executionContext: ExecutionContext = jobExecution.executionContext
val infiltrationPlan: Map<String, String> = generateInfiltrationPlan()
executionContext.put("infiltrationPlan", infiltrationPlan) // 동적 파라미터 전달
}
private fun generateInfiltrationPlan(): Map<String, String> =
mapOf(
"targetSystem" to listOf("판교 서버실", "안산 데이터센터").random(),
"objective" to listOf("kill -9 실행", "rm -rf 전개", "chmod 000 적용", "/dev/null로 리다이렉션").random(),
"targetData" to listOf("코어 덤프 파일", "시스템 로그", "설정 파일", "백업 데이터").random(),
"requiredTools" to listOf("USB 킬러", "널 바이트 인젝터", "커널 패닉 유발기", "메모리 시퍼너").random(),
)
}
Job
수준 ExecutionContext
내부 데이터는 Job
내부 모든 Step
에서 접근 가능하다는 점을 이용하는 것이다. @Bean
fun systemInfiltrationJob(
jobRepository: JobRepository,
reconStep: Step,
attackStep: Step,
): Job =
JobBuilder("systemInfiltrationJob", jobRepository)
.listener(infiltrationPlanListener) // listener 등록
.start(reconStep)
.next(attackStep)
.build()
Job
내부 Step
에서 잡 파라미터로 전달 받는 것이 가능하다.
Job
수준 ExecutionContext
에 저장되어 있기 때문 @Bean
@StepScope
fun attackStepTasklet(
@Value("#{jobExecutionContext['infiltrationPlan']}") infiltrationPlan: Map<String, Any>,
): Tasklet =
Tasklet { contribution: StepContribution, _: ChunkContext ->
val infiltrationSuccess = Random.nextBoolean()
val infiltrationResult = if (infiltrationSuccess) "TERMINATED" else "DETECTED"
val executionContext: ExecutionContext = contribution.stepExecution.jobExecution.executionContext
executionContext.put("infiltrationResult", infiltrationResult)
RepeatStatus.FINISHED
}
JobParameters
는 불변하게 설계되었다.
JobParameters
로 실행한 Job
은 항상 동일한 결과를 생성해야 한다.JobInstance
, JobExecution
)과 JobParameters
는 메타 저장소에 저장되는데 JobaPrameters
가 변경된다면 기록과 실제 작업 간 불일치가 생긴다.따라서 동적 변경이 필요한 데이터는 ExecutionContext
를 통해 관리하는 것이 좋다.
JobParameters
를 사용할 수 있는데 ExecutionContext
를 사용하는 것은 지양해야 한다.
Job
을 재처리하고 싶다면 프로그램을 수정/배포해야 한다.targetDate
가 JobParameters
였다면 유연하게 배치 작업을 실행시킬 수 있다.override fun beforeJob(jobExecution: JobExecution) {
jobExecution.executionContext.put("targetDate", LocalDate.now())
}
Step
수준 ExecutionContext
에 저장된 데이터는 다른 Step
과 공유는 불가능하다.// Step마다 별도로 존재
val stepExecution: StepExecution = contribution.stepExecution
val stepExecutionContext: ExecutionContext = stepExecution.executionContext
// Step 간 공유를 하려면 Job 수준 ExecutionContext로 옮겨줘야 하는데 불편하다
val jobExecution: JobExecution = stepExecution.jobExecution
jobExecution.executionContext.put("...", "...")
ExecutionContextPromotionListener
Step
수준 ExecutionContext
데이터를 Job
수준 ExecutionContext
로 등록시켜주는 StepExecutionListener
의 구현체Step
수준 데이터를 Job
수준으로 승격(Promote) 시키는 것 @Bean
fun promotionListener(): ExecutionContextPromotionListener =
ExecutionContextPromotionListener()
.apply { setKeys(arrayOf("targetSystem")) } // 특정 키를 승격 대상으로 지정
Step
수준 ExecutionContext
에 등록해도 Job
수준에서 공유된다. @Bean
fun scanningStep(): Step =
StepBuilder("scanningStep", jobRepository)
.tasklet({ contribution, chunkContext ->
val target = "판교 서버실"
val executionContext = contribution.stepExecution.executionContex
executionContext.put("targetSystem", target)
RepeatStatus.FINISHED
}, transactionManager)
.listener(promotionListener()) // promotionListener 등록
.build()
@Bean
fun eliminationStep(eliminationTasklet: Tasklet): Step =
StepBuilder("eliminationSte", jobRepository)
.tasklet(eliminationTasklet, transactionManager)
.build()
@Bean
@StepScope
fun eliminationTasklet( // Job의 ExecutionContext에서 값 조회
@Value("#{jobExecutionContext['targetSystem']}") targetStatus: String,
): Tasklet =
Tasklet { _: StepContribution, _: ChunkContext ->
logger.info { "시스템 제거 작업 실행: $targetStatus" }
RepeatStatus.FINISHED
}
Step
은 독립적으로 설계하여 재사용성과 유지보수성을 높이는 것이 좋다.Step
간 데이터 공유는 최소화하는 것이 좋다.JobExecution
객체에서 getter 체이닝을 통해 꺼낼 수밖에 없다.class MyJobListener : JobExecutionListener {
override fun beforeJob(jobExecution: JobExecution) {
val params: JobParameters = jobExecution.jobParameters
val myParam: String? = params.getString("myParam")
println("잡 파라미터: $myParam")
}
}
JobScope
와 생명주기를 함께하는) DI 받는 형식으로 쉽게 꺼낼 수 있다.StepScope
에서도 마찬가지로 사용 가능하다. @Bean
@JobScope
fun systemTerminationListener(
@Value("#{jobParameters['terminationType']}") terminationType: String,
): JobExecutionListener =
object : JobExecutionListener {
override fun beforeJob(jobExecution: JobExecution) {
logger.info { "terminationType: $terminationType" }
}
override fun afterJob(jobExecution: JobExecution) {
logger.info { "jobExecution.status: ${jobExecution.status}" }
}
}
JobExecutionListener
: 전체 Job
의 시작과 종료를 통제StepExecutionListener
: 각 Step
단계의 실행을 감시ChunkListener
: 시스템을 청크 단위로 제거할 때, 반복의 시작과 종료 시점을 통제Item[Read|Process|Write]Listener
: 개별 아이템 식별 통제beforeJob
이나 beforeStep
에서 예외가 발생하면 Job
, Step
이 실패한 것으로 판단된다.JobExecutionListener
/StepExecutionListener
Job
, Step
실행당 한 번씩만 실행ItemReadListener
/ItemProcessListener