TIL

스프링 배치의 두 가지 스텝 유형

태스크릿(Tasklet) 지향 처리란

@FunctionalInterface
public interface Tasklet {
    @Nullable
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}

Tasklet 구현 예시

private const val PROCESSED_TO_KILL: Int = 10
private var killedProcess: Int = 0

class ZombieProcessCleanupTasklet : Tasklet {
    val log: KLogger = KotlinLogging.logger {}

    override fun execute(
        contribution: StepContribution,
        chunkContext: ChunkContext,
    ): RepeatStatus {
        killedProcess += 1

        log.info { "☠️  프로세스 강제 종료... ({$killedProcess}/{$PROCESSED_TO_KILL})" }

        if (killedProcess >= PROCESSED_TO_KILL) {
            log.info { "💀 시스템 안정화 완료. 모든 좀비 프로세스 제거." }
            return RepeatStatus.FINISHED; // 모든 프로세스 종료 후 작업 완료
        }

        return RepeatStatus.CONTINUABLE; // 아직 더 종료할 프로세스가 남아있음
    }
}

RepeatStatus: FINISHED vs CONTINUALBE

RepeatStatus가 필요한 이유

Tasklet을 Step으로 등록하는 방법

@Configuration
class ZombieBatchConfig(
    private val jobRepository: JobRepository,
    private val transactionManager: PlatformTransactionManager
) {
    @Bean
    fun zombieProcessCleanupTasklet(): Tasklet {
        return ZombieProcessCleanupTasklet()
    }

    @Bean
    fun zombieCleanupStep(): Step {
        return StepBuilder("zombieCleanupStep", jobRepository) // Tasklet과 transactionManager 설정
            .tasklet(zombieProcessCleanupTasklet(), transactionManager)
            .build()
    }

    @Bean
    fun zombieCleanupJob(): Job {
        return JobBuilder("zombieCleanupJob", jobRepository)
            .start(zombieCleanupStep()) // Step 등록
            .build()
    }
}
    @Bean
    fun zombieCleanupStep(): Step =
        StepBuilder("zombieCleanupStep", jobRepository)
            .tasklet(zombieProcessCleanupTasklet(), ResourcelessTransactionManager())
            .build()

Tasklet 람다

    @Bean
    fun deleteOldRecordsStep(): Step =
        StepBuilder("deleteOldRecordsStep", jobRepository)
            .tasklet(
                Tasklet { contribution: StepContribution, chunkContext: ChunkContext ->
                    // ...
                    RepeatStatus.FINISHED
                },
                transactionManager,
            ).build()

청크 지향 처리

Chunk

청크 기반 처리를 하는 이유

ItemReader

public interface ItemReader<T> {
    T read() throws Exception, 
        UnexpectedInputException, 
        ParseException, 
        NonTransientResourceException;
}

ItemProcessor

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

ItemWriter

public interface ItemWriter<T> {
    void write(Chunk<? extends T> chunk) throws Exception;
}

이러한 구조를 통해 각 컴포넌트는 자신의 역할만 수행하며 완벽한 책임 분리, 재사용성 극대화, 높은 유연성을 제공하여 대용량 처리의 표준이 되었다.

청크 지향 처리 조립하기

@Bean
fun processStep(
    jobRepository: JobRepository,
    transactionManager: PlatformTransactionManager
): Step {
    return StepBuilder("processStep", jobRepository)
        .chunk<CustomerDetail, CustomerSummary>(10, transactionManager) // 청크 지향 처리 활성화
        .reader(itemReader())   // 데이터 읽기 담당
        .processor(itemProcessor())  // 데이터 처리 담당
        .writer(itemWriter())   // 데이터 쓰기 담당
        .build()
}

@Bean
fun customerProcessingJob(
    jobRepository: JobRepository,
    processStep: Step
): Job {
    return JobBuilder("customerProcessingJob", jobRepository)
        .start(processStep)    // processStep으로 Job 시작
        .build()
}

청크 지향 처리 흐름

  1. 데이터 읽기 (ItemReader)
    1. read() 메서드로 순차적으로 데이터를 반환한다.
    2. 청크 크기만큼 read()가 호출되어 하나의 청크가 생성된다.
  2. 데이터 깎기 (ItemProcessor)
    1. process() 메서드는 청크 전체를 받지 않고 하나씩 처리한다.
    2. 즉 청크 크기만큼 process()가 호출된다.
  3. 데이터 쓰기 (ItemWriter)
    1. 청크 단위로 데이터를 저장한다.
    2. 읽기/가공 단계와 달리 청크 전체를 한 번에 처리한다.

청크 처리와 트랜잭션

적절한 청크 사이즈란?