ItemReader/ItemWriter 인스턴스를 공유한다StepExecution으로 실행되므로, @StepScope 컴포넌트는 스레드마다 별도 인스턴스로 생성된다
ItemReader/ItemWriter의 공유 자체가 발생하지 않아 lock 경쟁이 사라짐[ 멀티스레드 스텝 ]
┌─── 워커 스레드 #1 ──▶ [청크1: Read→Process→Write] ───┐
│ │
[공유 Reader] ◀───┼─── 워커 스레드 #2 ──▶ [청크2: Read→Process→Write] ───┼──▶ [공유 Writer]
(lock 경쟁) │ │ (lock 경쟁)
└─── 워커 스레드 #3 ──▶ [청크3: Read→Process→Write] ───┘
→ 하나의 StepExecution, 컴포넌트 공유 → lock 병목
[ 파티셔닝 ]
스레드 #1: [전용 Reader] ──▶ 파티션1 (ID 1~100) ──▶ [전용 Writer] ← StepExecution #1
스레드 #2: [전용 Reader] ──▶ 파티션2 (ID 101~200) ──▶ [전용 Writer] ← StepExecution #2
스레드 #3: [전용 Reader] ──▶ 파티션3 (ID 201~300) ──▶ [전용 Writer] ← StepExecution #3
→ 각각 독립된 StepExecution, 컴포넌트 비공유 → lock 경쟁 없음
StepExecutionSplitter: Partitioner를 사용해 전체 데이터를 여러 파티션으로 나누고, 각 파티션마다 독립적인 워커 StepExecution을 생성PartitionHandler: 생성된 워커 StepExecution들을 가용한 스레드에 할당하여 병렬 실행을 지시하고, 모든 워커 완료 후 결과를 취합ItemReader/ItemWriter 인스턴스를 가짐
┌──────────────┐
│ ManagerStep │
│ (지휘만 담당) │
└──────┬───────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
┌─────────────┐┌─────────────┐┌─────────────┐
│ WorkerStep#1││ WorkerStep#2││ WorkerStep#3│
│ 파티션1 처리 ││ 파티션2 처리 ││ 파티션3 처리 │
│ 전용 Reader ││ 전용 Reader ││ 전용 Reader │
│ 전용 Writer ││ 전용 Writer ││ 전용 Writer │
└─────────────┘└─────────────┘└─────────────┘
@FunctionalInterface
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
gridSize개의 파티션을 생성하여 Map<String, ExecutionContext>로 반환
"0", "1" 또는 "MORNING_LOGS" 등 자유 지정)ExecutionContextStepExecutionSplitter가 파티셔닝 시작 시점에 이 메서드를 호출하고, 반환된 ExecutionContext마다 독립적인 워커 StepExecution을 생성한다Partitioner 구현의 기반이 된다gridSize로 나눠 각 파티션의 범위를 계산한다ExecutionContext를 생성하고 범위 정보를 저장한다Map에 담아 반환한다// 24시간을 gridSize개의 시간 블록으로 분할하는 예시
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
int hours = 24 / gridSize; // gridSize=4이면 6시간씩
for (int i = 0; i < gridSize; i++) {
ExecutionContext ctx = new ExecutionContext();
ctx.put("startDateTime", targetDate.atStartOfDay().plusHours(i * hours));
ctx.put("endDateTime", targetDate.atStartOfDay().plusHours((i + 1) * hours));
partitions.put(String.valueOf(i), ctx);
}
return partitions;
}
ItemReader를 @StepScope로 선언하면, 각 워커 StepExecution마다 새 인스턴스가 생성된다#{stepExecutionContext['key']}로 파티션별 ExecutionContext에 저장된 범위 정보를 주입받는다@Bean
@StepScope
public MongoCursorItemReader<Log> reader(
@Value("#{stepExecutionContext['startDateTime']}") Date start,
@Value("#{stepExecutionContext['endDateTime']}") Date end) {
return new MongoCursorItemReaderBuilder<Log>()
.jsonQuery("{ 'timestamp': { '$gte': ?0, '$lt': ?1 } }")
.parameterValues(List.of(start, end))
// ...
.build();
}
@Bean
public Step managerStep(Step workerStep) {
return new StepBuilder("managerStep", jobRepository)
.partitioner("workerStep", dailyTimeRangePartitioner) // Partitioner 주입
.step(workerStep) // 워커 스텝 지정
.taskExecutor(taskExecutor()) // 병렬 실행용 스레드 풀
.gridSize(4) // 파티션 개수
.build();
}
partitioner(stepName, partitioner): 이 메서드를 호출하면 Spring Batch가 해당 스텝을 PartitionStep으로 생성한다
Partitioner 구현체step(workerStep): 실제 데이터를 처리할 워커 스텝을 지정한다. ManagerStep은 이 워커를 파티션 개수만큼 복제하여 실행한다gridSize(n): 생성할 파티션 개수. Partitioner.partition(gridSize)의 인자로 전달된다@Bean
public Step workerStep() {
return new StepBuilder("workerStep", jobRepository)
.<Input, Output>chunk(500, transactionManager)
.reader(reader) // @StepScope로 파티션별 인스턴스 생성
.processor(processor)
.writer(writer)
.build();
}
StepExecution으로 복제되어 병렬 실행된다gridSize와 스레드 풀 크기를 일치시키는 것이 권장된다
StepExecution 레벨에서 데이터 분할 병렬화taskExecutor()를 추가하면 된다SynchronizedItemStreamReader로 Reader를 감싸야 한다ExecutionContext에 쿼리 범위값(startDateTime, endDateTime)을 전달하여, 같은 데이터 소스에서 범위를 나눠 읽음Partitioner 구현체Resource[] 배열을 받아 각 파일마다 ExecutionContext를 생성하고, "fileName" 키에 파일 URL을 저장한다gridSize를 무시하고 파일 개수만큼 파티션을 생성한다 (100개 파일 → 100개 파티션)file://...)이므로, Reader에서 Resource로 변환하여 사용해야 한다@Bean
@StepScope
public Partitioner partitioner(@Value("#{jobParameters['path']}") String path) {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
Resource[] resources = new PathMatchingResourcePatternResolver()
.getResources("file://" + path + "/*.csv");
partitioner.setResources(resources);
return partitioner;
}
@StepScope로 선언하면 파티션마다 별도의 FlatFileItemWriter 인스턴스가 생성된다SystemCommandTasklet으로 OS 명령(예: cat *.out > merged.log)을 실행하여 통합@Bean
public Job fileJob(Step managerStep, Step mergeStep) {
return new JobBuilder("fileJob", jobRepository)
.start(managerStep)
.next(mergeStep) // 파티셔닝 완료 후 병합
.build();
}
StepBuilder.partitioner()를 호출하면 PartitionStepBuilder가 생성되고, 최종적으로 PartitionStep이라는 특수한 Step 구현체가 만들어진다TaskExecutorPartitionHandler: 워커 스텝, TaskExecutor, gridSize를 보유하며 병렬 실행을 담당SimpleStepExecutionSplitter: Partitioner를 사용해 파티션별 StepExecution을 생성SimpleStepExecutionSplitter.split()이 Partitioner.partition(gridSize)를 호출하여 ExecutionContext 맵을 받고, 각 엔트리마다 워커 StepExecution을 생성하여 Set<StepExecution>으로 반환TaskExecutorPartitionHandler.doHandle()이 각 StepExecution을 FutureTask로 감싸 TaskExecutor에 제출하고, step.execute(stepExecution)을 병렬 호출StepExecutionAggregator.aggregate()가 모든 워커의 실행 결과를 매니저 StepExecution에 병합BatchStatus.max()로 취합 — 워커 중 하나라도 FAILED이면 매니저도 FAILEDExitStatus.and()로 취합 — 하나라도 비정상이면 비정상readCount, writeCount, commitCount, rollbackCount, skipCount 등 모든 표준 카운트를 합산ExecutionContext에 저장한 커스텀 데이터(예: 처리 금액 총합)를 매니저 결과에 반영하려면 StepExecutionAggregator를 직접 구현한다.aggregator(customAggregator)로 매니저 스텝 빌더에 지정