TIL

ItemWriter와 ItemReader 통합

CompositeItemReader

List<ItemStreamReader<Customer>> readers = List.of(
    shard1ItemReader,  
    shard2ItemREader   
);

CompositeItemReader<Customer> compositeReader = new CompositeItemReader<>(readers);

CompositeItemWriter

public void write(Chunk<? extends T> chunk) throws Exception {
    for (ItemWriter<? super T> writer : delegates) {
       writer.write(chunk);
    }
}
CompositeItemWriter<Hacker> writer = new CompositeItemWriter<>(
   List.of(
       firstWriter,
       secondWriter,
       thirdWriter
   )
);

ClassifierCompositeItemWriter

// ClassifierCompositeItemWriter 내부의 classifier 필드
private Classifier<T, ItemWriter<? super T>> classifier = new ClassifierSupport<>(null);

구성 방법

예제 코드

    class SystemLogClassifier(
        private val criticalWriter: ItemWriter<SystemLog>,
        private val normalWriter: ItemWriter<SystemLog>,
    ) : Classifier<SystemLog, ItemWriter<in SystemLog>> {
        companion object {
            const val CRITICAL_CPU_THRESHOLD = 90
            const val CRITICAL_MEMORY_THRESHOLD = 1024L * 1024 * 1024 // 1GB
        }

        override fun classify(log: SystemLog): ItemWriter<in SystemLog> =
            if (isCritical(log)) {
                criticalWriter
            } else {
                normalWriter
            }

        private fun isCritical(log: SystemLog): Boolean =
            log.type == "CRITICAL" ||
                log.cpuUsage >= CRITICAL_CPU_THRESHOLD ||
                log.memoryUsage >= CRITICAL_MEMORY_THRESHOLD
    }
    @Bean
    fun classifierWriter(): ClassifierCompositeItemWriter<SystemLog> =
        ClassifierCompositeItemWriter<SystemLog>()
            .apply {
                setClassifier(
                    SystemLogClassifier(
                        criticalLogWriter(),
                        normalLogWriter(),
                    ),
                )
            }

    @Bean
    fun normalLogWriter(): ItemWriter<SystemLog> =
        ItemWriter { items ->
            log.info { "✅NormalLogWriter: 일반 로그 처리 중... 대충 파일에 출력하거나 하자.." }
            items.forEach { item ->
                log.info { "✅일반 처리: $item" }
            }
        }

    @Bean
    fun criticalLogWriter(): ItemWriter<SystemLog> =
        ItemWriter { items ->
            log.info { "🚨CriticalLogWriter: 치명적 시스템 로그 감지! 즉시 처리 시작!" }
            items.forEach { item ->
                // 실제 운영에선 여기서 슬랙 혹은 이메일 발송
                log.info { "🚨긴급 처리: $item" }
            }
        }