JobInstance는 Job의 논리적 실행 단위를 의미한다.Job을 실행하더라도 처리하는 데이터가 다르면 별개의 JobInstance다.
JobInstance다.JobParameters를 통해 JobInstance를 구분한다.JobParameters는 동일한 Job을 서로 다른 JobInstance로 구분하는 핵심 요소다Job + JobParameters = JobInstanceJobExecution은 JobInstance의 실제 실행 시도를 뜻한다.
JobInstance(ex. 4월 정산 배치)라도 여러 번 실행될 수 있다.JobExecution에는 다음 정보들이 포함된다.
COMPLETED,FAILED,STOPPED등)COMPLETED: 배치 작업 완료STARTING: 배치 작업이 실행되기 직전STARTED: 배치 작업이 실행 중STOPPING: 배치 작업이 중지 요청을 받아 중지 진행 중STOPPED: 배치 작업이 요청에 의해 중지된 상태FAILED: 배치 작업이 실행 중 오류로 실패ABANDONED: 배치 작업이 비정상 종료되어 재시작할 수 없는 상태UNKNOWN: 배치 작업의 상태를 확인할 수 없는 불확실한 상태JobInstance는 재실행할 수 없도록 제한한다.
JobParameters로 성공적으로 완료된 Job을 다시 실행하면 예외가 발생한다.Job 실행 시 해당 JobInstance가 과거에 성공적으로 완료된 적이 있는지 검사한다.
JobInstance의 JobExecution 중 BatchStatus가 COMPLETED인 것이 존재하는지 확인COMPLETED 상태의 JobExecution이 존재하면 JobInstanceAlreadyCompleteException을 발생시킨다.COMPLETED 상태의 JobExecution이 없는 경우에는 해당 JobInstance를 다시 실행할 수 있다.
FAILED 상태로 종료된 JobInstance는 재실행 가능하다.spring:
sql:
init:
mode: always
schema-locations: classpath:org/springframework/batch/core/schema-drop-postgresql.sql
Job을 여러 번 실행할 수 있게 하는 특수 컴포넌트JobParameters를 받아 약간 변형된 새로운 JobParameters를 반환한다.
public interface JobParametersIncrementer {
JobParameters getNext(@Nullable JobParameters parameters);
}
RunIdIncrementer
JobParametersIncrementer의 대표적인 구현체run.id라는 이름의 파라미터 값을 증가시키며 파라미터를 변형시킨다.@Bean
public Job brutalizedSystemJob() {
return new JobBuilder("brutalizedSystemJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(brutalizedSystemStep())
.build();
}
JobParameters는 단순한 데이터 전달체가 아니라 JobInstance를 식별하는 핵심 키로 작동하지만 모든 파라미터가 식별 용도로 사용되는 것은 아니다.identifying 속성이 해당 파라미터가 JobInstance 식별에 사용되는지 여부를 결정한다.{'chaos':'{value=true, type=class java.lang.Boolean, identifying=true}',
'run.id':'{value=2, type=class java.lang.Long, identifying=true}'}
기본적으로 모든 JobParameter는 identifying=true로 설정된다.
JobInstance로 취급해야 할 경우verbose=truechunk.size=1000output.path=/tmp/batch/report./gradlew bootRun --args='--spring.batch.job.name=brutalizedSystemJob chaos=true,java.lang.Boolean verbose=true,java.lang.String,false'
verbose=true,java.lang.String,false에서 마지막 false가 identifying 속성이다.
주의사항
identifying=false로 설정하면 같은 JobInstance로 인식되어 재실행이 불가능해진다.identifying=true로 유지하면 불필요하게 중복 실행이 가능해진다.Job은 기본적으로 실패한 경우 재시작이 가능하다.preventRestart()
JobBuilder의 메서드로, 설정된 Job이 재시작 불가능하도록 만든다.Job이 실패한 후 다시 실행하려고 하면 JobRestartException을 발생시킨다.@Bean
public Job brutalizedSystemJob() {
return new JobBuilder("brutalizedSystemJob", jobRepository)
.start(brutalizedSystemStep())
.preventRestart()
.build();
}
preventRestart() 설정이 적용된 Job은 어떤 이유로든 실패하면 같은 파라미터로는 다시 실행할 수 없다.RunIdIncrementer와 함께 사용 시 preventRestart() 설정이 의미가 없어진다.
RunIdIncrementer는 매번 다른 run.id 값을 생성하여 새로운 JobInstance를 만든다.StepExecution은 단일 Step의 실행 시도를 나타내는 객체다.Job이 여러 Step으로 구성될 수 있기 때문에, 하나의 JobExecution은 여러 개의 StepExecution을 포함할 수 있다.JobExecution {
StepExecution("step1")
StepExecution("step2")
StepExecution("step3")
}
StepExecution은 Step 실행 시 생성되며, 해당 Step이 실제로 시작될 때만 생성된다.
Step이 실패하면 두 번째 Step은 실행되지 않으므로 두 번째 Step에 대한 StepExecution도 생성되지 않는다.StepExecution은 다음과 같은 주요 실행 정보를 포함한다.
Step이 현재 어떤 상태인지를 나타내는 BatchStatusStep 실행의 시간적 정보Step 실행의 최종 결과 코드StepExecution에도 현재 실행 상태를 의미하는 BatchStatus가 사용된다.
StepExecution도 JobExecution과 마찬가지로 실행 중 다양한 상태를 가질 수 있기 때문JobExecution의 최종 BatchStatus는 해당 JobExecution에서 가장 마지막에 실행된 StepExecution의 BatchStatus 값을 기준으로 결정된다.Step을 다시 실행할 때 새로운 StepExecution이 생성된다.
StepExecution이 JobExecution에 종속되어 있기 때문JobExecution이 생성되면 그에 속한 모든 StepExecution도 새롭게 생성된다.Job을 재시작할 때는 실패한 Step부터 다시 시작된다.
Step은 재실행되지 않으므로 새로운 StepExecution도 생성되지 않는다.JobExecution#1 (FAILED) {
StepExecution#1 ("step1", COMPLETED)
StepExecution#2 ("step2", FAILED) // 실패
}
JobExecution#2 (COMPLETED) {
// StepExecution#1 ("step1")은 이미 성공했으므로 다시 생성되지 않음
StepExecution#3 ("step2", COMPLETED) // 실패한 step2부터 재시작
}
StepExecution 정보는 단순한 로깅 이상의 가치가 있다.
Step의 처리 시간과 처리량을 분석하여 병목 지점 식별ExecutionContext
Map 인터페이스를 구현히야 key-value 형태의 데이터를 관리한다.// ExecutionContext 예시
ExecutionContext {
"processingIndex": 42500, // 마지막으로 처리한 항목 인덱스
"totalAmount": 2750000.00, // 중간 집계 결과
"lastProcessedId": "TRX-20240315-789", // 마지막으로 처리한 거래 ID
}
ExecutionContext의 두 가지 범위
ExecutionContext는 JobExecution, StepExecution 각각 별도로 존재한다.JobExecution의 ExecutionContext는 모든 StepExecution에서 접근 가능하지만, StepExecution의 ExecutionContext는 해당 StepExecution 내에서만 유효하다.BATCH_JOB_INSTANCEBATCH_JOB_EXECUTIONBATCH_JOB_EXECUTION_PARAMSBATCH_JOB_EXECUTION_CONTEXTBATCH_STEP_EXECUTIONBATCH_STEP_EXECUTION_CONTEXTJobInstance를 저장하는 테이블| 컬럼명 | 설명 | 키 타입 |
|---|---|---|
| JOB_INSTANCE_ID | Job 인스턴스의 고유 식별자 | PK |
| JOB_NAME | Job 이름. JobInstance 식별에 반드시 필요 | |
| JOB_KEY | JobParameters의 해시값 | |
| VERSION | 낙관적 락(Optimistic Lock) 버전. JobInstance의 경우 항상 0으로 유지됨 |
JOB_NAME과 JOB_KEY 컬럼에는 유니크 제약조건이 걸려있다.
Job 이름과 파라미터 조합으로는 하나의 JobInstance만 존재할 수 있음JobInstance는 Job의 이름(jobName)과 해당 JobInstance에 속한 모든 실행 시도들(jobExecutions)을 포함JobInstance의 실제 실행 시도를 나타내는 JobExecution을 저장하는 테이블| 컬럼명 | 설명 | 키 타입 |
|---|---|---|
| JOB_EXECUTION_ID | 작업 실행의 고유 식별자 | PK |
| VERSION | 낙관적 락 버전 | |
| JOB_INSTANCE_ID | 연관된 JobInstance의 ID | FK |
| CREATE_TIME | JobExecution 생성 시간 | |
| START_TIME | JobExecution 시작 시간 | |
| END_TIME | JobExecution 종료 시간 | |
| STATUS | JobExecution 현재 상태(BatchStatus) | |
| EXIT_CODE | JobExecution 종료 코드 | |
| EXIT_MESSAGE | JobExecution 종료 메시지(오류 포함) | |
| LAST_UPDATED | 마지막 업데이트 시간 |
JOB_INSTANCE_ID 컬럼이 FK로 지정되어 있다.
JobInstance는 여러 번 실행될 수 있으므로 동일한 JOB_INSTANCE_ID 값을 갖는 여러 BATCH_JOB_EXECUTION 레코드가 존재할 수 있다.VERSION 컬럼은 동시성 제어를 위한 낙관적 락 메커니즘으로 사용된다.
VERSION 컬럼으로 인해 OptimisticLockingFailureException 예외가 발생JobExecution 은 JobParameters, JobInstance와 같은 핵심 객체와 연결되어 있으며, 자신만의 ExecutionContext를 가진다.JobExecution이 여러 개의 StepExecution을 포함JobParameters를 저장하는 테이블| 컬럼명 | 설명 | 키 타입 |
|---|---|---|
| JOB_EXECUTION_ID | 작업 실행의 ID | FK |
| PARAMETER_NAME | 파라미터 이름 | |
| PARAMETER_TYPE | 파라미터 타입 | |
| PARAMETER_VALUE | 파라미터 값 | |
| IDENTIFYING | JobInstance 식별에 사용 여부 |
IDENTIFYING 컬럼은 해당 파라미터가 JobInstance를 식별하는 데 사용되는지 여부를(Y 또는 N).이 테이블은 JOB_INSTANCE_ID가 아닌 JOB_EXECUTION_ID를 참조한다.
Job을 실행할 때 여러 개의 JobParameter를 전달할 수 있다.JobParameters 객체 내부에서는 이를 Map 컬렉션으로 관리하지만, 데이터베이스에 저장될 때는 각 JobParameter가 테이블의 개별 레코드로 저장된다.StepExecution을 저장하는 테이블| 컬럼명 | 설명 | 키 타입 |
|---|---|---|
| STEP_EXECUTION_ID | StepExecution 고유 식별자 | PK |
| VERSION | 낙관적 락 버전 | |
| STEP_NAME | Step 이름 | |
| JOB_EXECUTION_ID | 연관된 JobExecution의 ID | FK |
| CREATE_TIME | 실행 레코드 생성 시간 | |
| START_TIME | StepExecution 시작 시간 | |
| END_TIME | StepExecution 종료 시간 | |
| STATUS | StepExecution의 현재 상태(BatchStatus) | |
| COMMIT_COUNT | 커밋 횟수 | |
| READ_COUNT | 읽은 아이템 수 | |
| FILTER_COUNT | 필터링된 아이템 수 | |
| WRITE_COUNT | 쓴 아이템 수 | |
| READ_SKIP_COUNT | 읽기 건너뛴 수 | |
| WRITE_SKIP_COUNT | 쓰기 건너뛴 수 | |
| PROCESS_SKIP_COUNT | 처리 건너뛴 수 | |
| ROLLBACK_COUNT | 롤백 횟수 | |
| EXIT_CODE | StepExecution 종료 코드 | |
| EXIT_MESSAGE | StepExecution 종료 메시지 | |
| LAST_UPDATED | 마지막 업데이트 시간 |
JOB_EXECUTION_ID 컬럼이 FK로 지정되어 있다.
BATCH_JOB_EXECUTION 테이블을 참조하며, 각 StepExecution이 어떤 JobExecution에 속하는지를 나타낸다.JobExecution과 마찬가지로 StepExecution도 자신만의 ExecutionContext를 가지고 있어, Step 단위로 상태를 저장하고 복원할 수 있다.JobExecution 수준의 ExecutionContext는 BATCH_JOB_EXECUTION_CONTEXT 테이블에 저장StepExecution 수준의 ExecutionContext는 BATCH_STEP_EXECUTION_CONTEXT 테이블에 저장| 컬럼명 | 설명 | 키 타입 |
|---|---|---|
| JOB_EXECUTION_ID 또는 STEP_EXECUTION_ID | 각각 JobExecution 또는 StepExecution의 ID | PK, FK |
| SHORT_CONTEXT | 직렬화된 ExecutionContext의 문자열 버전 | |
| SERIALIZED_CONTEXT | 전체 컨텍스트, 직렬화된 형태 |
ExecutionContext의 데이터는 직렬화되어 문자열로 변환된다.
SHORT_CONTEXT에만 저장되고 SERIALIZED_CONTEXT는 NULL로 설정SHORT_CONTEXT에는 잘린 버전(약 2492자 + “…“)이 저장되고, 전체 내용은 SERIALIZED_CONTEXT에 CLOB 형태로 저장JobLauncher는 Job을 실행시키는 컴포넌트다.Job과 JobParameters를 받아 실행하고, 그 결과로 JobExecution을 반환한다.TaskExecutorJobLauncher다.JobRepository를 사용해 메타데이터 저장소로부터 가장 최근 JobExecution을 가져온다.JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
lastExecution이 null이 아니라면 재시작 케이스다.Job의 restartable 필드 값을 검사한다.
JobBuilder.preventRestart() 메서드를 호출한 경우 JobRestartException 발생if (lastExecution != null) {
if (!job.isRestartable()) {
throw new JobRestartException("JobInstance already exists and is not restartable");
}
}
JobParametersValidator를 사용해 JobParameters를 검증한다.
JobExecution 객체 생성을 방지job.getJobParametersValidator().validate(jobParameters);
jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
JobRepository의 createJobExecution() 메서드를 호출해 현재 실행에 대한 JobExecution을 생성한다.
JobExecution은 항상 새로 만든다.jobName과 jobParameters로 생성된 JobInstance가 있는지 찾아본다.public JobExecution createJobExecution(String jobName, JobParameters jobParameters) {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
ExecutionContext executionContext;
if (jobInstance != null) {
// 재시작 케이스
} else {
// 첫 실행 케이스
}
...
}
jobInstance != null)
JobInstance의 이전 실행 이력을 조회JobInstance가 존재하면 비정상 상태이므로 IllegalStateExceptionJobExecution)들을 순회하며 BatchStatus를 검사COMPLETED이거나 ABANDONED인 경우 JobInstanceAlreadyCompleteExceptionJobExecution의 ExecutionContext를 조회jobInstance == null)
JobInstance를 새로 생성하고 메타데이터 저장소에 저장ExecutionContext도 빈(empty) 상태로 새로 생성JobExecution 생성 및 저장JobExecution jobExecution = new JobExecution(jobInstance, jobParameters);
jobExecution.setExecutionContext(executionContext);
jobExecution.setLastUpdated(LocalDateTime.now());
jobExecutionDao.saveJobExecution(jobExecution);
ecDao.saveExecutionContext(jobExecution);
return jobExecution;
JobExecution을 초기화하고 메타데이터 저장소에 저장
ExecutionContext도 함께 저장createJobExecution() 메서드 정리
JobInstance의 재실행을 막고, 재시작 상황에서의 상태 복원까지 담당JobExecution을 만들되, 재시작일 때는 이전 ExecutionContext를 그대로 사용JobInstance와 JobExecution이 생성되면 이제 남은 것은 실제 Job을 실행하는 일이다.TaskExecutor 구현체를 사용해 Job을 실행한다.// TaskExecutorJobLauncher.run()
taskExecutor.execute(new Runnable() {
@Override
public void run() {
job.execute(jobExecution);
}
});
public final void execute(JobExecution execution) {
JobSynchronizationManager.register(execution); // JobScope 활성화
try {
jobParametersValidator.validate(execution.getJobParameters());
if (execution.getStatus() != BatchStatus.STOPPING) {
execution.setStartTime(LocalDateTime.now());
updateStatus(execution, BatchStatus.STARTED);
listener.beforeJob(execution);
try {
doExecute(execution);
}
catch (RepeatException e) {
throw e.getCause();
}
}
else {
// STOPPING 상태 처리 (Running Job 중지 시나리오)
}
}
...
}
JobSynchronizationManager.register()
JobExecution과 JobContext를 현재 스레드에 바인딩JobScope가 활성화된다.@JobScope 빈들이 정상적으로 생성되고 Job 관련 컨텍스트 정보에 접근할 수 있게 됨AbstractStep에서 StepSynchronizationManager.register()를 호출하는 것과 동일한 패턴JobParametersValidator 중복 호출
TaskExecutorJobLauncher에서 이미 검증했음에도 다시 호출Job.execute()가 JobLauncher 외의 경로로 직접 호출될 수 있기 때문BatchStatus.STOPPING이 아닌 경우의 처리
JobExecution은 이 시점에서 BatchStatus.STARTING 상태BatchStatus.STARTED로 변경JobExecutionListener.beforeJob() 호출 후 doExecute() 실행protected void doExecute(JobExecution execution)
throws JobInterruptedException, JobRestartException, StartLimitExceededException {
StepExecution stepExecution = null;
for (Step step : steps) {
stepExecution = handleStep(step, execution);
if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
break;
}
}
if (stepExecution != null) {
execution.upgradeStatus(stepExecution.getStatus());
execution.setExitStatus(stepExecution.getExitStatus());
}
}
Job에 구성된 모든 Step들을 순차적으로 실행
Step이 완료되면 다음 Step으로 진행@Bean
public Job exampleJob() {
return new JobBuilder("exampleJob", jobRepository)
.start(firstStep())
.next(secondStep())
.build();
}
handleStep() 메서드
StepHandler 구현체를 사용해 Step을 실행protected final StepExecution handleStep(Step step, JobExecution execution)
throws JobInterruptedException, JobRestartException, StartLimitExceededException {
return stepHandler.handleStep(step, execution);
}
JobExecution 상태 결정
StepExecution의 상태로 JobExecution의 상태를 업데이트Step이 성공하면 JobExecution도 COMPLETEDStep이 실패하면 해당 StepExecution의 상태가 JobExecution의 최종 상태가 된다.SimpleJob.doExecute()에서 BatchStatus 외에 ExitStatus라는 상태값도 설정한다.BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION 테이블의 EXIT_CODE 컬럼이 이 값에 해당| 구분 | BatchStatus |
ExitStatus |
|---|---|---|
| 의미 | 작업의 현재 진행 상태 | 실행이 종료된 시점의 결과 |
| 변화 | STARTING → STARTED → … 계속 변화 |
종료 시점에 최종 결과로 설정 |
| 테이블 컬럼 | STATUS |
EXIT_CODE |
SimpleJob.doExecute()가 완료되면 AbstractJob.execute()의 finally 절이 실행된다.finally {
// JobExecution에 종료 시간 기록
execution.setEndTime(LocalDateTime.now());
// 사용자 정의 후처리 로직 실행
listener.afterJob(execution);
jobRepository.update(execution);
// `JobScope` 비활성화
JobSynchronizationManager.release();
}
AbstractJob에서 Step을 실행할 때 handeStep() 메서드를 사용했었다.
stepHandler.handleStep(step, execution);Step의 실행의 전체 라이프사이클은 StepHandler가 관리한다.
SimpleStepHandlerpublic StepExecution handleStep(Step step, JobExecution execution) {
JobInstance jobInstance = execution.getJobInstance();
StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName());
// ...
StepExecution currentStepExecution = lastStepExecution;
if (shouldStart(lastStepExecution, execution, step)) {
// ...
}
return currentStepExecution;
}
protected boolean shouldStart(StepExecution lastStepExecution, JobExecution jobExecution, Step step)
throws JobRestartException, StartLimitExceededException {
BatchStatus stepStatus;
if (lastStepExecution == null) { // 이전 실행이 없으면 실행 가능
stepStatus = BatchStatus.STARTING;
}
else {
stepStatus = lastStepExecution.getStatus(); // 이전 실행 상태 있으면 할당
}
// 이전 실행이 COMPLETED이고 alowStartIfComplete가 false라면 실행하지 않음
// 또는 이전 실행이 ABANDONED면 절대 재시작 X
if ((stepStatus == BatchStatus.COMPLETED && !step.isAllowStartIfComplete())
|| stepStatus == BatchStatus.ABANDONED) {
return false;
}
// 재시작 가능 상태면 startLimit 초과 여부를 검사
if (jobRepository.getStepExecutionCount(jobExecution.getJobInstance(), step.getName()) < step.getStartLimit()) {
return true;
}
else { // 초과하면 예외
throw new StartLimitExceededException("...");
}
}
// StepBuilder 설정
builder.allowStartIfComplete(true) // 완료된 Step도 재실행 가능. 기본값: false
builder.startLimit(10) // 최대 실행 시도 횟수 제한. 기본값: Integer.MAX_VALUE
lastStepExecution이 COMPLETED일 수 있는 시나리오
Job이 여러 Step으로 구성되어 있고 후속 Step이 실패한 경우allowStartIfComplete를 true로 설정해야 하는 경우
Step이 이전 Step의 실행 결과에 의존하는 경우Step #2가 Step #1의 데이터에 의존한다면 Step #1부터 다시 실행해야 함Step 간의 데이터 의존성이나 비즈니스 요구사항에 따라 결정shouldStart()가 true를 반환한 이후의 흐름public StepExecution handleStep(Step step, JobExecution execution) {
// ...
StepExecution currentStepExecution = lastStepExecution;
if (shouldStart(lastStepExecution, execution, step)) {
// 재시작 여부를 떠나 StepExecution은 매번 새로 생성
currentStepExecution = execution.createStepExecution(step.getName());
boolean isRestart = (lastStepExecution != null
&& !lastStepExecution.getStatus().equals(BatchStatus.COMPLETED));
if (isRestart) { // 재시작인 경우: 이전 `StepExecution`의 `ExecutionContext`를 그대로 사용
currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
}
else { // 첫 실행인 경우: 새로운 `ExecutionContext`를 생성
currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
}
// 메타데이터 저장소에 `StepExecution`과 `ExecutionContext` 즉시 저장
jobRepository.add(currentStepExecution);
try {
step.execute(currentStepExecution); // 실제 `Step` 실행을 시작
currentStepExecution.getExecutionContext().put("batch.executed", true);
}
catch (JobInterruptedException e) {
// ...
}
// 실행 완료 후 `ExecutionContext`에 `batch.executed` 플래그를 설정하고 메타데이터 저장소에 반영
jobRepository.updateExecutionContext(execution);
}
return currentStepExecution;
}
public final void execute(StepExecution stepExecution)
throws JobInterruptedException, UnexpectedJobExecutionException {
stepExecution.setStartTime(LocalDateTime.now());
stepExecution.setStatus(BatchStatus.STARTED);
getJobRepository().update(stepExecution); // 시작 시간과 BatchStatus 즉시 반영
// ...
ExitStatus exitStatus = ExitStatus.EXECUTING; // ExitStatus는 아직 StepExecution에 반영되지 않음
doExecute(stepExecution); // TaskletStep.doExecute() 실행
// ...
}
BatchStatus는 설정할 때마다 즉시 메타데이터 저장소에 반영하는 패턴ExitStatus는 Step 실행 중에는 StepExecution에 반영되지 않고, 실행이 완료된 후에만 저장TaskletStep.doExecute()에서 ExecutionContext에 타입 정보를 저장한 후 청크 반복 처리를 시작ChunkTransactionCallback.doInTransaction()에서 트랜잭션 내에서 실행public RepeatStatus doInTransaction(TransactionStatus status) {
StepContribution contribution = stepExecution.createStepContribution(); // 청크 처리 결과 컨테이너
try {
result = tasklet.execute(contribution, chunkContext); // Tasklet 실행
}
finally {
stepExecution.apply(contribution); // 실패해도 그 시점까지의 통계를 StepExecution에 반영
}
// 청크 성공 시
stream.update(stepExecution.getExecutionContext()); // ItemStream 상태를 ExecutionContext에 저장
stepExecution.incrementCommitCount();
getJobRepository().update(stepExecution); // 메타데이터 저장소에 저장
}
AbstractStep으로 돌아온다.public final void execute(StepExecution stepExecution) {
// ...
doExecute(stepExecution); // TaskletStep.doExecute() 완료
// 정상 완료 시 ExitStatus와 BatchStatus를 COMPLETED로 설정
exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());
stepExecution.upgradeStatus(BatchStatus.COMPLETED);
}
TaskletStep의 rollback() 메서드가 호출된다.// TaskletStep.rollback()
private void rollback(StepExecution stepExecution) {
if (!rolledBack) { // 각 청크에 대해 rollbackCount는 한 번만 증가
stepExecution.incrementRollbackCount();
rolledBack = true;
}
}
rollback() 메서드는 트랜잭션을 롤백하는 것이 아니라 롤백 카운트만 증가시킨다.
TransactionTemplate이 수행AbstractStep.execute()의 catch 절까지 전파// AbstractStep.execute()
catch (Throwable e) {
stepExecution.upgradeStatus(determineBatchStatus(e)); // BatchStatus.FAILED
exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e)); // ExitStatus.FAILED (로컬 변수에만 저장)
stepExecution.addFailureException(e);
}
BatchStatus가 결정되어 StepExecution에 즉시 반영ExitStatus는 이 시점에서는 로컬 변수에만 저장되고, StepExecution에는 아직 반영되지 않음// AbstractStep.execute()
finally {
stepExecution.setEndTime(LocalDateTime.now()); // 종료 시간 설정
// afterStep() 호출 전에 ExitStatus를 StepExecution에 반영 (afterStep()에서 참조할 수 있도록)
exitStatus = exitStatus.and(stepExecution.getExitStatus());
stepExecution.setExitStatus(exitStatus);
// StepExecutionListener.afterStep()은 ExitStatus를 반환하여 최종 ExitStatus를 변경
exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
try {
getJobRepository().updateExecutionContext(stepExecution); // Step ExecutionContext 저장
}
catch (Exception e) {
stepExecution.setStatus(BatchStatus.UNKNOWN); // 메타데이터 저장소 통신 실패 시 UNKNOWN
exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
stepExecution.addFailureException(e);
}
stepExecution.setExitStatus(exitStatus); // 최종 ExitStatus 반영
try {
getJobRepository().update(stepExecution); // StepExecution 최종 상태 저장
}
catch (Exception e) {
stepExecution.setStatus(BatchStatus.UNKNOWN);
stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
stepExecution.addFailureException(e);
}
close(stepExecution.getExecutionContext()); // ItemStream.close()
}
StepExecutionListener.afterStep()은 ExitStatus를 반환하므로, Step의 ExitStatus를 변경하는 수단이 될 수 있다.ExecutionContext 업데이트 중 예외 발생 시 BatchStatus와 ExitStatus를 모두 UNKNOWN으로 설정
Step 실행이 완료되면 SimpleStepHandler.handleStep()에서 JobExecution의 ExecutionContext를 메타데이터 저장소에 업데이트하고 StepExecution을 반환// SimpleStepHandler.handleStep()
step.execute(currentStepExecution);
currentStepExecution.getExecutionContext().put("batch.executed", true);
jobRepository.updateExecutionContext(execution); // Job ExecutionContext 저장
return currentStepExecution;
SimpleJob.doExecute()에서 마지막 StepExecution의 상태를 JobExecution에 반영// SimpleJob.doExecute()
if (stepExecution != null) {
execution.upgradeStatus(stepExecution.getStatus());
execution.setExitStatus(stepExecution.getExitStatus());
}
AbstractJob.execute()의 finally 절에서 최종 마무리// AbstractJob.execute()
finally {
execution.setEndTime(LocalDateTime.now()); // 종료 시간
listener.afterJob(execution); // JobExecutionListener.afterJob()
jobRepository.update(execution); // JobExecution 최종 상태 메타데이터 저장소에 저장
}
| 항목 | 내용 |
|---|---|
StepExecution 생성 |
매번 새로 생성. 재시작 시에도 새로 생성하되 이전 ExecutionContext를 복구 |
BatchStatus 반영 |
실행 상태 변경 시마다 즉시 메타데이터 저장소에 반영 |
ExitStatus 반영 |
Step 실행 완료 후에만 StepExecution에 설정되어 저장 |
StepContribution |
각 청크마다 생성. 실패해도 그 시점까지의 처리 정보는 안전하게 기록 |
| 롤백 처리 | 예외 시 롤백 카운트 증가. 실제 트랜잭션 롤백은 TransactionTemplate이 담당 |
| 마무리 | 종료 시간 설정 → afterStep() → ExecutionContext 저장 → StepExecution 저장 → ItemStream.close() |