TIL

Local Partitioning

멀티스레드 스텝의 성능 한계

파티셔닝이란

[ 멀티스레드 스텝 ]

                  ┌─── 워커 스레드 #1 ──▶ [청크1: Read→Process→Write] ───┐
                  │                                                     │
[공유 Reader] ◀───┼─── 워커 스레드 #2 ──▶ [청크2: Read→Process→Write] ───┼──▶ [공유 Writer]
  (lock 경쟁)     │                                                     │     (lock 경쟁)
                  └─── 워커 스레드 #3 ──▶ [청크3: Read→Process→Write] ───┘

→ 하나의 StepExecution, 컴포넌트 공유 → lock 병목


[ 파티셔닝 ]

스레드 #1: [전용 Reader] ──▶ 파티션1 (ID 1~100)   ──▶ [전용 Writer]   ← StepExecution #1
스레드 #2: [전용 Reader] ──▶ 파티션2 (ID 101~200) ──▶ [전용 Writer]   ← StepExecution #2
스레드 #3: [전용 Reader] ──▶ 파티션3 (ID 201~300) ──▶ [전용 Writer]   ← StepExecution #3

→ 각각 독립된 StepExecution, 컴포넌트 비공유 → lock 경쟁 없음

파티셔닝 아키텍처: ManagerStep과 WorkerStep

ManagerStep

WorkerStep

                ┌──────────────┐
                │  ManagerStep │
                │ (지휘만 담당)  │
                └──────┬───────┘
                       │
        ┌──────────────┼──────────────┐
        ▼              ▼              ▼
 ┌─────────────┐┌─────────────┐┌─────────────┐
 │ WorkerStep#1││ WorkerStep#2││ WorkerStep#3│
 │ 파티션1 처리  ││ 파티션2 처리  ││ 파티션3 처리  │
 │ 전용 Reader  ││ 전용 Reader  ││ 전용 Reader  │
 │ 전용 Writer  ││ 전용 Writer  ││ 전용 Writer  │
 └─────────────┘└─────────────┘└─────────────┘

Partitioner

@FunctionalInterface
public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

구현 전제: 입력 데이터 범위의 확정

// 24시간을 gridSize개의 시간 블록으로 분할하는 예시
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
    Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
    int hours = 24 / gridSize; // gridSize=4이면 6시간씩

    for (int i = 0; i < gridSize; i++) {
        ExecutionContext ctx = new ExecutionContext();
        ctx.put("startDateTime", targetDate.atStartOfDay().plusHours(i * hours));
        ctx.put("endDateTime", targetDate.atStartOfDay().plusHours((i + 1) * hours));
        partitions.put(String.valueOf(i), ctx);
    }
    return partitions;
}

워커에서의 파티션 정보 활용

@Bean
@StepScope
public MongoCursorItemReader<Log> reader(
        @Value("#{stepExecutionContext['startDateTime']}") Date start,
        @Value("#{stepExecutionContext['endDateTime']}") Date end) {
    return new MongoCursorItemReaderBuilder<Log>()
            .jsonQuery("{ 'timestamp': { '$gte': ?0, '$lt': ?1 } }")
            .parameterValues(List.of(start, end))
            // ...
            .build();
}

파티셔닝 구성

ManagerStep 구성

@Bean
public Step managerStep(Step workerStep) {
    return new StepBuilder("managerStep", jobRepository)
            .partitioner("workerStep", dailyTimeRangePartitioner) // Partitioner 주입
            .step(workerStep)             // 워커 스텝 지정
            .taskExecutor(taskExecutor()) // 병렬 실행용 스레드 풀
            .gridSize(4)                  // 파티션 개수
            .build();
}

WorkerStep 구성

@Bean
public Step workerStep() {
    return new StepBuilder("workerStep", jobRepository)
            .<Input, Output>chunk(500, transactionManager)
            .reader(reader)       // @StepScope로 파티션별 인스턴스 생성
            .processor(processor)
            .writer(writer)
            .build();
}

TaskExecutor 설정

다중 계층 병렬화: 파티셔닝 + 멀티스레드 스텝

구현

실제로 필요한가?

파일 기반 파티셔닝

DB 기반 vs 파일 기반

MultiResourcePartitioner

@Bean
@StepScope
public Partitioner partitioner(@Value("#{jobParameters['path']}") String path) {
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    Resource[] resources = new PathMatchingResourcePatternResolver()
            .getResources("file://" + path + "/*.csv");
    partitioner.setResources(resources);
    return partitioner;
}

FlatFileItemWriter 주의사항

결과 파일 병합

@Bean
public Job fileJob(Step managerStep, Step mergeStep) {
    return new JobBuilder("fileJob", jobRepository)
            .start(managerStep)
            .next(mergeStep)    // 파티셔닝 완료 후 병합
            .build();
}

PartitionStep 내부 동작

실행 흐름: 3단계

  1. 분할(Split): SimpleStepExecutionSplitter.split()Partitioner.partition(gridSize)를 호출하여 ExecutionContext 맵을 받고, 각 엔트리마다 워커 StepExecution을 생성하여 Set<StepExecution>으로 반환
  2. 병렬 실행(Parallel Execution): TaskExecutorPartitionHandler.doHandle()이 각 StepExecutionFutureTask로 감싸 TaskExecutor에 제출하고, step.execute(stepExecution)을 병렬 호출
  3. 결과 통합(Aggregation): StepExecutionAggregator.aggregate()가 모든 워커의 실행 결과를 매니저 StepExecution에 병합

DefaultStepExecutionAggregator

커스텀 Aggregator