OutOfMemoryError 발생| 지표 | 로컬 파티셔닝 | 원격 파티셔닝 |
|---|---|---|
| 실행 환경 | 단일 JVM, 다중 스레드 | 여러 머신에 걸친 다중 JVM |
| 메모리/처리 능력 | 단일 머신으로 제한 | 노드 추가로 선형 확장 |
| 실행 방법 | 직접 메서드 호출 | 메시지 기반 통신 |
| 결과 수집 | 동기식 | 비동기식 |
원격 파티셔닝은 세 가지 구성요소로 작동한다:
StepExecutionRequest를 Message Broker를 통해 Worker에게 전송
TaskExecutorPartitionHandler 대신 MessageChannelPartitionHandler를 사용@EnableBatchIntegration 어노테이션이 필수인데 아래 컴포넌트들이 자동 등록된다.
RemotePartitioningManagerStepBuilderFactoryRemotePartitioningWorkerStepBuilderFactoryIntegrationFlow를 통해 Broker와의 통신 파이프라인을 구성한다
MessageChannel): Spring Integration의 핵심 추상화로, 컴포넌트 간 메시지를 전달하는 통로
DirectChannel: 동기식, 메시지가 들어오면 즉시 하나의 핸들러에 전달QueueChannel: 비동기식, 내부 큐에 메시지를 보관하여 소비자가 가져갈 때까지 대기IntegrationFlow: 메시지가 채널에 들어온 후 어디로 어떻게 흘러가는지를 정의하는 파이프라인 DSL (from → handle → channel 등을 체이닝)RemotePartitioningManagerStepBuilderFactory로 Manager Step을 생성한다
MessageChannelPartitionHandler를 구성하여 메시지 채널을 통해 파티션 요청을 전송.step() 메서드가 없다 — 워커 스텝을 직접 지정할 필요가 없음outputChannel(): StepExecutionRequest 메시지를 Broker로 내보내는 채널을 지정IntegrationFlow: outputChannel로 들어온 메시지를 Kafka 토픽으로 전송하는 파이프라인// Manager 설정 (@Profile("manager"), @EnableBatchIntegration)
// RemotePartitioningManagerStepBuilderFactory는 @EnableBatchIntegration이 자동 등록
@Bean
public Step managerStep() {
return remotePartitioningManagerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner)
.outputChannel(outboundRequests()) // 메시지를 내보낼 채널 지정
.gridSize(4)
.build();
}
@Bean
public DirectChannel outboundRequests() { return new DirectChannel(); }
@Bean // outputChannel → Kafka 토픽으로 전송하는 파이프라인
public IntegrationFlow outboundFlow(KafkaTemplate<Long, StepExecutionRequest> kafkaTemplate) {
KafkaProducerMessageHandler<Long, StepExecutionRequest> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setTopicExpression(new LiteralExpression("remote-partitioning"));
return IntegrationFlow
.from(outboundRequests()) // 이 채널에 메시지가 들어오면
.handle(handler) // Kafka로 전송
.get();
}
RemotePartitioningWorkerStepBuilderFactory로 Worker Step을 생성한다inputChannel(): Broker로부터 StepExecutionRequest를 수신하는 채널을 지정 (Manager의 outputChannel과 대응)IntegrationFlow: Kafka 토픽을 구독하여 수신된 메시지를 inputChannel로 전달하는 파이프라인// Worker 설정 (@Profile("worker"), @EnableBatchIntegration)
@Bean
public Step workerStep() {
return remotePartitioningWorkerStepBuilderFactory
.get("workerStep")
.inputChannel(inboundRequests()) // 메시지를 수신할 채널 지정
.<Input, Output>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean // 비동기 수신을 위해 QueueChannel 사용
public QueueChannel inboundRequests() { return new QueueChannel(); }
@Bean // Kafka 토픽 → inputChannel로 전달하는 파이프라인
public IntegrationFlow inboundFlow(ConsumerFactory<String, String> cf) {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(cf, "remote-partitioning"))
.channel(inboundRequests()) // 수신된 메시지를 이 채널로 전달
.get();
}
StepExecution ID를 기반으로 적절한 Kafka 파티션에 메시지를 분배해야 한다IntegrationFlow에서 KafkaProducerMessageHandler.setPartitionIdExpression()으로 라우팅 전략을 주입한다// Manager의 IntegrationFlow에서 파티션 라우팅 설정
handler.setPartitionIdExpression(new FunctionExpression<>(message -> {
StepExecutionRequest request = (StepExecutionRequest) message.getPayload();
return request.getStepExecutionId() % partitionSize; // modulo 연산으로 분배
}));