@FunctionalInterface
public interface Tasklet {
@Nullable
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}
private const val PROCESSED_TO_KILL: Int = 10
private var killedProcess: Int = 0
class ZombieProcessCleanupTasklet : Tasklet {
val log: KLogger = KotlinLogging.logger {}
override fun execute(
contribution: StepContribution,
chunkContext: ChunkContext,
): RepeatStatus {
killedProcess += 1
log.info { "☠️ 프로세스 강제 종료... ({$killedProcess}/{$PROCESSED_TO_KILL})" }
if (killedProcess >= PROCESSED_TO_KILL) {
log.info { "💀 시스템 안정화 완료. 모든 좀비 프로세스 제거." }
return RepeatStatus.FINISHED; // 모든 프로세스 종료 후 작업 완료
}
return RepeatStatus.CONTINUABLE; // 아직 더 종료할 프로세스가 남아있음
}
}
Tasklet
의 execute
메서드는 메서드 실행을 계속 할지 RepeatStatus
를 통해 결정한다.ReapeatStatus.FINISHED
Step
처리가 완료되었음을 의미Step
처리가 성공이든 실패든 상관 없다.RepeatStatus.CONTINUABLE
execute
메서드가 추가로 실행되어야 함을 의미execute
가 반복 호출된다.Tasklet
의 execute
실행마다 새 트랜잭션을 실행한다.RepeatStauts
가 반환되면 해당 트랜잭션을 커밋한다.while
문을 사용하지 않고 Tasklet
과 RepeatStatus.CONTINUABLE
덕에 대량의 데이터 처리 도중 중간 실패가 발생해도 지금까지의 데이터는 안전하다.@Configuration
class ZombieBatchConfig(
private val jobRepository: JobRepository,
private val transactionManager: PlatformTransactionManager
) {
@Bean
fun zombieProcessCleanupTasklet(): Tasklet {
return ZombieProcessCleanupTasklet()
}
@Bean
fun zombieCleanupStep(): Step {
return StepBuilder("zombieCleanupStep", jobRepository) // Tasklet과 transactionManager 설정
.tasklet(zombieProcessCleanupTasklet(), transactionManager)
.build()
}
@Bean
fun zombieCleanupJob(): Job {
return JobBuilder("zombieCleanupJob", jobRepository)
.start(zombieCleanupStep()) // Step 등록
.build()
}
}
tasklet
외에도 PlatformTransactionManager
가 전달된다.
ResourcelessTransactionManager
를 고려해볼 수 있다. @Bean
fun zombieCleanupStep(): Step =
StepBuilder("zombieCleanupStep", jobRepository)
.tasklet(zombieProcessCleanupTasklet(), ResourcelessTransactionManager())
.build()
Tasklet
등록 시 람다를 통해 간단히 할 수 있다. @Bean
fun deleteOldRecordsStep(): Step =
StepBuilder("deleteOldRecordsStep", jobRepository)
.tasklet(
Tasklet { contribution: StepContribution, chunkContext: ChunkContext ->
// ...
RepeatStatus.FINISHED
},
transactionManager,
).build()
ItemReader
는 데이터를 읽어오는 역할을 한다.public interface ItemReader<T> {
T read() throws Exception,
UnexpectedInputException,
ParseException,
NonTransientResourceException;
}
read()
메서드는 아이템 하나를 반환한다. (DB의 한 row)null
을 반환하며 스텝은 종료된다.ItemReader
의 null
반환이 처크 지향 처리 Step
의 종료 시점인 것FlatFileItemReader
, JdbcCursorItemReader
ItemProcessor
는 데이터를 가공하고 다듬는 역할을 한다.public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
ItemWriter
에 전달되지 않는다.public interface ItemWriter<T> {
void write(Chunk<? extends T> chunk) throws Exception;
}
ItemWriter
는 데이터를 Chunk
단위로 묶어서 한 번에 쓴다.Chunk
타입을 파라미터로 받음FlatFileItemWriter
, JdbcBatchItemWriter
이러한 구조를 통해 각 컴포넌트는 자신의 역할만 수행하며 완벽한 책임 분리, 재사용성 극대화, 높은 유연성을 제공하여 대용량 처리의 표준이 되었다.
chunk()
메서드를 호출하는 것으로 시작된다.@Bean
fun processStep(
jobRepository: JobRepository,
transactionManager: PlatformTransactionManager
): Step {
return StepBuilder("processStep", jobRepository)
.chunk<CustomerDetail, CustomerSummary>(10, transactionManager) // 청크 지향 처리 활성화
.reader(itemReader()) // 데이터 읽기 담당
.processor(itemProcessor()) // 데이터 처리 담당
.writer(itemWriter()) // 데이터 쓰기 담당
.build()
}
@Bean
fun customerProcessingJob(
jobRepository: JobRepository,
processStep: Step
): Job {
return JobBuilder("customerProcessingJob", jobRepository)
.start(processStep) // processStep으로 Job 시작
.build()
}
.chunk(10, transactionManager)
ItemReader
가 데이터를 n개만큼 읽어 ItemProcessor
나 ItemWriter
에 전달한다..<CustomerDetail, CustomerSummary>chunk(..)
ItemReader
가 반환할 타입ItemProcessor
가 아이템을 처리 후 반환할 타입이자 ItemWriter
가 전달 받을 타입ItemReader
)
read()
메서드로 순차적으로 데이터를 반환한다.read()
가 호출되어 하나의 청크가 생성된다.ItemProcessor
)
process()
메서드는 청크 전체를 받지 않고 하나씩 처리한다.process()
가 호출된다.ItemWriter
)
ItemReader
에서 null
이 반환되면 청크 단위 반복을 종료한다.execute()
메서드가 트랜잭션 단위였다.