@FunctionalInterface
public interface Tasklet {
@Nullable
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}
private const val PROCESSED_TO_KILL: Int = 10
private var killedProcess: Int = 0
class ZombieProcessCleanupTasklet : Tasklet {
val log: KLogger = KotlinLogging.logger {}
override fun execute(
contribution: StepContribution,
chunkContext: ChunkContext,
): RepeatStatus {
killedProcess += 1
log.info { "☠️ 프로세스 강제 종료... ({$killedProcess}/{$PROCESSED_TO_KILL})" }
if (killedProcess >= PROCESSED_TO_KILL) {
log.info { "💀 시스템 안정화 완료. 모든 좀비 프로세스 제거." }
return RepeatStatus.FINISHED; // 모든 프로세스 종료 후 작업 완료
}
return RepeatStatus.CONTINUABLE; // 아직 더 종료할 프로세스가 남아있음
}
}
Tasklet의 execute 메서드는 메서드 실행을 계속 할지 RepeatStatus를 통해 결정한다.ReapeatStatus.FINISHED
Step 처리가 완료되었음을 의미Step 처리가 성공이든 실패든 상관 없다.RepeatStatus.CONTINUABLE
execute 메서드가 추가로 실행되어야 함을 의미execute가 반복 호출된다.Tasklet의 execute 실행마다 새 트랜잭션을 실행한다.RepeatStauts가 반환되면 해당 트랜잭션을 커밋한다.while문을 사용하지 않고 Tasklet과 RepeatStatus.CONTINUABLE 덕에 대량의 데이터 처리 도중 중간 실패가 발생해도 지금까지의 데이터는 안전하다.@Configuration
class ZombieBatchConfig(
private val jobRepository: JobRepository,
private val transactionManager: PlatformTransactionManager
) {
@Bean
fun zombieProcessCleanupTasklet(): Tasklet {
return ZombieProcessCleanupTasklet()
}
@Bean
fun zombieCleanupStep(): Step {
return StepBuilder("zombieCleanupStep", jobRepository) // Tasklet과 transactionManager 설정
.tasklet(zombieProcessCleanupTasklet(), transactionManager)
.build()
}
@Bean
fun zombieCleanupJob(): Job {
return JobBuilder("zombieCleanupJob", jobRepository)
.start(zombieCleanupStep()) // Step 등록
.build()
}
}
tasklet 외에도 PlatformTransactionManager가 전달된다.
ResourcelessTransactionManager를 고려해볼 수 있다. @Bean
fun zombieCleanupStep(): Step =
StepBuilder("zombieCleanupStep", jobRepository)
.tasklet(zombieProcessCleanupTasklet(), ResourcelessTransactionManager())
.build()
Tasklet 등록 시 람다를 통해 간단히 할 수 있다. @Bean
fun deleteOldRecordsStep(): Step =
StepBuilder("deleteOldRecordsStep", jobRepository)
.tasklet(
Tasklet { contribution: StepContribution, chunkContext: ChunkContext ->
// ...
RepeatStatus.FINISHED
},
transactionManager,
).build()
ItemReader는 데이터를 읽어오는 역할을 한다.public interface ItemReader<T> {
T read() throws Exception,
UnexpectedInputException,
ParseException,
NonTransientResourceException;
}
read() 메서드는 아이템 하나를 반환한다. (DB의 한 row)null을 반환하며 스텝은 종료된다.ItemReader의 null 반환이 처크 지향 처리 Step의 종료 시점인 것FlatFileItemReader, JdbcCursorItemReaderItemProcessor는 데이터를 가공하고 다듬는 역할을 한다.public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
ItemWriter에 전달되지 않는다.public interface ItemWriter<T> {
void write(Chunk<? extends T> chunk) throws Exception;
}
ItemWriter는 데이터를 Chunk 단위로 묶어서 한 번에 쓴다.Chunk 타입을 파라미터로 받음FlatFileItemWriter, JdbcBatchItemWriter이러한 구조를 통해 각 컴포넌트는 자신의 역할만 수행하며 완벽한 책임 분리, 재사용성 극대화, 높은 유연성을 제공하여 대용량 처리의 표준이 되었다.
chunk() 메서드를 호출하는 것으로 시작된다.@Bean
fun processStep(
jobRepository: JobRepository,
transactionManager: PlatformTransactionManager
): Step {
return StepBuilder("processStep", jobRepository)
.chunk<CustomerDetail, CustomerSummary>(10, transactionManager) // 청크 지향 처리 활성화
.reader(itemReader()) // 데이터 읽기 담당
.processor(itemProcessor()) // 데이터 처리 담당
.writer(itemWriter()) // 데이터 쓰기 담당
.build()
}
@Bean
fun customerProcessingJob(
jobRepository: JobRepository,
processStep: Step
): Job {
return JobBuilder("customerProcessingJob", jobRepository)
.start(processStep) // processStep으로 Job 시작
.build()
}
.chunk(10, transactionManager)ItemReader가 데이터를 n개만큼 읽어 ItemProcessor나 ItemWriter에 전달한다..<CustomerDetail, CustomerSummary>chunk(..)ItemReader가 반환할 타입ItemProcessor가 아이템을 처리 후 반환할 타입이자 ItemWriter가 전달 받을 타입ItemReader)
read() 메서드로 순차적으로 데이터를 반환한다.read()가 호출되어 하나의 청크가 생성된다.ItemProcessor)
process() 메서드는 청크 전체를 받지 않고 하나씩 처리한다.process()가 호출된다.ItemWriter)
ItemReader에서 null이 반환되면 청크 단위 반복을 종료한다.execute() 메서드가 트랜잭션 단위였다.