TIL

Remote Partitioning

로컬 파티셔닝의 한계

로컬 vs 원격 파티셔닝

지표 로컬 파티셔닝 원격 파티셔닝
실행 환경 단일 JVM, 다중 스레드 여러 머신에 걸친 다중 JVM
메모리/처리 능력 단일 머신으로 제한 노드 추가로 선형 확장
실행 방법 직접 메서드 호출 메시지 기반 통신
결과 수집 동기식 비동기식

아키텍처 구성요소

원격 파티셔닝은 세 가지 구성요소로 작동한다:

Manager/Worker 구성

Manager Node

// Manager 설정 (@Profile("manager"), @EnableBatchIntegration)
// RemotePartitioningManagerStepBuilderFactory는 @EnableBatchIntegration이 자동 등록

@Bean
public Step managerStep() {
    return remotePartitioningManagerStepBuilderFactory
            .get("managerStep")
            .partitioner("workerStep", partitioner)
            .outputChannel(outboundRequests()) // 메시지를 내보낼 채널 지정
            .gridSize(4)
            .build();
}

@Bean
public DirectChannel outboundRequests() { return new DirectChannel(); }

@Bean // outputChannel → Kafka 토픽으로 전송하는 파이프라인
public IntegrationFlow outboundFlow(KafkaTemplate<Long, StepExecutionRequest> kafkaTemplate) {
    KafkaProducerMessageHandler<Long, StepExecutionRequest> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setTopicExpression(new LiteralExpression("remote-partitioning"));
    return IntegrationFlow
            .from(outboundRequests()) // 이 채널에 메시지가 들어오면
            .handle(handler)          // Kafka로 전송
            .get();
}

Worker Node

// Worker 설정 (@Profile("worker"), @EnableBatchIntegration)

@Bean
public Step workerStep() {
    return remotePartitioningWorkerStepBuilderFactory
            .get("workerStep")
            .inputChannel(inboundRequests()) // 메시지를 수신할 채널 지정
            .<Input, Output>chunk(500, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}

@Bean // 비동기 수신을 위해 QueueChannel 사용
public QueueChannel inboundRequests() { return new QueueChannel(); }

@Bean // Kafka 토픽 → inputChannel로 전달하는 파이프라인
public IntegrationFlow inboundFlow(ConsumerFactory<String, String> cf) {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(cf, "remote-partitioning"))
            .channel(inboundRequests()) // 수신된 메시지를 이 채널로 전달
            .get();
}

Kafka 파티션 라우팅

// Manager의 IntegrationFlow에서 파티션 라우팅 설정
handler.setPartitionIdExpression(new FunctionExpression<>(message -> {
    StepExecutionRequest request = (StepExecutionRequest) message.getPayload();
    return request.getStepExecutionId() % partitionSize; // modulo 연산으로 분배
}));