TIL

Spring Batch Listener와 활용

JobExecutionListener

/**
 * Listener with one hook on each side of a job execution.
 *
 * <p>Both hooks have empty default bodies, so an implementation only needs to
 * override the one it cares about.
 */
public interface JobExecutionListener {

    /** Hook for the point before the job runs; does nothing by default. */
    default void beforeJob(JobExecution jobExecution) {
    }

    /** Hook for the point after the job has run; does nothing by default. */
    default void afterJob(JobExecution jobExecution) {
    }
}

StepExecutionListener

/**
 * Step-level listener with one hook on each side of a step execution.
 *
 * <p>Both hooks have default bodies, so implementations override only what
 * they need.
 */
public interface StepExecutionListener extends StepListener {

    /** Hook for the point before the step runs; does nothing by default. */
    default void beforeStep(StepExecution stepExecution) {
    }

    /**
     * Hook for the point after the step has run.
     *
     * @return an {@link ExitStatus}, or {@code null}; the default
     *         implementation returns {@code null}
     */
    @Nullable
    default ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}

ChunkListener

/**
 * Chunk-level listener with hooks before a chunk, after a chunk, and after a
 * chunk error. All hooks are no-ops by default.
 */
public interface ChunkListener extends StepListener {

    /** Hook for the point before a chunk is handled; does nothing by default. */
    default void beforeChunk(ChunkContext context) {
    }

    /** Hook for the point after a chunk has been handled; does nothing by default. */
    default void afterChunk(ChunkContext context) {
    }

    /** Hook for a chunk that ended in an error; does nothing by default. */
    default void afterChunkError(ChunkContext context) {
    }
}

Item[Read|Process|Write]Listener

/**
 * Listener around reading a single item.
 *
 * @param <T> type of the item that was read
 */
public interface ItemReadListener<T> extends StepListener {

    /** Hook for the point before an item is read; does nothing by default. */
    default void beforeRead() {
    }

    /** Hook that receives the item once it has been read; does nothing by default. */
    default void afterRead(T item) {
    }

    /** Hook that receives the exception raised by a failed read; does nothing by default. */
    default void onReadError(Exception ex) {
    }
}

/**
 * Listener around processing a single item.
 *
 * @param <T> type of the item handed to processing
 * @param <S> type of the processing result
 */
public interface ItemProcessListener<T, S> extends StepListener {

    /** Hook that receives the item before it is processed; does nothing by default. */
    default void beforeProcess(T item) {
    }

    /**
     * Hook that receives the item together with its processing result.
     * The result is declared {@code @Nullable}; does nothing by default.
     */
    default void afterProcess(T item, @Nullable S result) {
    }

    /** Hook that receives the item and the exception of a failed processing; does nothing by default. */
    default void onProcessError(T item, Exception e) {
    }
}

/**
 * Listener around writing a chunk of items.
 *
 * @param <S> type of the items being written
 */
public interface ItemWriteListener<S> extends StepListener {

    /** Hook that receives the chunk before it is written; does nothing by default. */
    default void beforeWrite(Chunk<? extends S> items) {
    }

    /** Hook that receives the chunk after it has been written; does nothing by default. */
    default void afterWrite(Chunk<? extends S> items) {
    }

    /** Hook that receives the exception and the chunk of a failed write; does nothing by default. */
    default void onWriteError(Exception exception, Chunk<? extends S> items) {
    }
}

배치 리스너, 이런 것들을 할 수 있다.

배치 리스너 구현 방법

인터페이스 구현

/**
 * Interface-based job listener: implements [JobExecutionListener] directly and
 * is declared as a Spring component.
 *
 * Both hook bodies are elided in this note.
 */
@Component
class BigBrotherJobExecutionListener : JobExecutionListener {
    // Hook for the point before the job runs.
    override fun beforeJob(jobExecution: JobExecution) {
        // ...
    }

    // Hook for the point after the job has run.
    override fun afterJob(jobExecution: JobExecution) {
        // ...
    }
}

/**
 * Interface-based step listener: implements [StepExecutionListener] directly
 * and is declared as a Spring component.
 *
 * The hook logic is elided in this note.
 */
@Component
class BigBrotherStepExecutionListener : StepExecutionListener {
    // Hook for the point before the step runs.
    override fun beforeStep(stepExecution: StepExecution) {
        // ...
    }

    // Hook for the point after the step has run; as written it always
    // returns ExitStatus.COMPLETED.
    override fun afterStep(stepExecution: StepExecution): ExitStatus {
        // ...
        return ExitStatus.COMPLETED
    }
}
    /**
     * Defines the `systemTerminationSimulationJob` job: a single step obtained
     * from `enterWorldStep()`, with a [BigBrotherJobExecutionListener] attached.
     */
    @Bean
    fun systemTerminationSimulationJob(): Job {
        val jobBuilder = JobBuilder("systemTerminationSimulationJob", jobRepository)
        // The listener is created here rather than injected.
        val jobListener = BigBrotherJobExecutionListener()

        return jobBuilder
            .listener(jobListener)
            .start(enterWorldStep())
            .build()
    }

어노테이션 기반 구현

/**
 * Annotation-based job listener: the class implements no listener interface;
 * its methods are marked with `@BeforeJob` / `@AfterJob` instead.
 *
 * Both method bodies are elided in this note.
 */
@Component
class ServerRoomInfiltrationListener {
    // NOTE(review): this parameter is nullable while escapeServerRoom takes a
    // non-null JobExecution — confirm whether the difference is intended.
    @BeforeJob
    fun infiltrateServerRoom(jobExecution: JobExecution?) {
        // ...
    }

    @AfterJob
    fun escapeServerRoom(jobExecution: JobExecution) {
        // ...
    }
}

/**
 * Annotation-based step listener: the class implements no listener interface;
 * its methods are marked with `@BeforeStep` / `@AfterStep` instead.
 *
 * The hook logic is elided in this note.
 */
@Component
class ServerRackControlListener {
    @BeforeStep
    fun accessServerRack(stepExecution: StepExecution?) {
        // ...
    }

    // As written, always returns an ExitStatus built from the custom
    // exit code "POWER_DOWN".
    @AfterStep
    fun leaveServerRack(stepExecution: StepExecution?): ExitStatus {
        // ...
        return ExitStatus("POWER_DOWN")
    }
}

JobExecutionListener와 ExecutionContext를 활용한 동적 데이터 전달

/**
 * Job listener that builds a randomly chosen "infiltration plan" before the
 * job and stores it in the job's ExecutionContext under the key
 * `infiltrationPlan`.
 */
@Component
class InfiltrationPlanListener : JobExecutionListener {
    override fun beforeJob(jobExecution: JobExecution) {
        // Pass the dynamically generated value along through the job-level context.
        jobExecution.executionContext.put("infiltrationPlan", generateInfiltrationPlan())
    }

    /**
     * Builds the plan by picking one random entry from each candidate list.
     *
     * @return a map with the keys `targetSystem`, `objective`, `targetData`
     *         and `requiredTools`
     */
    private fun generateInfiltrationPlan(): Map<String, String> {
        val targetSystems = listOf("판교 서버실", "안산 데이터센터")
        val objectives = listOf("kill -9 실행", "rm -rf 전개", "chmod 000 적용", "/dev/null로 리다이렉션")
        val targetDataSets = listOf("코어 덤프 파일", "시스템 로그", "설정 파일", "백업 데이터")
        val toolKits = listOf("USB 킬러", "널 바이트 인젝터", "커널 패닉 유발기", "메모리 시퍼너")

        return mapOf(
            "targetSystem" to targetSystems.random(),
            "objective" to objectives.random(),
            "targetData" to targetDataSets.random(),
            "requiredTools" to toolKits.random(),
        )
    }
}
    /**
     * Defines the `systemInfiltrationJob` job: runs [reconStep] and then
     * [attackStep], with the infiltration-plan listener attached to the job.
     */
    @Bean
    fun systemInfiltrationJob(
        jobRepository: JobRepository,
        reconStep: Step,
        attackStep: Step,
    ): Job {
        // NOTE(review): infiltrationPlanListener is not one of this function's
        // parameters; presumably the enclosing configuration class supplies it — confirm.
        val jobBuilder = JobBuilder("systemInfiltrationJob", jobRepository)
            .listener(infiltrationPlanListener) // register the listener

        return jobBuilder
            .start(reconStep)
            .next(attackStep)
            .build()
    }
    /**
     * Step-scoped tasklet that decides at random whether the attack was
     * "TERMINATED" or "DETECTED" and stores that outcome in the job-level
     * ExecutionContext under the key `infiltrationResult`.
     *
     * @param infiltrationPlan value bound from the job ExecutionContext key
     *        `infiltrationPlan`; not read by the tasklet body shown here
     */
    @Bean
    @StepScope
    fun attackStepTasklet(
        @Value("#{jobExecutionContext['infiltrationPlan']}") infiltrationPlan: Map<String, Any>,
    ): Tasklet =
        Tasklet { contribution: StepContribution, _: ChunkContext ->
            val outcome = when (Random.nextBoolean()) {
                true -> "TERMINATED"
                false -> "DETECTED"
            }

            // Written to the job's context (not the step's).
            contribution.stepExecution.jobExecution.executionContext
                .put("infiltrationResult", outcome)

            RepeatStatus.FINISHED
        }

JobParameters가 아닌 ExecutionContext를 사용하는 이유

/**
 * Stores the current date in the job ExecutionContext under `targetDate`.
 *
 * The date is taken from [LocalDate.now] when the hook runs, so the stored
 * value depends on when the job is executed.
 */
override fun beforeJob(jobExecution: JobExecution) {
    val today = LocalDate.now()
    jobExecution.executionContext.put("targetDate", today)
}

ExecutionContextPromotionListener를 활용한 Step 간 데이터 공유

// Each Step has its own, separate ExecutionContext
val stepExecution: StepExecution = contribution.stepExecution
val stepExecutionContext: ExecutionContext = stepExecution.executionContext

// To share data between Steps it has to be moved into the Job-level ExecutionContext by hand, which is inconvenient
val jobExecution: JobExecution = stepExecution.jobExecution
jobExecution.executionContext.put("...", "...")
    /**
     * Creates an [ExecutionContextPromotionListener] whose promoted keys are
     * set to the single key `targetSystem`.
     */
    @Bean
    fun promotionListener(): ExecutionContextPromotionListener {
        val listener = ExecutionContextPromotionListener()
        // Mark the specific key(s) to be promoted.
        listener.setKeys(arrayOf("targetSystem"))
        return listener
    }
    /**
     * Defines the `scanningStep` step: a tasklet that records the scan target
     * in the Step-level ExecutionContext under the key `targetSystem`.
     *
     * The listener returned by `promotionListener()` is registered on the step;
     * that listener is configured with the same `targetSystem` key.
     */
    @Bean
    fun scanningStep(): Step =
        StepBuilder("scanningStep", jobRepository)
            .tasklet({ contribution, _ ->
                val target = "판교 서버실"

                // Store the value in this step's own context (not the job's).
                val executionContext = contribution.stepExecution.executionContext
                executionContext.put("targetSystem", target)

                RepeatStatus.FINISHED
            }, transactionManager)
            .listener(promotionListener()) // register the promotion listener
            .build()

    /**
     * Defines the `eliminationStep` step, which runs the given tasklet.
     *
     * The step name now matches the bean function name, as the other step and
     * job definitions in this file do (it was previously misspelled
     * `eliminationSte`).
     *
     * @param eliminationTasklet tasklet executed by this step
     */
    @Bean
    fun eliminationStep(eliminationTasklet: Tasklet): Step =
        StepBuilder("eliminationStep", jobRepository)
            .tasklet(eliminationTasklet, transactionManager)
            .build()

    /**
     * Step-scoped tasklet that logs the value bound from the job
     * ExecutionContext key `targetSystem` and then finishes.
     *
     * @param targetStatus value looked up from the Job's ExecutionContext
     *        under `targetSystem`
     */
    @Bean
    @StepScope
    fun eliminationTasklet(
        @Value("#{jobExecutionContext['targetSystem']}") targetStatus: String,
    ): Tasklet {
        return Tasklet { _, _ ->
            logger.info { "시스템 제거 작업 실행: $targetStatus" }
            RepeatStatus.FINISHED
        }
    }

Listener와 @JobScope, @StepScope 통합

/**
 * Job listener that reads the `myParam` job parameter from the
 * [JobExecution] it is handed and prints it before the job.
 */
class MyJobListener : JobExecutionListener {
    override fun beforeJob(jobExecution: JobExecution) {
        // getString is assigned to a nullable type: the parameter may be absent.
        val myParam: String? = jobExecution.jobParameters.getString("myParam")
        println("잡 파라미터: $myParam")
    }
}
    /**
     * Job-scoped listener bean that receives the `terminationType` job
     * parameter through `@Value` injection.
     *
     * It logs the termination type before the job and the job status after it.
     *
     * @param terminationType value bound from the `terminationType` job parameter
     */
    @Bean
    @JobScope
    fun systemTerminationListener(
        @Value("#{jobParameters['terminationType']}") terminationType: String,
    ): JobExecutionListener {
        return object : JobExecutionListener {
            override fun beforeJob(jobExecution: JobExecution) {
                logger.info { "terminationType: $terminationType" }
            }

            override fun afterJob(jobExecution: JobExecution) {
                logger.info { "jobExecution.status: ${jobExecution.status}" }
            }
        }
    }

Listener 성능과 모범 사례

리스너 활용

성능 최적화