TIL

Remote Partitioning

로컬 파티셔닝의 한계

로컬 vs 원격 파티셔닝

지표 로컬 파티셔닝 원격 파티셔닝
실행 환경 단일 JVM, 다중 스레드 여러 머신에 걸친 다중 JVM
메모리/처리 능력 단일 머신으로 제한 노드 추가로 선형 확장
실행 방법 직접 메서드 호출 메시지 기반 통신
결과 수집 동기식 비동기식

아키텍처 구성요소

원격 파티셔닝은 세 가지 구성요소로 작동한다:

Manager/Worker 구성

Manager Node

// Manager 설정 (@Profile("manager"), @EnableBatchIntegration)
// RemotePartitioningManagerStepBuilderFactory는 @EnableBatchIntegration이 자동 등록

@Bean
public Step managerStep() {
    return remotePartitioningManagerStepBuilderFactory
            .get("managerStep")
            .partitioner("workerStep", partitioner)
            .outputChannel(outboundRequests()) // 메시지를 내보낼 채널 지정
            .gridSize(4)
            .build();
}

@Bean
public DirectChannel outboundRequests() { return new DirectChannel(); }

@Bean // outputChannel → Kafka 토픽으로 전송하는 파이프라인
public IntegrationFlow outboundFlow(KafkaTemplate<Long, StepExecutionRequest> kafkaTemplate) {
    KafkaProducerMessageHandler<Long, StepExecutionRequest> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setTopicExpression(new LiteralExpression("remote-partitioning"));
    return IntegrationFlow
            .from(outboundRequests()) // 이 채널에 메시지가 들어오면
            .handle(handler)          // Kafka로 전송
            .get();
}

Worker Node

// Worker 설정 (@Profile("worker"), @EnableBatchIntegration)

@Bean
public Step workerStep() {
    return remotePartitioningWorkerStepBuilderFactory
            .get("workerStep")
            .inputChannel(inboundRequests()) // 메시지를 수신할 채널 지정
            .<Input, Output>chunk(500, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}

@Bean // 비동기 수신을 위해 QueueChannel 사용
public QueueChannel inboundRequests() { return new QueueChannel(); }

@Bean // Kafka 토픽 → inputChannel로 전달하는 파이프라인
public IntegrationFlow inboundFlow(ConsumerFactory<String, String> cf) {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(cf, "remote-partitioning"))
            .channel(inboundRequests()) // 수신된 메시지를 이 채널로 전달
            .get();
}

Kafka 파티션 라우팅

// Manager의 IntegrationFlow에서 파티션 라우팅 설정
handler.setPartitionIdExpression(new FunctionExpression<>(message -> {
    StepExecutionRequest request = (StepExecutionRequest) message.getPayload();
    return request.getStepExecutionId() % partitionSize; // modulo 연산으로 분배
}));

메시지 교환 메커니즘

Manager가 전송하는 메시지

Worker의 메시지 처리

Manager와 Worker가 JobRepository를 공유해야 하는 이유

실행 순서와 주의사항

Manager의 결과 통합(Aggregation)

전략 1: JobRepository 폴링 (기본값)

// inputChannel을 설정하지 않으면 자동으로 폴링 방식 사용
@Bean
public Step managerStep() {
    return remotePartitioningManagerStepBuilderFactory
            .get("managerStep")
            .partitioner("workerStep", partitioner)
            .outputChannel(outboundRequests())
            .gridSize(4)
            .build(); // pollInterval 기본값 10초, timeout 기본값 -1(무한 대기)
}

전략 2: 메시지 기반 응답 수신

// Manager: .inputChannel()을 설정하면 메시지 기반 응답 수신으로 전환
@Bean
public Step managerStep() {
    return remotePartitioningManagerStepBuilderFactory
            .get("managerStep")
            .partitioner("workerStep", partitioner)
            .outputChannel(outboundRequests())
            .inputChannel(workerRepliesInputChannel()) // 이 한 줄이 폴링 → 메시지 방식 전환의 핵심
            .gridSize(4)
            .build();
}

// Worker: .outputChannel()로 완료된 StepExecution을 응답 토픽으로 전송
@Bean
public Step workerStep() {
    return remotePartitioningWorkerStepBuilderFactory
            .get("workerStep")
            .inputChannel(inboundRequests())
            .outputChannel(outboundResultChannel()) // 완료 후 StepExecution 응답 전송
            .<Input, Output>chunk(500, transactionManager)
            .reader(reader).processor(processor).writer(writer)
            .build();
}

메시지 응답 방식 구현

StepExecution 직렬화

// Kafka Serializer 구현 — StepExecution → byte[]
public class StepExecutionSerializer implements Serializer<StepExecution> {
    private final DefaultSerializer serializer = new DefaultSerializer();

    @Override
    public byte[] serialize(String topic, StepExecution stepExecution) {
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        serializer.serialize(stepExecution, output); // Spring의 Java 직렬화 활용
        return output.toByteArray();
    }
}

// Kafka Deserializer 구현 — byte[] → StepExecution
public class StepExecutionDeserializer implements Deserializer<StepExecution> {
    private final DefaultDeserializer deserializer = new DefaultDeserializer();

    @Override
    public StepExecution deserialize(String topic, byte[] data) {
        return (StepExecution) deserializer.deserialize(new ByteArrayInputStream(data));
    }
}

application.yml 설정 변경

응답 전용 토픽

Manager 응답 수신 채널과 IntegrationFlow

// Manager 응답 수신 구성
@Bean
public QueueChannel workerRepliesInputChannel() { return new QueueChannel(); }

@Bean // step-execution-results 토픽 → workerRepliesInputChannel
public IntegrationFlow inboundResponseFlow(ConsumerFactory<Long, StepExecution> cf) {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(cf, "step-execution-results"))
            .channel(workerRepliesInputChannel())
            .get();
}

Worker 응답 전송 채널과 IntegrationFlow

// Worker 응답 전송 구성
@Bean
public DirectChannel outboundResultChannel() { return new DirectChannel(); }

@Bean // outboundResultChannel → step-execution-results 토픽
public IntegrationFlow outboundResponseFlow(KafkaTemplate<Long, StepExecution> kafkaTemplate) {
    KafkaProducerMessageHandler<Long, StepExecution> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setTopicExpression(new LiteralExpression("step-execution-results"));
    return IntegrationFlow
            .from(outboundResultChannel())
            .handle(handler)
            .nullChannel(); // 단방향 전송이므로 응답 불필요
}

메시지 흐름 요약

부록: 원격 파티셔닝의 한계와 대안

한계 1: 높은 복잡성

한계 2: 공유 DB 강제

현실적 대안: JobParameters 기반 분산 실행