StepExecutionRequest — ID만 포함)을 Worker에게 보내고, Worker가 직접 데이터를 조회하여 처리**ChunkProvider** (데이터 읽기 담당)**ChunkProcessor** (데이터 처리·쓰기 담당)ChunkRequest(처리할 데이터 포함), Worker → Manager로 ChunkResponse(처리 결과 요약)@EnableBatchIntegration 선언 시 자동 주입되는 RemoteChunkingManagerStepBuilderFactory를 사용한다reader()가 있다 — Manager가 직접 데이터를 읽는다@Bean
public Step managerStep() {
return remoteChunkingManagerStepBuilderFactory
.get("managerStep")
.chunk(3)
.reader(dataReader()) // Manager가 직접 데이터를 읽는다
.outputChannel(outboundChunksToWorkers()) // Worker에게 ChunkRequest를 보낼 채널
.inputChannel(inboundRepliesFromWorkers()) // Worker로부터 ChunkResponse를 받을 채널
.build();
}
RemoteChunkingManagerStepBuilder는 내부적으로 ItemWriter로 ChunkMessageChannelItemWriter를 사용한다ChunkRequest 메시지로 포장하여 outputChannel로 전송한다inputChannel을 통해 모든 Worker로부터 응답(ChunkResponse)이 도착할 때까지 대기(getNextResult())하여 작업 완료를 보장한다// ChunkRequest — 데이터 자체가 네트워크를 통해 이동한다
public class ChunkRequest<T> implements Serializable {
private final long jobId; // JobInstance ID
private final Chunk<? extends T> items; // 실제 데이터 아이템들이 담긴 청크
private final StepContribution stepContribution;
private final int sequence; // 청크 순번
}
chunk-request 토픽, 응답은 chunk-response 토픽을 사용한다@Bean
public DirectChannel outboundChunksToWorkers() { return new DirectChannel(); }
@Bean // 비동기 수신을 위해 QueueChannel 사용
public QueueChannel inboundRepliesFromWorkers() { return new QueueChannel(); }
@Bean // outboundChunksToWorkers 채널 → Kafka chunk-request 토픽
public IntegrationFlow outboundChunkFlow(KafkaTemplate<Long, ChunkRequest> kafkaTemplate) {
KafkaProducerMessageHandler<Long, ChunkRequest> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setTopicExpression(new LiteralExpression("chunk-request"));
// ChunkRequest의 sequence 번호 기반으로 Kafka 파티션(Worker)에 분배
handler.setPartitionIdExpression(new FunctionExpression<>(message -> {
ChunkRequest req = (ChunkRequest) message.getPayload();
return req.getSequence() % partitionSize;
}));
return IntegrationFlow.from(outboundChunksToWorkers()).handle(handler).get();
}
@Bean // Kafka chunk-response 토픽 → inboundRepliesFromWorkers 채널
public IntegrationFlow inboundReplyFlow(ConsumerFactory<Long, ChunkResponse> cf) {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(cf, "chunk-response"))
.channel(inboundRepliesFromWorkers())
.get();
}
@EnableBatchIntegration 선언 시 자동 주입되는 RemoteChunkingWorkerBuilder를 사용한다IntegrationFlow를 생성한다 — Worker는 스텝을 실행하는 것이 아니라 메시지 처리 흐름을 타기 때문이다itemProcessor()와 itemWriter()를 설정하는 것이 핵심 — 원격 청킹의 목적 자체가 이 두 단계의 부하를 Worker에 분산시키는 것이다@Bean
public IntegrationFlow workerIntegrationFlow() {
return remoteChunkingWorkerBuilder
.inputChannel(inboundChunkRequestsFromManager()) // ChunkRequest 수신 채널
.outputChannel(outboundRepliesToManager()) // ChunkResponse 전송 채널
.itemProcessor(processor()) // 수신된 청크의 각 아이템 처리
.itemWriter(writer()) // 처리 결과 기록
.build();
}
RemoteChunkingWorkerBuilder는 내부적으로 itemProcessor와 itemWriter를 사용하는 SimpleChunkProcessor를 만들고, 이를 ChunkProcessorChunkHandler에 설정한다ChunkRequest 메시지가 inputChannel로 들어오면, IntegrationFlow가 ChunkProcessorChunkHandler.handleChunk()로 전달한다handleChunk()는 SimpleChunkProcessor를 사용하여 청크(ChunkRequest.items)를 처리(process/write)하고, 결과를 담은 ChunkResponse를 생성하여 outputChannel로 내보낸다// ChunkResponse — Worker의 청크 처리 결과
public class ChunkResponse implements Serializable {
private final StepContribution stepContribution; // 핵심: 처리 통계(읽은 수, 쓴 수, 스킵 수 등)
private final Long jobId;
private final boolean status; // 성공/실패 여부
private final String message; // 오류 메시지 (선택적)
private final int sequence; // 처리한 청크의 순번
}
stepContribution - Worker가 처리한 단일 청크의 모든 처리 기록(읽은 수, 쓴 수, 필터링/스킵 수, 상태 등)이 담겨 있다ChunkResponse를 수신할 때마다 각 stepContribution의 값을 자신의 메인 StepExecution 상태에 합산(aggregate)하여, 최종적으로 Job의 전체 처리 통계와 완료 상태를 결정한다@Bean
public QueueChannel inboundChunkRequestsFromManager() { return new QueueChannel(); }
@Bean
public DirectChannel outboundRepliesToManager() { return new DirectChannel(); }
@Bean // Kafka chunk-request 토픽 → inboundChunkRequestsFromManager 채널
public IntegrationFlow inboundChunkRequestFlow(ConsumerFactory<Long, ChunkRequest> cf) {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(cf, "chunk-request"))
.channel(inboundChunkRequestsFromManager())
.get();
}
@Bean // outboundRepliesToManager 채널 → Kafka chunk-response 토픽
public IntegrationFlow outboundResponseFlow(KafkaTemplate<Long, ChunkResponse> kafkaTemplate) {
KafkaProducerMessageHandler<Long, ChunkResponse> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setTopicExpression(new LiteralExpression("chunk-response"));
return IntegrationFlow.from(outboundRepliesToManager()).handle(handler).get();
}
ChunkRequest(내부에 데이터 Item 포함)와 ChunkResponse를 Kafka로 전송하려면 전용 Serializer/Deserializer가 필요하다StepExecution 직렬화와 동일한 원리 — Spring의 DefaultSerializer/DefaultDeserializer를 활용한 커스텀 SerDes 구현ChunkRequest 전송(producer → ChunkRequestSerializer), ChunkResponse 수신(consumer → ChunkResponseDeserializer)ChunkRequest 수신(consumer → ChunkRequestDeserializer), ChunkResponse 전송(producer → ChunkResponseSerializer)# Manager 프로파일
spring:
config:
activate:
on-profile: remote-chunking-manager
kafka:
producer:
value-serializer: com.example.ChunkRequestSerDes$ChunkRequestSerializer
consumer:
value-deserializer: com.example.ChunkResponseSerDes$ChunkResponseDeserializer
group-id: manager
---
# Worker 프로파일
spring:
config:
activate:
on-profile: remote-chunking-worker
kafka:
producer:
value-serializer: com.example.ChunkResponseSerDes$ChunkResponseSerializer
consumer:
value-deserializer: com.example.ChunkRequestSerDes$ChunkRequestDeserializer
group-id: worker