OutOfMemoryError 발생| 지표 | 로컬 파티셔닝 | 원격 파티셔닝 |
|---|---|---|
| 실행 환경 | 단일 JVM, 다중 스레드 | 여러 머신에 걸친 다중 JVM |
| 메모리/처리 능력 | 단일 머신으로 제한 | 노드 추가로 선형 확장 |
| 실행 방법 | 직접 메서드 호출 | 메시지 기반 통신 |
| 결과 수집 | 동기식 | 비동기식 |
원격 파티셔닝은 세 가지 구성요소로 작동한다:
StepExecutionRequest를 Message Broker를 통해 Worker에게 전송
TaskExecutorPartitionHandler 대신 MessageChannelPartitionHandler를 사용@EnableBatchIntegration 어노테이션이 필수인데 아래 컴포넌트들이 자동 등록된다.
RemotePartitioningManagerStepBuilderFactoryRemotePartitioningWorkerStepBuilderFactoryIntegrationFlow를 통해 Broker와의 통신 파이프라인을 구성한다
MessageChannel): Spring Integration의 핵심 추상화로, 컴포넌트 간 메시지를 전달하는 통로
DirectChannel: 동기식, 메시지가 들어오면 즉시 하나의 핸들러에 전달QueueChannel: 비동기식, 내부 큐에 메시지를 보관하여 소비자가 가져갈 때까지 대기IntegrationFlow: 메시지가 채널에 들어온 후 어디로 어떻게 흘러가는지를 정의하는 파이프라인 DSL (from → handle → channel 등을 체이닝)RemotePartitioningManagerStepBuilderFactory로 Manager Step을 생성한다
MessageChannelPartitionHandler를 구성하여 메시지 채널을 통해 파티션 요청을 전송.step() 메서드가 없다 — 워커 스텝을 직접 지정할 필요가 없음outputChannel(): StepExecutionRequest 메시지를 Broker로 내보내는 채널을 지정IntegrationFlow: outputChannel로 들어온 메시지를 Kafka 토픽으로 전송하는 파이프라인// Manager 설정 (@Profile("manager"), @EnableBatchIntegration)
// RemotePartitioningManagerStepBuilderFactory는 @EnableBatchIntegration이 자동 등록
@Bean
public Step managerStep() {
return remotePartitioningManagerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner)
.outputChannel(outboundRequests()) // 메시지를 내보낼 채널 지정
.gridSize(4)
.build();
}
@Bean
public DirectChannel outboundRequests() { return new DirectChannel(); }
@Bean // outputChannel → Kafka 토픽으로 전송하는 파이프라인
public IntegrationFlow outboundFlow(KafkaTemplate<Long, StepExecutionRequest> kafkaTemplate) {
KafkaProducerMessageHandler<Long, StepExecutionRequest> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setTopicExpression(new LiteralExpression("remote-partitioning"));
return IntegrationFlow
.from(outboundRequests()) // 이 채널에 메시지가 들어오면
.handle(handler) // Kafka로 전송
.get();
}
RemotePartitioningWorkerStepBuilderFactory로 Worker Step을 생성한다inputChannel(): Broker로부터 StepExecutionRequest를 수신하는 채널을 지정 (Manager의 outputChannel과 대응)IntegrationFlow: Kafka 토픽을 구독하여 수신된 메시지를 inputChannel로 전달하는 파이프라인// Worker 설정 (@Profile("worker"), @EnableBatchIntegration)
@Bean
public Step workerStep() {
return remotePartitioningWorkerStepBuilderFactory
.get("workerStep")
.inputChannel(inboundRequests()) // 메시지를 수신할 채널 지정
.<Input, Output>chunk(500, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean // 비동기 수신을 위해 QueueChannel 사용
public QueueChannel inboundRequests() { return new QueueChannel(); }
@Bean // Kafka 토픽 → inputChannel로 전달하는 파이프라인
public IntegrationFlow inboundFlow(ConsumerFactory<String, String> cf) {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(cf, "remote-partitioning"))
.channel(inboundRequests()) // 수신된 메시지를 이 채널로 전달
.get();
}
StepExecution ID를 기반으로 적절한 Kafka 파티션에 메시지를 분배해야 한다IntegrationFlow에서 KafkaProducerMessageHandler.setPartitionIdExpression()으로 라우팅 전략을 주입한다// Manager의 IntegrationFlow에서 파티션 라우팅 설정
handler.setPartitionIdExpression(new FunctionExpression<>(message -> {
StepExecutionRequest request = (StepExecutionRequest) message.getPayload();
return request.getStepExecutionId() % partitionSize; // modulo 연산으로 분배
}));
MessageChannelPartitionHandler.doHandle()은 각 파티션의 StepExecution마다 StepExecutionRequest 메시지를 생성하여 outputChannel로 전송한다ExecutionContext나 StepExecution 객체가 아닌, 세 가지 식별자만 전송:
stepName: 실행할 Worker Step의 이름jobExecutionId: 현재 Job의 IDstepExecutionId: 처리할 파티션의 StepExecution IDStepExecutionRequestHandler.handle()이 수신된 메시지를 처리한다 (RemotePartitioningWorkerStepBuilder가 내부적으로 등록)StepExecutionRequest에서 jobExecutionId와 stepExecutionId를 추출JobExplorer를 통해 공유 JobRepository에서 해당 StepExecution을 조회 — 여기에 파티션의 ExecutionContext(데이터 범위 정보)가 포함되어 있다stepName으로 Worker Step 빈을 찾아 step.execute(stepExecution) 실행ExecutionContext를 가져와야 한다worker 프로파일로 상시 실행 상태를 유지한다gridSize와 일치시켜야 각 Worker가 고유한 파티션을 할당받을 수 있다MessageChannelPartitionHandler.doHandle()은 메시지 전송 후 두 가지 전략 중 하나로 결과를 취합한다pollInterval 주기로 JobRepository를 반복 조회하여, 모든 Worker StepExecution이 최종 상태(COMPLETED, FAILED 등)에 도달할 때까지 대기pollInterval: 폴링 주기 (기본값 10초)timeout: 최대 대기 시간 (기본값 -1 = 무한 대기, 초과 시 Manager Step 실패)// inputChannel을 설정하지 않으면 자동으로 폴링 방식 사용
@Bean
public Step managerStep() {
return remotePartitioningManagerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner)
.outputChannel(outboundRequests())
.gridSize(4)
.build(); // pollInterval 기본값 10초, timeout 기본값 -1(무한 대기)
}
.inputChannel()을 설정하면, 폴링 대신 Worker로부터 직접 응답 메시지를 수신하는 방식으로 전환된다inputChannel을 설정하면 내부적으로 Spring Integration의 Aggregation Flow가 자동 생성된다:
inputChannel로 도착correlationId(그룹 식별자)와 sequenceSize(전체 파티션 수)를 기반으로 그룹핑sequenceSize만큼의 응답이 모두 도착하면 @Aggregator 메서드로 최종 집계Set<StepExecution>이 내부 채널을 통해 receiveReplies()로 전달MessageTimeoutException 발생// Manager: .inputChannel()을 설정하면 메시지 기반 응답 수신으로 전환
@Bean
public Step managerStep() {
return remotePartitioningManagerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner)
.outputChannel(outboundRequests())
.inputChannel(workerRepliesInputChannel()) // 이 한 줄이 폴링 → 메시지 방식 전환의 핵심
.gridSize(4)
.build();
}
// Worker: .outputChannel()로 완료된 StepExecution을 응답 토픽으로 전송
@Bean
public Step workerStep() {
return remotePartitioningWorkerStepBuilderFactory
.get("workerStep")
.inputChannel(inboundRequests())
.outputChannel(outboundResultChannel()) // 완료 후 StepExecution 응답 전송
.<Input, Output>chunk(500, transactionManager)
.reader(reader).processor(processor).writer(writer)
.build();
}
StepExecution 객체 자체를 Kafka로 전송해야 한다StepExecution은 내부에 JobExecution과의 순환 참조를 갖고 있어 단순 JSON 직렬화로는 안정적으로 전송할 수 없다DefaultSerializer/DefaultDeserializer(Java 직렬화 기반)를 활용한 커스텀 Kafka Serializer/Deserializer를 구현해야 한다// Kafka Serializer 구현 — StepExecution → byte[]
public class StepExecutionSerializer implements Serializer<StepExecution> {
private final DefaultSerializer serializer = new DefaultSerializer();
@Override
public byte[] serialize(String topic, StepExecution stepExecution) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
serializer.serialize(stepExecution, output); // Spring의 Java 직렬화 활용
return output.toByteArray();
}
}
// Kafka Deserializer 구현 — byte[] → StepExecution
public class StepExecutionDeserializer implements Deserializer<StepExecution> {
private final DefaultDeserializer deserializer = new DefaultDeserializer();
@Override
public StepExecution deserialize(String topic, byte[] data) {
return (StepExecution) deserializer.deserialize(new ByteArrayInputStream(data));
}
}
StepExecution 응답을 수신해야 하므로 consumer 설정 추가 (value-deserializer에 StepExecutionDeserializer 지정)StepExecution 응답을 전송해야 하므로 producer 설정 추가 (value-serializer에 StepExecutionSerializer 지정)QueueChannel을 응답 수신용 채널로 정의하고, Kafka 응답 토픽에서 메시지를 받아 이 채널로 전달하는 IntegrationFlow를 구성한다// Manager 응답 수신 구성
@Bean
public QueueChannel workerRepliesInputChannel() { return new QueueChannel(); }
@Bean // step-execution-results 토픽 → workerRepliesInputChannel
public IntegrationFlow inboundResponseFlow(ConsumerFactory<Long, StepExecution> cf) {
return IntegrationFlow
.from(Kafka.messageDrivenChannelAdapter(cf, "step-execution-results"))
.channel(workerRepliesInputChannel())
.get();
}
DirectChannel을 응답 전송용 채널로 정의하고, 이 채널의 메시지를 Kafka 응답 토픽으로 내보내는 IntegrationFlow를 구성한다// Worker 응답 전송 구성
@Bean
public DirectChannel outboundResultChannel() { return new DirectChannel(); }
@Bean // outboundResultChannel → step-execution-results 토픽
public IntegrationFlow outboundResponseFlow(KafkaTemplate<Long, StepExecution> kafkaTemplate) {
KafkaProducerMessageHandler<Long, StepExecution> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setTopicExpression(new LiteralExpression("step-execution-results"));
return IntegrationFlow
.from(outboundResultChannel())
.handle(handler)
.nullChannel(); // 단방향 전송이므로 응답 불필요
}
outboundRequests 채널 → IntegrationFlow → Kafka(remote-partitioning 토픽) → IntegrationFlow → inboundRequests 채널 → WorkeroutboundResultChannel 채널 → IntegrationFlow → Kafka(step-execution-results 토픽) → IntegrationFlow → workerRepliesInputChannel 채널 → Manager (Aggregator가 correlationId/sequenceSize로 그룹핑 후 집계)JobParameters로 처리 범위를 분할
ExecutionContext와 동일한 원리를 JobParameters로 적용하는 것startId=1, endId=10000, 인스턴스2는 startId=10001, endId=20000StepExecution을 하나의 JobExecution으로 묶어 추적·관리해야 하는 요구가 있을 때