TIL

Multi-threaded Step

멀티스레드 스텝이란

구성 방법

return new StepBuilder("step", jobRepository)
    .<Input, Output>chunk(100)
    .reader(reader())
    .processor(processor())
    .writer(writer())
    .taskExecutor(taskExecutor())  // 멀티스레드 활성화
    .throttleLimit(5)
    .build();

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(5);
    executor.setThreadNamePrefix("batch-");
    return executor;
}

내부 동작 원리

TaskExecutorRepeatTemplate

// AbstractTaskletStepBuilder.build()
if (taskExecutor != null) {
    TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate();
    repeatTemplate.setTaskExecutor(taskExecutor);
    repeatTemplate.setThrottleLimit(throttleLimit);
    stepOperations = repeatTemplate;
}
step.setStepOperations(stepOperations);

멀티스레드가 적용되는 지점

// TaskletStep.doExecute()
stepOperations.iterate(new StepContextRepeatCallback(stepExecution) { ... }); // 이 지점부터 병렬 처리
워커 스레드 #1: [청크1: Read→Process→Write]
워커 스레드 #2: [청크2: Read→Process→Write]    ← 동시에 실행
워커 스레드 #3: [청크3: Read→Process→Write]

워커 스레드의 StepExecution 바인딩

// StepContextRepeatCallback.doInIteration() — 각 워커 스레드마다 독립적으로 실행됨
public RepeatStatus doInIteration(RepeatContext context) {
    StepContext stepContext = StepSynchronizationManager.register(stepExecution); // ThreadLocal에 바인딩
    try {
        return doInChunkContext(context, chunkContext);
    } finally {
        StepSynchronizationManager.close();
    }
}

데이터 처리 안전성

thread-safe하지 않은 Reader 예시

FlatFileItemReader — 공유 변수 비보호

// "This reader is not thread-safe."
private int lineCount = 0;

private String readLine() {
    line = this.reader.readLine();
    lineCount++; // 여러 스레드가 동시에 접근 → 중복 읽기, 데이터 누락 가능
}

JpaCursorItemReader — 비원자적 Iterator 접근

// "The implementation is not thread-safe."
protected T doRead() {
    return this.iterator.hasNext() ? this.iterator.next() : null;
    // hasNext()와 next() 사이에 다른 스레드가 끼어들 수 있음
}

thread-safe한 Reader: JdbcPagingItemReader

// "The implementation is thread-safe in between calls to open(ExecutionContext),
//  but remember to use saveState=false if used in a multi-threaded client"
private final Lock lock = new ReentrantLock();

protected T doRead() {
    this.lock.lock();       // 한 번에 하나의 스레드만 진입
    try {
        if (results == null || current >= pageSize) {
            doReadPage();   // 페이지 단위로 데이터 조회 (독립적인 SQL 쿼리)
            page++;
        }
        int next = current++;
        return (next < results.size()) ? results.get(next) : null;
    } finally {
        this.lock.unlock();
    }
}

동기화 데코레이터

thread-safe하지 않은 ItemReader를 멀티스레드 스텝에서 사용하려면 동기화 데코레이터로 감싸면 된다.

SynchronizedItemStreamReaderItemStream을 구현한 ItemReader

// SynchronizedItemStreamReader 핵심 구조
public class SynchronizedItemStreamReader<T> implements ItemStreamReader<T> {
    private ItemStreamReader<T> delegate;
    private final Lock lock = new ReentrantLock();

    public T read() {
        this.lock.lock();
        try { return this.delegate.read(); }
        finally { this.lock.unlock(); }
    }

    // open(), update(), close()는 delegate에 위임
}

사용 예시:

@Bean
public ItemStreamReader<Data> threadSafeReader() {
    FlatFileItemReader<Data> reader = new FlatFileItemReaderBuilder<Data>()
            .name("reader")
            .resource(new ClassPathResource("data.csv"))
            .delimited().names("id", "name")
            .targetType(Data.class)
            .build();

    SynchronizedItemStreamReader<Data> synchronizedReader = new SynchronizedItemStreamReader<>();
    synchronizedReader.setDelegate(reader); // delegate에 원본 Reader 전달
    return synchronizedReader;
}

실행 상태 안전성 - FlatFileItemWriter

데이터 쓰기는 thread-safe하다

// FlatFileItemWriter.doWrite() — 공유 변수 없이 로컬 변수만 사용
public String doWrite(Chunk<? extends T> items) {
    StringBuilder lines = new StringBuilder(); // 스레드마다 독립적인 로컬 변수
    for (T item : items) {
        lines.append(this.lineAggregator.aggregate(item)).append(this.lineSeparator);
    }
    return lines.toString();
}

상태 관리는 thread-safe하지 않다

saveState=false는 대부분의 ItemStream 구현체의 공통 요구사항

ThreadPoolTaskExecutor 설정

멀티스레드 스텝에서 TaskExecutor로 주로 ThreadPoolTaskExecutor를 사용한다. 배치 처리 특성에 맞는 주요 설정 항목:

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(5);
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setAwaitTerminationSeconds(10);
    executor.setThreadNamePrefix("batch-");
    executor.setAllowCoreThreadTimeOut(true);
    executor.setKeepAliveSeconds(30);

동시성 제어

~5.x: throttleLimit (6.0에서 제거됨)

6.0~: stepOperations를 통한 동시성 제어

// 기본: TaskExecutor의 풀 사이즈가 곧 동시성 상한
return new StepBuilder("step", jobRepository)
    .<Input, Output>chunk(100)
    .reader(reader())
    .writer(writer())
    .taskExecutor(taskExecutor()) // corePoolSize=maxPoolSize=5 → 동시 5개 스레드
    .build();

// 세밀한 제어: 커스텀 RepeatOperations 사용
return new StepBuilder("step", jobRepository)
    .<Input, Output>chunk(100)
    .reader(reader())
    .writer(writer())
    .taskExecutor(taskExecutor())
    .stepOperations(customRepeatOperations) // 커스텀 반복 실행 전략
    .build();