Job을 실행할 수 있다.
JobLauncher를 실행하는 REST API를 통한 Job 제어JobOperator 활용으로 Job 중지와 재시작 관리웹 환경을 구성하는 경우 application.yml에
spring.batch.job.enabled프로퍼티를false로 설정함으로써 애플리케이션 시작 시Job자동 실행을 막을 수 있다.false인 경우JobLauncherApplicationRunner가 자동 구성 되지 않는다.
@PostMapping("/{jobName}/start")
public ResponseEntity<String> launchJob(@PathVariable String jobName) throws Exception {
Job job = jobRegistry.getJob(jobName); // JobRegistry에서 이름으로 Job 조회
JobParameters jobParameters = new JobParametersBuilder(jobExplorer)
.getNextJobParameters(job).toJobParameters(); // 이전 실행 이력 기반으로 다음 파라미터 생성
JobExecution execution = jobLauncher.run(job, jobParameters); // Job 실행
return ResponseEntity.ok("Job launched with ID: " + execution.getId());
}
JobLauncherApplicationRunner가 내부적으로 수행하던 로직과 유사JobLauncher: 실제 Job 실행을 담당하는 진입점JobExplorer: 이전 실행 이력을 바탕으로 다음 JobParameters를 생성JobRegistry: 등록된 모든 Job들을 이름으로 조회하는 중앙 집중식 저장소TaskExecutorJobLauncher는 동기 방식의 SyncTaskExecutor를 사용
@Configuration
public class BatchCustomConfiguration {
@Bean
@BatchTaskExecutor
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor(); // 비동기 실행
}
}
@BatchTaskExecutor로 등록하면 Spring Boot가 자동으로 JobLauncher에 주입curl -X POST http://localhost:8080/api/jobs/brutalizedSystemJob/start
Job launched with ID: 1
JobExecution ID는 메타데이터 저장소의 고유 식별자로, 실행 상태 조회나 제어에 사용JobLauncher만으로는 Job 시작만 가능하고 중지/재시작/모니터링 등 정교한 제어에는 한계가 있다.JobLauncher보다 한 단계 높은 수준의 추상화를 제공하는 운영 인터페이스Job의 시작, 중지, 재시작 등 배치 작업의 전체 생명주기를 관리public interface JobOperator {
Long start(String jobName, Properties parameters); // Job 시작
boolean stop(long executionId); // Job 중지 요청
Long restart(long executionId); // 실패/중지된 Job 재시작
Long startNextInstance(String jobName); // 다음 인스턴스 시작
JobExecution abandon(long jobExecutionId); // Job 포기(재시작 불가 처리)
Set<Long> getRunningExecutions(String jobName); // 실행 중인 Job 조회
String getSummary(long executionId); // 실행 요약 정보
Map<Long, String> getStepExecutionSummaries(long executionId); // Step 실행 요약
Set<String> getJobNames(); // 등록된 Job 이름 목록
// ...
}
start(jobName, parameters)
JobRegistry에서 Job을 조회하고 JobParameters를 변환하여 JobLauncher.run()을 호출JobParametersIncrementer를 자동 처리하지 않으므로 매번 다른 파라미터를 직접 전달해야 한다.stop(executionId)
JobExecution의 BatchStatus를 STOPPING으로 변경Step이 다음 청크 경계에서 상태를 확인한 후 이루어지므로 즉시 중지되지는 않는다.restart(executionId)
JobExecution을 기반으로 Job을 재시작ExecutionContext를 통해 중단된 지점부터 계속 실행하며, 새 JobExecution ID를 반환startNextInstance(jobName)
JobParametersIncrementer를 사용해 자동으로 증분된 파라미터로 새 JobInstance를 시작abandon(executionId)
JobExecution의 상태를 ABANDONED로 변경ABANDONED 상태의 Step은 재시작 시에도 실행되지 않는다.getRunningExecutions(jobName) - 현재 실행 중인 JobExecution ID 목록 조회getSummary(executionId) - JobExecution의 요약 정보를 문자열로 반환getStepExecutionSummaries(executionId) - 해당 JobExecution에 포함된 각 StepExecution의 요약 정보 조회stop() 호출 시
JobExecution의 BatchStatus가 STOPPING으로 변경되고 메타데이터 저장소에 반영Job이 즉시 중지되는 것이 아니라, Step이 다음 청크 경계에서 상태를 확인한 후 중지
STATUS와 EXIT_CODE가 모두 STOPPED로 변경restart() 호출 시
JobExecution의 ID를 전달하면 새로운 JobExecution이 생성되어 재시작ExecutionContext에 저장된 진행 상태를 기반으로 중단된 지점부터 계속 실행update()에서 현재까지 읽은 아이템 수를 ExecutionContext에 저장// AbstractItemCountingItemStreamItemReader.update()
executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);
open()에서 저장된 위치를 읽어 jumpToItem()을 호출하여 중단 지점으로 이동// AbstractItemCountingItemStreamItemReader.open()
int itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT));
jumpToItem(itemCount);
currentItemCount = itemCount;
JobExecution을 찾아 BatchStatus.STOPPING으로 변경하고 메타데이터 저장소에 업데이트Job을 즉시 중단시키는 것이 아니라, 중단 요청을 메타데이터에 기록하는 것에 불과// SimpleJobOperator.stop()
jobExecution.setStatus(BatchStatus.STOPPING);
jobRepository.update(jobExecution);
JobRepository.update(stepExecution)가 호출될 때 중지 요청을 감지// SimpleJobRepository.update()
public void update(StepExecution stepExecution) {
// ...
checkForInterruption(stepExecution); // 여기서 중지 요청 감지
}
// SimpleJobRepository.checkForInterruption()
private void checkForInterruption(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
jobExecutionDao.synchronizeStatus(jobExecution); // 메타데이터 저장소에서 최신 상태를 읽어옴
if (jobExecution.isStopping()) {
stepExecution.setTerminateOnly(); // terminateOnly 플래그 설정
}
}
synchronizeStatus()로 메타데이터 저장소에서 JobExecution의 최신 상태를 읽어옴STOPPING 상태를 감지하면 StepExecution의 terminateOnly 플래그를 true로 설정doInChunkContext()에서 checkInterrupted()가 호출됨StepExecution.isTerminateOnly()가 true이면 JobInterruptedException을 던짐JobInterruptedException이 AbstractStep.execute()의 catch 절까지 전파// AbstractStep.determineBatchStatus()
private static BatchStatus determineBatchStatus(Throwable e) {
if (e instanceof JobInterruptedException) {
return BatchStatus.STOPPED; // 일반 실패(FAILED)와 구분
}
return BatchStatus.FAILED;
}
JobInterruptedException은 FAILED가 아닌 STOPPED로 처리됨ExitStatus도 STOPPED로 설정되며, EXIT_MESSAGE에 JobInterruptedException 클래스명이 기록SimpleStepHandler.handleStep()에서 StepExecution이 STOPPED 상태임을 감지// SimpleStepHandler.handleStep()
if (currentStepExecution.getStatus() == BatchStatus.STOPPING
|| currentStepExecution.getStatus() == BatchStatus.STOPPED) {
execution.setStatus(BatchStatus.STOPPING);
throw new JobInterruptedException("Job interrupted by step execution");
}
JobExecution의 상태를 STOPPING으로 설정하고 다시 JobInterruptedException을 던짐AbstractJob.execute()까지 전파되어 JobExecution의 상태가 최종적으로 STOPPED로 확정stop() 호출 시점에 제어권은 Spring Batch가 아닌 비즈니스 로직(Tasklet/ItemReader 등)에 있을 수 있다.doInChunkContext())에서 checkInterrupted()를 통해 이루어짐stop() 호출 → 현재 청크 완료 → 다음 청크로 넘어가기 전에 중지KafkaItemWriter 등 롤백이 불가능한 구현체에서는 중간 중단 시 데이터 일관성이 깨질 수 있다.flowchart LR
JO[JobOperator] -->|"update()\nJobExecution = STOPPING"| JR["JobRepository\n(메타데이터 저장소)"]
JR -->|"synchronize\nJobExecution"| TS[TaskletStep]
TS -->|"throw\nJobInterruptedException"| AS[AbstractStep]
AS -->|return| SSH{"SimpleStepHandler\nif (StepExecution ==\nSTOPPED)"}
SSH -->|"throw\nJobInterruptedException"| AJ[AbstractJob]
AS -->|"update()\nStepExecution = STOPPED"| JR
AJ -->|"? update()\nJobExecution = STOPPED"| JR
executionId로 JobExecution을 조회하고, JobRegistry에서 Job 빈을 찾아 JobLauncher.run()을 호출JobLauncher 내부에 있으므로 restart() 자체는 단순한 위임 역할// SimpleJobOperator.restart()
JobExecution jobExecution = findExecutionById(executionId);
Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName());
JobParameters parameters = jobExecution.getJobParameters();
return jobLauncher.run(job, parameters).getId(); // 동일한 JobParameters로 JobLauncher에 위임
Job을 자동 실행할 수 있다.Job을 실행하는 게이트웨이JobLaunchingMessageHandler가 JobLaunchRequest에서 Job과 JobParameters를 추출하여 JobLauncher.run()을 호출// JobLaunchingMessageHandler
public JobExecution launch(JobLaunchRequest request) throws JobExecutionException {
Job job = request.getJob();
JobParameters jobParameters = request.getJobParameters();
return jobLauncher.run(job, jobParameters); // 동일한 패턴
}
JobLaunchRequest 변환 → JobLaunchingGateway에서 Job 실행JobRegistry와 JobParametersBuilder를 사용하여 JobLaunchRequest를 생성// Spring Integration DSL로 정의한 흐름
IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "job-requests-queue")) // RabbitMQ에서 메시지 수신
.transform(this::createJobLaunchRequest) // JSON → JobLaunchRequest 변환
.handle(jobLaunchingGateway()) // JobLauncher.run() 실행
.get();
implementation 'org.springframework.batch:spring-batch-integration'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.integration:spring-integration-amqp'
JobRegistry, JobParametersBuilder, JobLauncher 등 핵심 컴포넌트의 사용 방식은 커맨드라인, REST API, 메시징 기반 모두 동일