TIL

Job과 메타데이터 저장소

Spring Batch의 메타데이터 저장소

Spring Batch 핵심 도메인

JobInstance

JobParameters

JobExecution

BatchStatus

JobInstance 재실행 제한

재실행 검사 메커니즘

메타데이터 테이블 초기화 설정

spring:
  sql:
    init:
      mode: always
      schema-locations: classpath:org/springframework/batch/core/schema-drop-postgresql.sql

JobParametersIncrementer

public interface JobParametersIncrementer {  
   JobParameters getNext(@Nullable JobParameters parameters);  
}
@Bean
public Job brutalizedSystemJob() {
    return new JobBuilder("brutalizedSystemJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            .start(brutalizedSystemStep())
            .build();
}

JobParameter의 identifying 속성

{'chaos':'{value=true, type=class java.lang.Boolean, identifying=true}',
 'run.id':'{value=2, type=class java.lang.Long, identifying=true}'}
./gradlew bootRun --args='--spring.batch.job.name=brutalizedSystemJob chaos=true,java.lang.Boolean verbose=true,java.lang.String,false'

restartable 설정

@Bean
public Job brutalizedSystemJob() {
    return new JobBuilder("brutalizedSystemJob", jobRepository)
            .start(brutalizedSystemStep())
            .preventRestart()
            .build();
}

StepExecution

JobExecution {
    StepExecution("step1")
    StepExecution("step2")
    StepExecution("step3")
}

JobExecution과 StepExecution의 BatchStatus

실패와 재시작 시 StepExecution의 동작

JobExecution#1 (FAILED) {
    StepExecution#1 ("step1", COMPLETED)
    StepExecution#2 ("step2", FAILED)    // 실패
}

JobExecution#2 (COMPLETED) {
    // StepExecution#1 ("step1")은 이미 성공했으므로 다시 생성되지 않음
    StepExecution#3 ("step2", COMPLETED) // 실패한 step2부터 재시작
}

ExecutionContext

// ExecutionContext 예시
ExecutionContext {
    "processingIndex": 42500,              // 마지막으로 처리한 항목 인덱스
    "totalAmount": 2750000.00,             // 중간 집계 결과
    "lastProcessedId": "TRX-20240315-789", // 마지막으로 처리한 거래 ID
}

배치 메타데이터 테이블 구조

BATCH_JOB_INSTANCE

컬럼명 설명 키 타입
JOB_INSTANCE_ID Job 인스턴스의 고유 식별자 PK
JOB_NAME Job 이름. JobInstance 식별에 반드시 필요  
JOB_KEY JobParameters의 해시값  
VERSION 낙관적 락(Optimistic Lock) 버전. JobInstance의 경우 항상 0으로 유지됨  

BATCH_JOB_EXECUTION

컬럼명 설명 키 타입
JOB_EXECUTION_ID 작업 실행의 고유 식별자 PK
VERSION 낙관적 락 버전  
JOB_INSTANCE_ID 연관된 JobInstance의 ID FK
CREATE_TIME JobExecution 생성 시간  
START_TIME JobExecution 시작 시간  
END_TIME JobExecution 종료 시간  
STATUS JobExecution 현재 상태(BatchStatus)  
EXIT_CODE JobExecution 종료 코드  
EXIT_MESSAGE JobExecution 종료 메시지(오류 포함)  
LAST_UPDATED 마지막 업데이트 시간  

BATCH_JOB_EXECUTION_PARAMS

컬럼명 설명 키 타입
JOB_EXECUTION_ID 작업 실행의 ID FK
PARAMETER_NAME 파라미터 이름  
PARAMETER_TYPE 파라미터 타입  
PARAMETER_VALUE 파라미터 값  
IDENTIFYING JobInstance 식별에 사용 여부  

BATCH_STEP_EXECUTION

컬럼명 설명 키 타입
STEP_EXECUTION_ID StepExecution 고유 식별자 PK
VERSION 낙관적 락 버전  
STEP_NAME Step 이름  
JOB_EXECUTION_ID 연관된 JobExecution의 ID FK
CREATE_TIME 실행 레코드 생성 시간  
START_TIME StepExecution 시작 시간  
END_TIME StepExecution 종료 시간  
STATUS StepExecution의 현재 상태(BatchStatus)  
COMMIT_COUNT 커밋 횟수  
READ_COUNT 읽은 아이템 수  
FILTER_COUNT 필터링된 아이템 수  
WRITE_COUNT 쓴 아이템 수  
READ_SKIP_COUNT 읽기 건너뛴 수  
WRITE_SKIP_COUNT 쓰기 건너뛴 수  
PROCESS_SKIP_COUNT 처리 건너뛴 수  
ROLLBACK_COUNT 롤백 횟수  
EXIT_CODE StepExecution 종료 코드  
EXIT_MESSAGE StepExecution 종료 메시지  
LAST_UPDATED 마지막 업데이트 시간  

BATCH_JOB_EXECUTION_CONTEXT & BATCH_STEP_EXECUTION_CONTEXT

컬럼명 설명 키 타입
JOB_EXECUTION_ID 또는 STEP_EXECUTION_ID 각각 JobExecution 또는 StepExecution의 ID PK, FK
SHORT_CONTEXT 직렬화된 ExecutionContext의 문자열 버전  
SERIALIZED_CONTEXT 전체 컨텍스트, 직렬화된 형태  

Job Squad

JobLauncher: 배치 작업의 시작점

TaskExecutorJobLauncher.run() 메서드 분석

JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
if (lastExecution != null) {
    if (!job.isRestartable()) {
        throw new JobRestartException("JobInstance already exists and is not restartable");
    }
}
job.getJobParametersValidator().validate(jobParameters);
jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

SimpleJobRepository.createJobExecution() 분석

public JobExecution createJobExecution(String jobName, JobParameters jobParameters) {
    JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
    ExecutionContext executionContext;

    if (jobInstance != null) {
        // 재시작 케이스
    } else {
        // 첫 실행 케이스
    }
    ...
}
JobExecution jobExecution = new JobExecution(jobInstance, jobParameters);
jobExecution.setExecutionContext(executionContext);
jobExecution.setLastUpdated(LocalDateTime.now());

jobExecutionDao.saveJobExecution(jobExecution);
ecDao.saveExecutionContext(jobExecution);

return jobExecution;

Job 실행으로의 전환

// TaskExecutorJobLauncher.run()
taskExecutor.execute(new Runnable() {
    @Override
    public void run() {
        job.execute(jobExecution);
    }
});

AbstractJob.execute()

public final void execute(JobExecution execution) {
    JobSynchronizationManager.register(execution);  // JobScope 활성화

    try {
        jobParametersValidator.validate(execution.getJobParameters());

        if (execution.getStatus() != BatchStatus.STOPPING) {
            execution.setStartTime(LocalDateTime.now());
            updateStatus(execution, BatchStatus.STARTED);

            listener.beforeJob(execution);

            try {
                doExecute(execution);
            }
            catch (RepeatException e) {
                throw e.getCause();
            }
        }
        else {
            // STOPPING 상태 처리 (Running Job 중지 시나리오)
        }
    }
    ...
}

SimpleJob.doExecute()

protected void doExecute(JobExecution execution)
        throws JobInterruptedException, JobRestartException, StartLimitExceededException {

    StepExecution stepExecution = null;

    for (Step step : steps) {
        stepExecution = handleStep(step, execution);
        if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
            break;
        }
    }

    if (stepExecution != null) {
        execution.upgradeStatus(stepExecution.getStatus());
        execution.setExitStatus(stepExecution.getExitStatus());
    }
}
@Bean
public Job exampleJob() {
    return new JobBuilder("exampleJob", jobRepository)
            .start(firstStep())
            .next(secondStep())
            .build();
}
protected final StepExecution handleStep(Step step, JobExecution execution)
        throws JobInterruptedException, JobRestartException, StartLimitExceededException {
    return stepHandler.handleStep(step, execution);
}

BatchStatus와 ExitStatus

구분 BatchStatus ExitStatus
의미 작업의 현재 진행 상태 실행이 종료된 시점의 결과
변화 STARTINGSTARTED → … 계속 변화 종료 시점에 최종 결과로 설정
테이블 컬럼 STATUS EXIT_CODE

AbstractJob.execute() - finally

finally {
    // JobExecution에 종료 시간 기록
    execution.setEndTime(LocalDateTime.now());
    
    // 사용자 정의 후처리 로직 실행
    listener.afterJob(execution);
    
    jobRepository.update(execution);
    
    // `JobScope` 비활성화
    JobSynchronizationManager.release();
}

Step Squad

SimpleStepHandler

public StepExecution handleStep(Step step, JobExecution execution) {
    JobInstance jobInstance = execution.getJobInstance();
    StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName());
    
    // ...
    
    StepExecution currentStepExecution = lastStepExecution;
        
    if (shouldStart(lastStepExecution, execution, step)) {
        // ...
    }
    return currentStepExecution;
}

shouldStart() - Step 실행 가능 여부 판정

protected boolean shouldStart(StepExecution lastStepExecution, JobExecution jobExecution, Step step)
        throws JobRestartException, StartLimitExceededException {

    BatchStatus stepStatus;

    if (lastStepExecution == null) { // 이전 실행이 없으면 실행 가능
        stepStatus = BatchStatus.STARTING;
    }
    else {
        stepStatus = lastStepExecution.getStatus(); // 이전 실행 상태 있으면 할당
    }

    // 이전 실행이 COMPLETED이고 alowStartIfComplete가 false라면 실행하지 않음
    // 또는 이전 실행이 ABANDONED면 절대 재시작 X
    if ((stepStatus == BatchStatus.COMPLETED && !step.isAllowStartIfComplete())
            || stepStatus == BatchStatus.ABANDONED) {
        return false;
    }

    // 재시작 가능 상태면 startLimit 초과 여부를 검사
    if (jobRepository.getStepExecutionCount(jobExecution.getJobInstance(), step.getName()) < step.getStartLimit()) {
        return true;
    }
    else { // 초과하면 예외
        throw new StartLimitExceededException("...");
    }
}
// StepBuilder 설정
builder.allowStartIfComplete(true)  // 완료된 Step도 재실행 가능. 기본값: false
builder.startLimit(10)              // 최대 실행 시도 횟수 제한. 기본값: Integer.MAX_VALUE

SimpleStepHandler.handleStep() - Step 실행 준비

public StepExecution handleStep(Step step, JobExecution execution) {
    // ...
    StepExecution currentStepExecution = lastStepExecution;

    if (shouldStart(lastStepExecution, execution, step)) {
        // 재시작 여부를 떠나 StepExecution은 매번 새로 생성
        currentStepExecution = execution.createStepExecution(step.getName());

        boolean isRestart = (lastStepExecution != null
                && !lastStepExecution.getStatus().equals(BatchStatus.COMPLETED));

        if (isRestart) { // 재시작인 경우: 이전 `StepExecution`의 `ExecutionContext`를 그대로 사용
            currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
        }
        else { // 첫 실행인 경우: 새로운 `ExecutionContext`를 생성
            currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
        }

        // 메타데이터 저장소에 `StepExecution`과 `ExecutionContext` 즉시 저장
        jobRepository.add(currentStepExecution);

        try {
            step.execute(currentStepExecution); // 실제 `Step` 실행을 시작
            currentStepExecution.getExecutionContext().put("batch.executed", true);
        }
        catch (JobInterruptedException e) {
            // ...
        }

        // 실행 완료 후 `ExecutionContext`에 `batch.executed` 플래그를 설정하고 메타데이터 저장소에 반영
        jobRepository.updateExecutionContext(execution);
    }

    return currentStepExecution;
}

AbstractStep.execute() - Step 실행 시작

public final void execute(StepExecution stepExecution)
        throws JobInterruptedException, UnexpectedJobExecutionException {

    stepExecution.setStartTime(LocalDateTime.now());
    stepExecution.setStatus(BatchStatus.STARTED);
    getJobRepository().update(stepExecution); // 시작 시간과 BatchStatus 즉시 반영

    // ...

    ExitStatus exitStatus = ExitStatus.EXECUTING; // ExitStatus는 아직 StepExecution에 반영되지 않음

    doExecute(stepExecution); // TaskletStep.doExecute() 실행

    // ...
}

TaskletStep.doExecute() & ChunkTransactionCallback

public RepeatStatus doInTransaction(TransactionStatus status) {
    StepContribution contribution = stepExecution.createStepContribution(); // 청크 처리 결과 컨테이너

    try {
        result = tasklet.execute(contribution, chunkContext); // Tasklet 실행
    }
    finally {
        stepExecution.apply(contribution); // 실패해도 그 시점까지의 통계를 StepExecution에 반영
    }

    // 청크 성공 시
    stream.update(stepExecution.getExecutionContext()); // ItemStream 상태를 ExecutionContext에 저장
    stepExecution.incrementCommitCount();
    getJobRepository().update(stepExecution); // 메타데이터 저장소에 저장
}
public final void execute(StepExecution stepExecution) {
    // ...

    doExecute(stepExecution); // TaskletStep.doExecute() 완료

    // 정상 완료 시 ExitStatus와 BatchStatus를 COMPLETED로 설정
    exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());
    stepExecution.upgradeStatus(BatchStatus.COMPLETED);
}

예외 처리와 롤백

// TaskletStep.rollback()
private void rollback(StepExecution stepExecution) {
    if (!rolledBack) { // 각 청크에 대해 rollbackCount는 한 번만 증가
        stepExecution.incrementRollbackCount();
        rolledBack = true;
    }
}
// AbstractStep.execute()
catch (Throwable e) {
    stepExecution.upgradeStatus(determineBatchStatus(e)); // BatchStatus.FAILED
    exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e)); // ExitStatus.FAILED (로컬 변수에만 저장)
    stepExecution.addFailureException(e);
}

AbstractStep.execute() - finally 절 (Step 종료 처리)

// AbstractStep.execute()
finally {
    stepExecution.setEndTime(LocalDateTime.now()); // 종료 시간 설정

    // afterStep() 호출 전에 ExitStatus를 StepExecution에 반영 (afterStep()에서 참조할 수 있도록)
    exitStatus = exitStatus.and(stepExecution.getExitStatus());
    stepExecution.setExitStatus(exitStatus);

    // StepExecutionListener.afterStep()은 ExitStatus를 반환하여 최종 ExitStatus를 변경
    exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));

    try {
        getJobRepository().updateExecutionContext(stepExecution); // Step ExecutionContext 저장
    }
    catch (Exception e) {
        stepExecution.setStatus(BatchStatus.UNKNOWN); // 메타데이터 저장소 통신 실패 시 UNKNOWN
        exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
        stepExecution.addFailureException(e);
    }

    stepExecution.setExitStatus(exitStatus); // 최종 ExitStatus 반영

    try {
        getJobRepository().update(stepExecution); // StepExecution 최종 상태 저장
    }
    catch (Exception e) {
        stepExecution.setStatus(BatchStatus.UNKNOWN);
        stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
        stepExecution.addFailureException(e);
    }

    close(stepExecution.getExecutionContext()); // ItemStream.close()
}

Step 실행 완료 후 Job으로의 복귀

// SimpleStepHandler.handleStep()
step.execute(currentStepExecution);
currentStepExecution.getExecutionContext().put("batch.executed", true);
jobRepository.updateExecutionContext(execution); // Job ExecutionContext 저장

return currentStepExecution;
// SimpleJob.doExecute()
if (stepExecution != null) {
    execution.upgradeStatus(stepExecution.getStatus());
    execution.setExitStatus(stepExecution.getExitStatus());
}
// AbstractJob.execute()
finally {
    execution.setEndTime(LocalDateTime.now()); // 종료 시간
    listener.afterJob(execution); // JobExecutionListener.afterJob()
    jobRepository.update(execution); // JobExecution 최종 상태 메타데이터 저장소에 저장
}

Step Squad 핵심 요약

항목 내용
StepExecution 생성 매번 새로 생성. 재시작 시에도 새로 생성하되 이전 ExecutionContext를 복구
BatchStatus 반영 실행 상태 변경 시마다 즉시 메타데이터 저장소에 반영
ExitStatus 반영 Step 실행 완료 후에만 StepExecution에 설정되어 저장
StepContribution 각 청크마다 생성. 실패해도 그 시점까지의 처리 정보는 안전하게 기록
롤백 처리 예외 시 롤백 카운트 증가. 실제 트랜잭션 롤백은 TransactionTemplate이 담당
마무리 종료 시간 설정 → afterStep()ExecutionContext 저장 → StepExecution 저장 → ItemStream.close()