TIL

Flow - 조건부 Step 실행 흐름 제어

순차 실행의 한계

Flow란?

상태 머신(State Machine) 원리

              [ExitCode="COMPLETED"]
┌──────────────┐ ───────────────▶ ┌──────────────┐
│ 강의 분석      │                  │  강의 게시     │ ──▶ [작업 종료]
│   Step       │                  │   Step       │
└──────────────┘                  └──────────────┘
         │ [ExitCode="FAILED"]
         ▼
┌──────────────┐
│ 오류 처리      │ ──▶ [작업 종료]
│   Step       │
└──────────────┘

Flow 3대 핵심 요소

상태(State)

전이 조건(ExitCode)

public class ExitStatus {
    public static final ExitStatus COMPLETED = new ExitStatus("COMPLETED");
    public static final ExitStatus FAILED = new ExitStatus("FAILED");

    private final String exitCode; // 이 값이 Flow의 전이 분기를 결정
}

전이 규칙(Transition)

각 상태에서 ExitCode 결과 + 미리 정의된 Transition 규칙 = 다음 목적지 결정

용어 참고: “실행 상태(Execution State)”는 Spring Batch 공식 용어가 아니라 강의에서 설명 편의를 위해 사용하는 표현이다. EndState가 아닌 모든 상태(StepState, DecisionState 등)를 통칭한다.

Flow 기본 메커니즘

┌─────────┐    ExitCode     ┌─────────┐    ExitCode      ┌─────────┐
│ 시작 상태 │ ─────────────▶ │실행 상태1 │ ───────────────▶ │실행 상태2 │
└─────────┘  + Transition   └─────────┘   + Transition   └─────────┘
                               │                            │
                               │ ExitCode + Transition      │ ExitCode + Transition
                               ▼                            ▼
                          ┌─────────┐                  ┌──────────┐
                          │실행 상태3 │ ───────────────▶ │ 종료 상태  │
                          └─────────┘                  │(EndState)│
                                                       └──────────┘
return new JobBuilder("conditionalJob", jobRepository)
    .start(analyzeStep)          // 시작 상태 설정
    .on("COMPLETED")             // ExitCode 조건
    .to(publishStep)             // 전이: publishStep으로 이동
    .from(analyzeStep)           // 다시 analyzeStep 기준으로
    .on("FAILED")                // ExitCode 조건
    .to(failureStep)             // 전이: failureStep으로 이동
    .end()                       // Flow 정의 종료
    .build();

커스텀 ExitCode를 활용한 Flow 분기

커스텀 ExitStatus 설정 방법

contribution.setExitStatus(new ExitStatus("CUSTOM_CODE", "설명"));
// AbstractStep.execute() 내부
exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
    int skipCount = stepExecution.getSkipCount();
    if (skipCount >= 10) {
        return new ExitStatus("CRITICAL_SKIP");  // 커스텀 ExitCode
    } else if (skipCount >= 5) {
        return new ExitStatus("WARNING_SKIP");
    }
    return stepExecution.getExitStatus();  // 기존 ExitStatus 유지
}

커스텀 ExitCode 기반 Flow 분기 구현 예제

return new JobBuilder("lectureReviewJob", jobRepository)
    .start(analyzeLectureStep)
    .on("APPROVED").to(approveStep)                // 승인 → 게시 처리
    .from(analyzeLectureStep)
    .on("PLAGIARISM_DETECTED").to(containmentStep) // 표절 → 격리 처리
    .from(analyzeLectureStep)
    .on("QUALITY_SUBSTANDARD").to(rejectionStep)   // 품질 미달 → 반려
    .from(analyzeLectureStep)
    .on("TOO_EXPENSIVE").to(pricePunishStep)       // 과다 가격 → 가격 조정 요구
    .from(analyzeLectureStep)
    .on("666_UNKNOWN_PANIC").to(adminCheckStep)    // 미확인 위험 → 관리자 수동 검토
    .end()
    .build();

Flow 설계 주의사항

빠진 조건과 FlowExecutionException

FlowExecutionException: Next state not found in flow=lectureReviewJob
    for state=...step0 with exit status=QUALITY_SUBSTANDARD

터미널 상태(Terminal State)와 암시적 전이 규칙

용어 참고: “터미널 상태(Terminal State)”는 강의에서 설명 편의를 위해 만든 용어로, Spring Batch 공식 문서에는 없다.

// lowQualityRejectionStep에 .on()을 추가하는 순간 터미널 상태 아님
.on("QUALITY_SUBSTANDARD").to(lowQualityRejectionStep).on("GPT_DETECTED").to(gptAlertStep)

// 이제 lowQualityRejectionStep의 나머지 ExitCode도 반드시 정의해야 함
.from(lowQualityRejectionStep)
.on("COMPLETED").to(rejectionCompletedStep)
// 누락 시 → FlowExecutionException

와일드카드 패턴 매칭

* (애스터리스크)

// 나머지 모든 ExitCode 처리
.from(analyzeLectureStep)
.on("*").to(processUnknownStateStep)

// ERROR_로 시작하는 모든 ExitCode 일괄 처리
.from(analyzeLectureStep)
.on("ERROR_*").to(systemErrorHandlingStep)

? (물음표)

.on("ERROR_?")      // ERROR_1, ERROR_2 등 (한 글자)
.on("QUALITY_???")  // QUALITY_LOW, QUALITY_BAD 등 (정확히 3글자)

조합

.on("LECTURE_??_*") // LECTURE_01_BASIC, LECTURE_02_ADVANCED 등

전이 우선순위

높음 ◀──────────────────────────────────▶ 낮음
정확한 문자열 > ? 포함 패턴 > ? 전용 패턴 > * 포함 패턴 > *

예: ERROR_1 > ERROR_? > ??? > ERROR_* > *

Flow 내부 실행 과정: StepState

// StepState.handle()
public FlowExecutionStatus handle(FlowExecutor executor) {
    return new FlowExecutionStatus(executor.executeStep(step)); // exitCode → FlowExecutionStatus로 변환
}

// JobFlowExecutor.executeStep()
public String executeStep(Step step) {
    StepExecution stepExecution = stepHandler.handleStep(step, execution);
    return stepExecution.getExitStatus().getExitCode(); // ExitStatus의 exitCode 문자열 반환
}

FlowExecutionStatus

public static final FlowExecutionStatus COMPLETED = new FlowExecutionStatus("COMPLETED");
public static final FlowExecutionStatus STOPPED   = new FlowExecutionStatus("STOPPED");
public static final FlowExecutionStatus FAILED    = new FlowExecutionStatus("FAILED");
public static final FlowExecutionStatus UNKNOWN   = new FlowExecutionStatus("UNKNOWN");

public FlowExecutionStatus(String status) {
    this.name = status; // exitCode 문자열이 그대로 name이 된다
}

EndState

EndState가 Job의 최종 상태를 결정하는 과정

// EndState
private final FlowExecutionStatus status;

public FlowExecutionStatus handle(FlowExecutor executor) {
    return status; // 이 값이 Job의 최종 상태를 결정
}
// FlowJob.doExecute()
executor.updateJobExecutionStatus(flow.start(executor).getStatus()); // EndState의 FlowExecutionStatus 전달

// JobFlowExecutor.updateJobExecutionStatus()
public void updateJobExecutionStatus(FlowExecutionStatus status) {
    execution.setStatus(findBatchStatus(status));                    // 1. BatchStatus 설정
    execution.setExitStatus(exitStatus.and(new ExitStatus(status.getName()))); // 2. ExitStatus 설정
}

암시적 전이와 EndState의 관계

앞서 설명한 터미널 상태의 암시적 전이 규칙이 이제 구체적으로 이해된다:

명시적 EndState 전이: end(), fail(), stop()

// end(): 결과와 무관하게 COMPLETED EndState로 전이
.from(step).on("TOO_EXPENSIVE").to(punishmentStep).on("*").end()

// stop(): 결과와 무관하게 STOPPED EndState로 전이
.from(step).on("666_UNKNOWN_PANIC").to(adminCheckStep).on("*").stop()

// fail(): 즉시 FAILED EndState로 전이 (다음 Step 없음)
.from(step).on("PLAGIARISM_DETECTED").fail()
.from(step).on("*").end()  // → COMPLETED EndState로 전이 (TransitionBuilder.end())
.end()                     // → Flow 정의 종료 (FlowBuilder.end())
.build();

커스텀 EndState: end(String status)

.on("APPROVED").to(approveStep).on("*").end("COMPLETED_BY_SYSTEM")
// JobFlowExecutor.findBatchStatus()
protected BatchStatus findBatchStatus(FlowExecutionStatus status) {
    for (BatchStatus batchStatus : BatchStatus.values()) {
        if (status.getName().startsWith(batchStatus.toString())) {
            return batchStatus; // name이 BatchStatus 열거형 값으로 시작하면 해당 값 반환
        }
    }
    return BatchStatus.UNKNOWN; // 매칭 실패 시 UNKNOWN
}
커스텀 상태 BatchStatus ExitCode
COMPLETED_BY_SYSTEM COMPLETED COMPLETED_BY_SYSTEM
FAILED_VALIDATION FAILED FAILED_VALIDATION
CUSTOM_STATUS UNKNOWN CUSTOM_STATUS

JobExecutionDecider

Step에서 ExitCode를 결정하는 방식의 한계

JobExecutionDecider란

사용 방법

// 1. JobExecutionDecider 구현
public class StudentReviewDecider implements JobExecutionDecider {
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        int reviewScore = stepExecution.getExecutionContext().getInt("reviewScore");
        if (reviewScore >= 8) return new FlowExecutionStatus("EXCELLENT_COURSE");
        if (reviewScore >= 5) return new FlowExecutionStatus("AVERAGE_COURSE");
        return new FlowExecutionStatus("NEEDS_IMPROVEMENT");
    }
}

// 2. Flow DSL에서 next()로 Decider 연결
return new JobBuilder("studentReviewJob", jobRepository)
    .start(analyzeStudentReviewStep)
    .next(studentReviewDecider)  // Decider가 전이 조건 결정
    .on("EXCELLENT_COURSE").to(promoteCourseStep)
    .from(studentReviewDecider).on("AVERAGE_COURSE").to(normalManagementStep)
    .from(studentReviewDecider).on("NEEDS_IMPROVEMENT").to(improvementRequiredStep)
    .end()
    .build();

DecisionState

Flow 외부화

Flow를 빈으로 정의

@Bean
public Flow lectureValidationFlow(Step validateContentStep,
                                  Step checkPlagiarismStep,
                                  Step verifyPricingStep) {
    return new FlowBuilder<Flow>("lectureValidationFlow")
            .start(validateContentStep)
            .next(checkPlagiarismStep)
            .on("PLAGIARISM_DETECTED").fail()
            .from(checkPlagiarismStep).on("COMPLETED").to(verifyPricingStep)
            .from(verifyPricingStep).on("*").end()
            .build();
}

재사용 패턴 3가지

1. Job에 Flow 직접 주입

return new JobBuilder("newCourseReviewJob", jobRepository)
    .start(lectureValidationFlow)  // Flow를 Job의 시작점으로 사용
    .next(notifyInstructorStep)
    .end().build();

2. FlowStep: Flow를 하나의 Step으로 래핑

return new StepBuilder("validationStep", jobRepository)
    .flow(lectureValidationFlow)  // 내부적으로 FlowStep 생성
    .build();

3. Flow를 실행 상태(FlowState)로 활용

return new JobBuilder("courseUpdateJob", jobRepository)
    .start(checkModificationStep)
    .on("MAJOR_UPDATE").to(lectureValidationFlow)  // 조건에 따라 Flow 실행
    .from(checkModificationStep)
    .on("MINOR_UPDATE").to(publishUpdateStep)
    .end().build();

Flow 설계 주의사항