StepExecution을 여러 스레드로 동시에 실행하는 기법AbstractTaskletStepBuilder의 taskExecutor() 메서드에 TaskExecutor를 전달하면 멀티스레드 기반으로 동작한다.return new StepBuilder("step", jobRepository)
.<Input, Output>chunk(100)
.reader(reader())
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor()) // 멀티스레드 활성화
.throttleLimit(5)
.build();
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("batch-");
return executor;
}
TaskExecutorRepeatTemplate
taskExecutor가 설정되어 있으면, 표준 RepeatTemplate 대신 TaskExecutorRepeatTemplate을 사용
RepeatTemplate은 Tasklet.execute()를 반복 실행하는 역할TaskExecutorRepeatTemplate은 이 반복 실행을 여러 스레드에서 병렬로 수행하는 확장 버전// AbstractTaskletStepBuilder.build()
if (taskExecutor != null) {
TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate();
repeatTemplate.setTaskExecutor(taskExecutor);
repeatTemplate.setThrottleLimit(throttleLimit);
stepOperations = repeatTemplate;
}
step.setStepOperations(stepOperations);
TaskletStep.doExecute()에서 stepOperations.iterate()가 호출되는 시점에 멀티스레드가 개입한다.// TaskletStep.doExecute()
stepOperations.iterate(new StepContextRepeatCallback(stepExecution) { ... }); // 이 지점부터 병렬 처리
청크 단위의 실제 처리 작업만 TaskExecutor의 워커 스레드들이 병렬 처리
Reader → Processor → Writer 사이클)를 여러 스레드가 동시에 수행. 각 스레드가 독립적인 청크를 가져와 처리Tasklet.execute() 메서드를 여러 스레드가 동시에 병렬 실행워커 스레드 #1: [청크1: Read→Process→Write]
워커 스레드 #2: [청크2: Read→Process→Write] ← 동시에 실행
워커 스레드 #3: [청크3: Read→Process→Write]
StepExecution은 ThreadLocal 기반이므로 각 워커 스레드에 별도로 바인딩해야 한다.TaskExecutorRepeatTemplate이 호출하는 StepContextRepeatCallback.doInIteration()에서 이를 처리// StepContextRepeatCallback.doInIteration() — 각 워커 스레드마다 독립적으로 실행됨
public RepeatStatus doInIteration(RepeatContext context) {
StepContext stepContext = StepSynchronizationManager.register(stepExecution); // ThreadLocal에 바인딩
try {
return doInChunkContext(context, chunkContext);
} finally {
StepSynchronizationManager.close();
}
}
StepSynchronizationManager.register()를 호출하여 자신의 ThreadLocal에 StepExecution을 바인딩Step 스코프가 올바르게 동작ItemWriter는 thread-safe하지만, ItemReader는 thread-safe하지 않은 구현체가 많다
FlatFileItemReader — 공유 변수 비보호
// "This reader is not thread-safe."
private int lineCount = 0;
private String readLine() {
line = this.reader.readLine();
lineCount++; // 여러 스레드가 동시에 접근 → 중복 읽기, 데이터 누락 가능
}
JpaCursorItemReader — 비원자적 Iterator 접근
// "The implementation is not thread-safe."
protected T doRead() {
return this.iterator.hasNext() ? this.iterator.next() : null;
// hasNext()와 next() 사이에 다른 스레드가 끼어들 수 있음
}
JdbcPagingItemReaderAbstractPagingItemReader의 doRead()가 ReentrantLock으로 보호되어 있다page), 현재 인덱스(current) 등의 공유 변수가 lock 안에서만 변경됨// "The implementation is thread-safe in between calls to open(ExecutionContext),
// but remember to use saveState=false if used in a multi-threaded client"
private final Lock lock = new ReentrantLock();
protected T doRead() {
this.lock.lock(); // 한 번에 하나의 스레드만 진입
try {
if (results == null || current >= pageSize) {
doReadPage(); // 페이지 단위로 데이터 조회 (독립적인 SQL 쿼리)
page++;
}
int next = current++;
return (next < results.size()) ? results.get(next) : null;
} finally {
this.lock.unlock();
}
}
LIMIT/OFFSET 등)로 실행되기 때문에 페이지 간 의존성이 없음thread-safe하지 않은 ItemReader를 멀티스레드 스텝에서 사용하려면 동기화 데코레이터로 감싸면 된다.
SynchronizedItemStreamReader — ItemStream을 구현한 ItemReader용
read()를 ReentrantLock으로 감싸 동기화ItemStream의 open()/update()/close()는 delegate에 위임하여 리소스 초기화와 상태 관리가 정상 동작// SynchronizedItemStreamReader 핵심 구조
public class SynchronizedItemStreamReader<T> implements ItemStreamReader<T> {
private ItemStreamReader<T> delegate;
private final Lock lock = new ReentrantLock();
public T read() {
this.lock.lock();
try { return this.delegate.read(); }
finally { this.lock.unlock(); }
}
// open(), update(), close()는 delegate에 위임
}
사용 예시:
@Bean
public ItemStreamReader<Data> threadSafeReader() {
FlatFileItemReader<Data> reader = new FlatFileItemReaderBuilder<Data>()
.name("reader")
.resource(new ClassPathResource("data.csv"))
.delimited().names("id", "name")
.targetType(Data.class)
.build();
SynchronizedItemStreamReader<Data> synchronizedReader = new SynchronizedItemStreamReader<>();
synchronizedReader.setDelegate(reader); // delegate에 원본 Reader 전달
return synchronizedReader;
}
반환 타입이 ItemStreamReader이므로, SimpleStepBuilder.registerAsStreamsAndListeners()에서 자동으로 ItemStream으로 인식되어 관리됨
SynchronizedItemReader — ItemStream을 구현하지 않은 ItemReader용
SynchronizedItemStreamReader에서 ItemStream 관련 로직만 제외된 형태ItemWriter용 동기화 데코레이터도 동일한 구조로 제공된다:
SynchronizedItemStreamWriter — ItemStream을 구현한 ItemWriter용SynchronizedItemWriter — ItemStream을 구현하지 않은 ItemWriter용데이터 읽기/쓰기의 스레드 안전성과 별개로, 실행 상태 관리(ItemStream) 측면의 스레드 안전성 문제가 있다.
FlatFileItemWriter의 주석은 “not thread-safe”라고 명시
SynchronizedItemStreamWriter의 주석은 “FlatFileItemWriter does NOT require synchronizing writes”라고 한다.데이터 쓰기는 thread-safe하다
// FlatFileItemWriter.doWrite() — 공유 변수 없이 로컬 변수만 사용
public String doWrite(Chunk<? extends T> items) {
StringBuilder lines = new StringBuilder(); // 스레드마다 독립적인 로컬 변수
for (T item : items) {
lines.append(this.lineAggregator.aggregate(item)).append(this.lineSeparator);
}
return lines.toString();
}
상태 관리는 thread-safe하지 않다
open(): 파일 열기, 버퍼 초기화, 헤더 작성 → 여러 스레드가 동시 호출 시 리소스 충돌close(): 푸터 작성, 버퍼 플러시, 파일 정리 → 동시 호출 시 불안정update(): ExecutionContext에 재시작 정보 저장 → 동시 호출 시 데이터 불일치
open()과 close()는 메인 스레드에서만 호출되므로 문제가 되지 않는다.
update()
ExecutionContext에 처리 위치를 저장하면 마지막 스레드의 값만 남아 재시작 시 정확한 지점을 찾을 수 없다.FlatFileItemWriter를 안전하게 사용하려면 saveState(false)를 설정해야 한다
update()에서 재시작용 메타데이터 저장을 하지 않게 되므로 재시작 기능을 포기하는 대신 상태 일관성을 확보ItemStream 구현체가 saveState=false를 요구
JdbcPagingItemReader의 주석이 이를 잘 보여준다:
open()” → 데이터 읽기(read()) 관점에서는 thread-safesaveState=false” → 실행 상태 관리 관점에서는 재시작 기능을 포기해야 함멀티스레드 스텝에서 TaskExecutor로 주로 ThreadPoolTaskExecutor를 사용한다. 배치 처리 특성에 맞는 주요 설정 항목:
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
executor.setThreadNamePrefix("batch-");
executor.setAllowCoreThreadTimeOut(true);
executor.setKeepAliveSeconds(30);
corePoolSize / maxPoolSize
corePoolSize: 스레드 풀이 유지하는 기본 스레드 수. 작업이 없어도 이 수만큼의 스레드는 기본적으로 살아있다maxPoolSize: 스레드 풀이 생성할 수 있는 최대 스레드 수waitForTasksToCompleteOnShutdown / awaitTerminationSeconds
waitForTasksToCompleteOnShutdown(true): JVM 종료(shutdown) 시 현재 실행 중인 작업이 완료될 때까지 대기. false(기본값)면 즉시 스레드를 중단시켜 데이터 처리가 중간에 끊길 수 있음awaitTerminationSeconds(10): 대기 시간의 상한. 이 시간이 지나면 미완료 작업이 있어도 강제 종료. 청크당 처리 시간을 고려하여 설정해야 하며, 네트워크 지연이 있는 작업에서는 30초 이상 권장allowCoreThreadTimeOut / keepAliveSeconds
allowCoreThreadTimeOut(true): 코어 스레드도 유휴 상태가 지속되면 종료 대상이 됨. 기본값 false에서는 코어 스레드가 영구 유지되어 배치 완료 후에도 JVM이 종료되지 않는 문제가 발생keepAliveSeconds(30): 유휴 스레드가 종료되기까지의 대기 시간. 이 시간 동안 새 작업이 없으면 스레드가 제거됨. 배치 작업이 모두 끝나면 이 시간 경과 후 스레드 풀이 비워지고 JVM이 정상 종료됨TaskExecutorRepeatTemplate이 TaskExecutor의 maxPoolSize와 별개로 동시 실행 스레드 수를 추가 제한하는 설정TaskExecutor를 받아놓고 내부적으로 별도의 스레드 제한을 암묵적으로 적용하는 설계 문제로 6.0에서 완전 제거ThreadPoolTaskExecutor의 풀 사이즈로 직접 동시성을 제어하는 것이 기본 방식RepeatOperations를 구현하여 stepOperations()에 설정// 기본: TaskExecutor의 풀 사이즈가 곧 동시성 상한
return new StepBuilder("step", jobRepository)
.<Input, Output>chunk(100)
.reader(reader())
.writer(writer())
.taskExecutor(taskExecutor()) // corePoolSize=maxPoolSize=5 → 동시 5개 스레드
.build();
// 세밀한 제어: 커스텀 RepeatOperations 사용
return new StepBuilder("step", jobRepository)
.<Input, Output>chunk(100)
.reader(reader())
.writer(writer())
.taskExecutor(taskExecutor())
.stepOperations(customRepeatOperations) // 커스텀 반복 실행 전략
.build();