TIL

Remote Chunking

Remote Chunking 개요

원격 파티셔닝과의 차이

사용 시나리오

한계점

Manager Node 구성

/**
 * Manager-side step for remote chunking.
 *
 * <p>The manager reads the items itself in chunks of 3, sends each chunk to the
 * workers as a ChunkRequest over the outbound channel, and receives the workers'
 * ChunkResponse messages over the inbound channel.
 *
 * @return the configured manager step
 */
@Bean
public Step managerStep() {
    // Read side: chunk size and the reader that runs on the manager.
    var readSide = remoteChunkingManagerStepBuilderFactory
            .get("managerStep")
            .chunk(3)
            .reader(dataReader());

    // Messaging side: requests go out to the workers, replies come back in.
    return readSide
            .outputChannel(outboundChunksToWorkers())
            .inputChannel(inboundRepliesFromWorkers())
            .build();
}

내부 메커니즘: ChunkMessageChannelItemWriter

// ChunkRequest — the data itself travels over the network.
// NOTE(review): this looks like an abridged excerpt — no constructor initializes the
// final fields here, and the getSequence() accessor used by the outbound flow is not shown.
public class ChunkRequest<T> implements Serializable {
    private final long jobId;                       // JobInstance ID
    private final Chunk<? extends T> items;         // chunk holding the actual data items
    private final StepContribution stepContribution;
    private final int sequence;                     // sequence number of this chunk
}

채널과 IntegrationFlow

/**
 * Channel on which the manager step publishes chunk requests for the workers.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public DirectChannel outboundChunksToWorkers() {
    return new DirectChannel();
}

/**
 * Channel on which the manager receives the workers' replies.
 *
 * <p>A {@code QueueChannel} is used so that replies can be received asynchronously.
 *
 * @return a new {@code QueueChannel}
 */
@Bean
public QueueChannel inboundRepliesFromWorkers() {
    return new QueueChannel();
}

/**
 * Bridges the {@code outboundChunksToWorkers} channel to the Kafka {@code chunk-request} topic.
 *
 * <p>Each request is routed to a Kafka partition (i.e. a worker) chosen from the
 * ChunkRequest sequence number modulo {@code partitionSize}.
 *
 * @param kafkaTemplate template used to publish the chunk requests
 * @return the integration flow from the outbound channel to Kafka
 */
@Bean
public IntegrationFlow outboundChunkFlow(KafkaTemplate<Long, ChunkRequest> kafkaTemplate) {
    KafkaProducerMessageHandler<Long, ChunkRequest> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setTopicExpression(new LiteralExpression("chunk-request"));
    // The type argument must be explicit: the setter's parameter type is plain Expression,
    // so a diamond would infer Object for the lambda parameter and getPayload() would not resolve.
    handler.setPartitionIdExpression(
            new FunctionExpression<org.springframework.messaging.Message<?>>(message -> {
                ChunkRequest<?> req = (ChunkRequest<?>) message.getPayload();
                // Distribute chunks across partitions by their sequence number.
                return req.getSequence() % partitionSize;
            }));
    return IntegrationFlow.from(outboundChunksToWorkers()).handle(handler).get();
}

/**
 * Bridges the Kafka {@code chunk-response} topic to the {@code inboundRepliesFromWorkers} channel.
 *
 * @param cf consumer factory used to build the message-driven Kafka adapter
 * @return the integration flow from Kafka to the manager's reply channel
 */
@Bean
public IntegrationFlow inboundReplyFlow(ConsumerFactory<Long, ChunkResponse> cf) {
    // Message-driven adapter listening on the workers' response topic.
    var responseAdapter = Kafka.messageDrivenChannelAdapter(cf, "chunk-response");

    return IntegrationFlow.from(responseAdapter)
            .channel(inboundRepliesFromWorkers())
            .get();
}

Worker Node 구성

/**
 * Worker-side integration flow for remote chunking.
 *
 * <p>ChunkRequest messages arrive on the input channel, every item of the received
 * chunk is passed through the processor and the writer, and a ChunkResponse is sent
 * back on the output channel.
 *
 * @return the worker integration flow
 */
@Bean
public IntegrationFlow workerIntegrationFlow() {
    // Messaging side: where requests come from and where replies go.
    var messaging = remoteChunkingWorkerBuilder
            .inputChannel(inboundChunkRequestsFromManager())
            .outputChannel(outboundRepliesToManager());

    // Processing side: handle each item of the chunk, then write the results.
    return messaging
            .itemProcessor(processor())
            .itemWriter(writer())
            .build();
}

내부 메커니즘: ChunkProcessorChunkHandler

// ChunkResponse — the result of a worker processing one chunk.
// NOTE(review): this looks like an abridged excerpt — no constructor initializes the
// final fields here and no accessors are shown.
public class ChunkResponse implements Serializable {
    private final StepContribution stepContribution; // key part: processing statistics (read count, write count, skip count, etc.)
    private final Long jobId;
    private final boolean status;    // success / failure flag
    private final String message;    // error message (optional)
    private final int sequence;      // sequence number of the chunk that was processed
}

채널과 IntegrationFlow

/**
 * Channel on which the worker receives chunk requests sent by the manager.
 *
 * @return a new {@code QueueChannel}
 */
@Bean
public QueueChannel inboundChunkRequestsFromManager() {
    return new QueueChannel();
}

/**
 * Channel on which the worker publishes its replies for the manager.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public DirectChannel outboundRepliesToManager() {
    return new DirectChannel();
}

/**
 * Bridges the Kafka {@code chunk-request} topic to the
 * {@code inboundChunkRequestsFromManager} channel.
 *
 * @param cf consumer factory used to build the message-driven Kafka adapter
 * @return the integration flow from Kafka to the worker's request channel
 */
@Bean
public IntegrationFlow inboundChunkRequestFlow(ConsumerFactory<Long, ChunkRequest> cf) {
    // Message-driven adapter listening on the manager's request topic.
    var requestAdapter = Kafka.messageDrivenChannelAdapter(cf, "chunk-request");

    return IntegrationFlow.from(requestAdapter)
            .channel(inboundChunkRequestsFromManager())
            .get();
}

/**
 * Bridges the {@code outboundRepliesToManager} channel to the Kafka
 * {@code chunk-response} topic.
 *
 * @param kafkaTemplate template used to publish the chunk responses
 * @return the integration flow from the worker's reply channel to Kafka
 */
@Bean
public IntegrationFlow outboundResponseFlow(KafkaTemplate<Long, ChunkResponse> kafkaTemplate) {
    // Producer endpoint that always targets the response topic.
    var responseProducer = new KafkaProducerMessageHandler<Long, ChunkResponse>(kafkaTemplate);
    responseProducer.setTopicExpression(new LiteralExpression("chunk-response"));

    return IntegrationFlow
            .from(outboundRepliesToManager())
            .handle(responseProducer)
            .get();
}

직렬화 및 환경 설정

# Manager profile
# The manager produces ChunkRequest messages and consumes ChunkResponse messages,
# so it serializes requests and deserializes responses.
spring:
  config:
    activate:
      on-profile: remote-chunking-manager
  kafka:
    producer:
      # Nested class ChunkRequestSerializer inside ChunkRequestSerDes ('$' is the binary-name separator)
      value-serializer: com.example.ChunkRequestSerDes$ChunkRequestSerializer
    consumer:
      value-deserializer: com.example.ChunkResponseSerDes$ChunkResponseDeserializer
      group-id: manager
---
# Worker profile
# Mirror image of the manager: the worker consumes ChunkRequest messages and
# produces ChunkResponse messages.
spring:
  config:
    activate:
      on-profile: remote-chunking-worker
  kafka:
    producer:
      value-serializer: com.example.ChunkResponseSerDes$ChunkResponseSerializer
    consumer:
      value-deserializer: com.example.ChunkRequestSerDes$ChunkRequestDeserializer
      group-id: worker