TIL

REST API를 통한 Job 실행과 JobOperator

웹 환경을 구성하는 경우 application.yml에 spring.batch.job.enabled 프로퍼티를 false로 설정함으로써 애플리케이션 시작 시 Job 자동 실행을 막을 수 있다. false인 경우 JobLauncherApplicationRunner가 자동 구성 되지 않는다.

JobLauncher를 활용한 REST API 구현

REST API 컨트롤러 예제

@PostMapping("/{jobName}/start")
public ResponseEntity<String> launchJob(@PathVariable String jobName) throws Exception {
    Job job = jobRegistry.getJob(jobName); // JobRegistry에서 이름으로 Job 조회

    JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
            .getNextJobParameters(job).toJobParameters(); // 이전 실행 이력 기반으로 다음 파라미터 생성

    JobExecution execution = jobLauncher.run(job, jobParameters); // Job 실행
    return ResponseEntity.ok("Job launched with ID: " + execution.getId());
}

비동기 TaskExecutor 설정

@Configuration
public class BatchCustomConfiguration {
    @Bean
    @BatchTaskExecutor
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor(); // 비동기 실행
    }
}

실행 테스트

curl -X POST http://localhost:8080/api/jobs/brutalizedSystemJob/start
Job launched with ID: 1

JobOperator

public interface JobOperator {
    Long start(String jobName, Properties parameters);     // Job 시작
    boolean stop(long executionId);                        // Job 중지 요청
    Long restart(long executionId);                        // 실패/중지된 Job 재시작
    Long startNextInstance(String jobName);                // 다음 인스턴스 시작
    JobExecution abandon(long jobExecutionId);             // Job 포기(재시작 불가 처리)
    Set<Long> getRunningExecutions(String jobName);        // 실행 중인 Job 조회
    String getSummary(long executionId);                   // 실행 요약 정보
    Map<Long, String> getStepExecutionSummaries(long executionId); // Step 실행 요약
    Set<String> getJobNames();                             // 등록된 Job 이름 목록
    // ...
}

주요 메서드

stop()과 restart() 동작 흐름

stop() 호출 시

restart() 호출 시

재시작 시 ItemReader의 동작 - AbstractItemCountingItemStreamItemReader

// AbstractItemCountingItemStreamItemReader.update()
executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);
// AbstractItemCountingItemStreamItemReader.open()
int itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT));
jumpToItem(itemCount);
currentItemCount = itemCount;

JobOperator Job 중지 메커니즘

SimpleJobOperator.stop()

// SimpleJobOperator.stop()
jobExecution.setStatus(BatchStatus.STOPPING);
jobRepository.update(jobExecution);

중지 요청의 감지 - SimpleJobRepository.checkForInterruption()

// SimpleJobRepository.update()
public void update(StepExecution stepExecution) {
    // ...
    checkForInterruption(stepExecution); // 여기서 중지 요청 감지
}

// SimpleJobRepository.checkForInterruption()
private void checkForInterruption(StepExecution stepExecution) {
    JobExecution jobExecution = stepExecution.getJobExecution();
    jobExecutionDao.synchronizeStatus(jobExecution); // 메타데이터 저장소에서 최신 상태를 읽어옴
    if (jobExecution.isStopping()) {
        stepExecution.setTerminateOnly(); // terminateOnly 플래그 설정
    }
}

실제 중단 처리 - ThreadStepInterruptionPolicy

예외 전파와 상태 결정

// AbstractStep.determineBatchStatus()
private static BatchStatus determineBatchStatus(Throwable e) {
    if (e instanceof JobInterruptedException) {
        return BatchStatus.STOPPED;  // 일반 실패(FAILED)와 구분
    }
    return BatchStatus.FAILED;
}

Step에서 Job으로의 중단 전파

// SimpleStepHandler.handleStep()
if (currentStepExecution.getStatus() == BatchStatus.STOPPING
        || currentStepExecution.getStatus() == BatchStatus.STOPPED) {
    execution.setStatus(BatchStatus.STOPPING);
    throw new JobInterruptedException("Job interrupted by step execution");
}

왜 즉시 중단되지 않는가?

flowchart LR
    JO[JobOperator] -->|"update()\nJobExecution = STOPPING"| JR["JobRepository\n(메타데이터 저장소)"]
    JR -->|"synchronize\nJobExecution"| TS[TaskletStep]
    TS -->|"throw\nJobInterruptedException"| AS[AbstractStep]
    AS -->|return| SSH{"SimpleStepHandler\nif (StepExecution ==\nSTOPPED)"}
    SSH -->|"throw\nJobInterruptedException"| AJ[AbstractJob]
    AS -->|"update()\nStepExecution = STOPPED"| JR
    AJ -->|"? update()\nJobExecution = STOPPED"| JR

JobOperator Job 재시작 메커니즘

SimpleJobOperator.restart()

// SimpleJobOperator.restart()
JobExecution jobExecution = findExecutionById(executionId);
Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName());
JobParameters parameters = jobExecution.getJobParameters();

return jobLauncher.run(job, parameters).getId(); // 동일한 JobParameters로 JobLauncher에 위임