Job을 실행할 수 있다.
JobLauncher를 실행하는 REST API를 통한 Job 제어JobOperator 활용으로 Job 중지와 재시작 관리웹 환경을 구성하는 경우 application.yml에
spring.batch.job.enabled프로퍼티를false로 설정함으로써 애플리케이션 시작 시Job자동 실행을 막을 수 있다.false인 경우JobLauncherApplicationRunner가 자동 구성 되지 않는다.
@PostMapping("/{jobName}/start")
public ResponseEntity<String> launchJob(@PathVariable String jobName) throws Exception {
Job job = jobRegistry.getJob(jobName); // JobRegistry에서 이름으로 Job 조회
JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
.getNextJobParameters(job).toJobParameters(); // 이전 실행 이력 기반으로 다음 파라미터 생성
JobExecution execution = jobLauncher.run(job, jobParameters); // Job 실행
return ResponseEntity.ok("Job launched with ID: " + execution.getId());
}
JobLauncherApplicationRunner가 내부적으로 수행하던 로직과 유사JobLauncher: 실제 Job 실행을 담당하는 진입점JobExplorer: 이전 실행 이력을 바탕으로 다음 JobParameters를 생성JobRegistry: 등록된 모든 Job들을 이름으로 조회하는 중앙 집중식 저장소TaskExecutorJobLauncher는 동기 방식의 SyncTaskExecutor를 사용
@Configuration
public class BatchCustomConfiguration {
@Bean
@BatchTaskExecutor
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor(); // 비동기 실행
}
}
@BatchTaskExecutor로 등록하면 Spring Boot가 자동으로 JobLauncher에 주입curl -X POST http://localhost:8080/api/jobs/brutalizedSystemJob/start
Job launched with ID: 1
JobExecution ID는 메타데이터 저장소의 고유 식별자로, 실행 상태 조회나 제어에 사용JobLauncher만으로는 Job 시작만 가능하고 중지/재시작/모니터링 등 정교한 제어에는 한계가 있다.JobLauncher보다 한 단계 높은 수준의 추상화를 제공하는 운영 인터페이스Job의 시작, 중지, 재시작 등 배치 작업의 전체 생명주기를 관리public interface JobOperator {
Long start(String jobName, Properties parameters); // Job 시작
boolean stop(long executionId); // Job 중지 요청
Long restart(long executionId); // 실패/중지된 Job 재시작
Long startNextInstance(String jobName); // 다음 인스턴스 시작
JobExecution abandon(long jobExecutionId); // Job 포기(재시작 불가 처리)
Set<Long> getRunningExecutions(String jobName); // 실행 중인 Job 조회
String getSummary(long executionId); // 실행 요약 정보
Map<Long, String> getStepExecutionSummaries(long executionId); // Step 실행 요약
Set<String> getJobNames(); // 등록된 Job 이름 목록
// ...
}
start(jobName, parameters)
JobRegistry에서 Job을 조회하고 JobParameters를 변환하여 JobLauncher.run()을 호출JobParametersIncrementer를 자동 처리하지 않으므로 매번 다른 파라미터를 직접 전달해야 한다.stop(executionId)
JobExecution의 BatchStatus를 STOPPING으로 변경Step이 다음 청크 경계에서 상태를 확인한 후 이루어지므로 즉시 중지되지는 않는다.restart(executionId)
JobExecution을 기반으로 Job을 재시작ExecutionContext를 통해 중단된 지점부터 계속 실행하며, 새 JobExecution ID를 반환startNextInstance(jobName)
JobParametersIncrementer를 사용해 자동으로 증분된 파라미터로 새 JobInstance를 시작abandon(executionId)
JobExecution의 상태를 ABANDONED로 변경ABANDONED 상태의 Step은 재시작 시에도 실행되지 않는다.getRunningExecutions(jobName) - 현재 실행 중인 JobExecution ID 목록 조회getSummary(executionId) - JobExecution의 요약 정보를 문자열로 반환getStepExecutionSummaries(executionId) - 해당 JobExecution에 포함된 각 StepExecution의 요약 정보 조회stop() 호출 시
JobExecution의 BatchStatus가 STOPPING으로 변경되고 메타데이터 저장소에 반영Job이 즉시 중지되는 것이 아니라, Step이 다음 청크 경계에서 상태를 확인한 후 중지
STATUS와 EXIT_CODE가 모두 STOPPED로 변경restart() 호출 시
JobExecution의 ID를 전달하면 새로운 JobExecution이 생성되어 재시작ExecutionContext에 저장된 진행 상태를 기반으로 중단된 지점부터 계속 실행update()에서 현재까지 읽은 아이템 수를 ExecutionContext에 저장// AbstractItemCountingItemStreamItemReader.update()
executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);
open()에서 저장된 위치를 읽어 jumpToItem()을 호출하여 중단 지점으로 이동// AbstractItemCountingItemStreamItemReader.open()
int itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT));
jumpToItem(itemCount);
currentItemCount = itemCount;
JobExecution을 찾아 BatchStatus.STOPPING으로 변경하고 메타데이터 저장소에 업데이트Job을 즉시 중단시키는 것이 아니라, 중단 요청을 메타데이터에 기록하는 것에 불과// SimpleJobOperator.stop()
jobExecution.setStatus(BatchStatus.STOPPING);
jobRepository.update(jobExecution);
JobRepository.update(stepExecution)가 호출될 때 중지 요청을 감지// SimpleJobRepository.update()
public void update(StepExecution stepExecution) {
// ...
checkForInterruption(stepExecution); // 여기서 중지 요청 감지
}
// SimpleJobRepository.checkForInterruption()
private void checkForInterruption(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
jobExecutionDao.synchronizeStatus(jobExecution); // 메타데이터 저장소에서 최신 상태를 읽어옴
if (jobExecution.isStopping()) {
stepExecution.setTerminateOnly(); // terminateOnly 플래그 설정
}
}
synchronizeStatus()로 메타데이터 저장소에서 JobExecution의 최신 상태를 읽어옴STOPPING 상태를 감지하면 StepExecution의 terminateOnly 플래그를 true로 설정doInChunkContext()에서 checkInterrupted()가 호출됨StepExecution.isTerminateOnly()가 true이면 JobInterruptedException을 던짐JobInterruptedException이 AbstractStep.execute()의 catch 절까지 전파// AbstractStep.determineBatchStatus()
private static BatchStatus determineBatchStatus(Throwable e) {
if (e instanceof JobInterruptedException) {
return BatchStatus.STOPPED; // 일반 실패(FAILED)와 구분
}
return BatchStatus.FAILED;
}
JobInterruptedException은 FAILED가 아닌 STOPPED로 처리됨ExitStatus도 STOPPED로 설정되며, EXIT_MESSAGE에 JobInterruptedException 클래스명이 기록SimpleStepHandler.handleStep()에서 StepExecution이 STOPPED 상태임을 감지// SimpleStepHandler.handleStep()
if (currentStepExecution.getStatus() == BatchStatus.STOPPING
|| currentStepExecution.getStatus() == BatchStatus.STOPPED) {
execution.setStatus(BatchStatus.STOPPING);
throw new JobInterruptedException("Job interrupted by step execution");
}
JobExecution의 상태를 STOPPING으로 설정하고 다시 JobInterruptedException을 던짐AbstractJob.execute()까지 전파되어 JobExecution의 상태가 최종적으로 STOPPED로 확정stop() 호출 시점에 제어권은 Spring Batch가 아닌 비즈니스 로직(Tasklet/ItemReader 등)에 있을 수 있다.doInChunkContext())에서 checkInterrupted()를 통해 이루어짐stop() 호출 → 현재 청크 완료 → 다음 청크로 넘어가기 전에 중지KafkaItemWriter 등 롤백이 불가능한 구현체에서는 중간 중단 시 데이터 일관성이 깨질 수 있다.flowchart LR
JO[JobOperator] -->|"update()\nJobExecution = STOPPING"| JR["JobRepository\n(메타데이터 저장소)"]
JR -->|"synchronize\nJobExecution"| TS[TaskletStep]
TS -->|"throw\nJobInterruptedException"| AS[AbstractStep]
AS -->|return| SSH{"SimpleStepHandler\nif (StepExecution ==\nSTOPPED)"}
SSH -->|"throw\nJobInterruptedException"| AJ[AbstractJob]
AS -->|"update()\nStepExecution = STOPPED"| JR
AJ -->|"? update()\nJobExecution = STOPPED"| JR
executionId로 JobExecution을 조회하고, JobRegistry에서 Job 빈을 찾아 JobLauncher.run()을 호출JobLauncher 내부에 있으므로 restart() 자체는 단순한 위임 역할// SimpleJobOperator.restart()
JobExecution jobExecution = findExecutionById(executionId);
Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName());
JobParameters parameters = jobExecution.getJobParameters();
return jobLauncher.run(job, parameters).getId(); // 동일한 JobParameters로 JobLauncher에 위임