ItemReader들을 순차적으로 실행ItemReader를 List로 전달하면 이를 순서대로 실행한다.
ItemReader가 더 이상 읽을 게 없다고 판단되면 null을 반환List<ItemStreamReader<Customer>> readers = List.of(
shard1ItemReader,
shard2ItemREader
);
CompositeItemReader<Customer> compositeReader = new CompositeItemReader<>(readers);
ItemReader들만 준비하면 된다.CompositeItemWriter는 여러 ItemWriter들에게 같은 데이터를 전달한다.
write()를 호출public void write(Chunk<? extends T> chunk) throws Exception {
for (ItemWriter<? super T> writer : delegates) {
writer.write(chunk);
}
}
CompositeItemWriter<Hacker> writer = new CompositeItemWriter<>(
List.of(
firstWriter,
secondWriter,
thirdWriter
)
);
CompositeItemWriter 사용에선 FlatFileItemWriter와 MongoItemWriter의 지연 쓰기가 빛을 발한다.
FlatFileItemWriter와 MongoItemWriter는 beforeCommit() 시점까지 실제 쓰기를 미룬다.FlatFileItemWriter, JdbcBatchItemWriter로 CompositeItemWriter를 구성한 경우
JdbcBatchItemWriter에서 예외가 발생한 경우 RDB에선 자연스레 롤백이 일어난다.FlatFileItemWriter.write()가 실행되었고, 즉시 데이터를 썼다면 파일의 데이터와 RDB 데이터 싱크는 어긋난다.CompositeItemWriter에선 모든 ItemWriter들이 같은 데이터를 처리해야 한다.ClassifierCompositeItemWriter는 여러 데이터들을 처리할 수 있다.
Classifier를 사용해 데이터를 분기write() 메서드에 청크가 전달되면 classify() 메서드가 동작ItemWriter가 선택된다.// ClassifierCompositeItemWriter 내부의 classifier 필드
private Classifier<T, ItemWriter<? super T>> classifier = new ClassifierSupport<>(null);
ClassifierCompositeItemWriter 구성하는 두 가지 방법
ClassifierCompositeItemWriter 객체 생성 후 setClassifer()로 분류기 설정ClassifierCompositeItemWriterBuilder.classifier()로 한 번의 설정 class SystemLogClassifier(
private val criticalWriter: ItemWriter<SystemLog>,
private val normalWriter: ItemWriter<SystemLog>,
) : Classifier<SystemLog, ItemWriter<in SystemLog>> {
companion object {
const val CRITICAL_CPU_THRESHOLD = 90
const val CRITICAL_MEMORY_THRESHOLD = 1024L * 1024 * 1024 // 1GB
}
override fun classify(log: SystemLog): ItemWriter<in SystemLog> =
if (isCritical(log)) {
criticalWriter
} else {
normalWriter
}
private fun isCritical(log: SystemLog): Boolean =
log.type == "CRITICAL" ||
log.cpuUsage >= CRITICAL_CPU_THRESHOLD ||
log.memoryUsage >= CRITICAL_MEMORY_THRESHOLD
}
@Bean
fun classifierWriter(): ClassifierCompositeItemWriter<SystemLog> =
ClassifierCompositeItemWriter<SystemLog>()
.apply {
setClassifier(
SystemLogClassifier(
criticalLogWriter(),
normalLogWriter(),
),
)
}
@Bean
fun normalLogWriter(): ItemWriter<SystemLog> =
ItemWriter { items ->
log.info { "✅NormalLogWriter: 일반 로그 처리 중... 대충 파일에 출력하거나 하자.." }
items.forEach { item ->
log.info { "✅일반 처리: $item" }
}
}
@Bean
fun criticalLogWriter(): ItemWriter<SystemLog> =
ItemWriter { items ->
log.info { "🚨CriticalLogWriter: 치명적 시스템 로그 감지! 즉시 처리 시작!" }
items.forEach { item ->
// 실제 운영에선 여기서 슬랙 혹은 이메일 발송
log.info { "🚨긴급 처리: $item" }
}
}