TIL

배치 실행 진입점과 Spring Batch 자동 구성

JobLauncherApplicationRunner

./gradlew bootRun --args='--spring.batch.job.name=brutalizedSystemJob chaos=true,java.lang.Boolean'

run() 메서드

public void run(ApplicationArguments args) throws Exception {
    String[] jobArguments = args.getNonOptionArgs().toArray(new String[0]);
    run(jobArguments);
}

public void run(String... args) throws JobExecutionException {
    // "="를 delimiter로 사용해 key-value Properties로 변환
    launchJobFromProperties(StringUtils.splitArrayElementsIntoProperties(args, "="));
}

DefaultJobParametersConverter

// getJobParameters()에서 각 Properties 엔트리를 JobParameter로 변환
for (Entry<Object, Object> entry : properties.entrySet()) {
    String parameterName = (String) entry.getKey();         // "chaos"
    String encodedJobParameter = (String) entry.getValue(); // "true,java.lang.Boolean"
    JobParameter<?> jobParameter = decode(encodedJobParameter); // Boolean 타입의 true로 변환
    jobParametersBuilder.addJobParameter(parameterName, jobParameter);
}
return jobParametersBuilder.toJobParameters();
protected void launchJobFromProperties(Properties properties) throws JobExecutionException {
    JobParameters jobParameters = this.converter.getJobParameters(properties);
    executeLocalJobs(jobParameters);
    executeRegisteredJobs(jobParameters);
}

executeLocalJobs()

private void executeLocalJobs(JobParameters jobParameters) throws JobExecutionException {
    for (Job job : this.jobs) {
        if (StringUtils.hasText(this.jobName)) {
            if (!this.jobName.equals(job.getName())) {
                continue; // 이름이 일치하지 않으면 건너뜀
            }
        }
        execute(job, jobParameters);
    }
}
// JobLauncherApplicationRunner
// ...
private Collection<Job> jobs = Collections.emptySet();

@Autowired(required = false)
public void setJobs(Collection<Job> jobs) {
    this.jobs = jobs;
}

executeRegisteredJobs()

private void executeRegisteredJobs(JobParameters jobParameters) throws JobExecutionException {
    if (this.jobRegistry != null && StringUtils.hasText(this.jobName)) {
        if (!isLocalJob(this.jobName)) { // 로컬 Job이 아닌 경우에만 실행
            Job job = this.jobRegistry.getJob(this.jobName);
            execute(job, jobParameters);
        }
    }
}

getNextJobParameters()

// execute(Job job, JobParameters jobParameters)
JobParameters parameters = getNextJobParameters(job, jobParameters);
JobExecution execution = this.jobLauncher.run(job, parameters);
// ...
private JobParameters getNextJobParameters(Job job, JobParameters jobParameters) {
    if (jobRepository.isJobInstanceExists(job.getName(), jobParameters)) {
        return getNextJobParametersForExisting(job, jobParameters); // 1) 이미 존재하는 JobInstance
    }
    if (job.getJobParametersIncrementer() == null) { // 2) Incrementer 없음
        return jobParameters;                                      
    }
    
    // 3) Incrementer 있음
    JobParameters nextParameters = new JobParametersBuilder(jobParameters, this.jobExplorer)
            .getNextJobParameters(job).toJobParameters();
    return merge(nextParameters, jobParameters);                 
}

1) 동일한 JobInstance가 이미 존재하는 경우

2) JobParametersIncrementer가 없는 경우

3) JobParametersIncrementer가 설정된 경우

JobExplorer

public interface JobExplorer {
    List<JobInstance> getJobInstances(String jobName, int start, int count);
    JobInstance getLastJobInstance(String jobName);
    JobExecution getJobExecution(Long executionId);
    JobExecution getLastJobExecution(JobInstance jobInstance);
    Set<JobExecution> findRunningJobExecutions(String jobName);
    List<String> getJobNames();
    long getJobInstanceCount(String jobName);
    /*...*/
}

execute() - JobLauncher 호출

protected void execute(Job job, JobParameters jobParameters) {
    JobParameters parameters = getNextJobParameters(job, jobParameters);
    JobExecution execution = this.jobLauncher.run(job, parameters); // Job Squad 진입점
}

BatchAutoConfiguration

JobLauncherApplicationRunner 자동 구성

@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "spring.batch.job", name = "enabled", havingValue = "true", matchIfMissing = true)
public JobLauncherApplicationRunner jobLauncherApplicationRunner(JobLauncher jobLauncher, JobExplorer jobExplorer,
       JobRepository jobRepository, BatchProperties properties) {
    JobLauncherApplicationRunner runner = new JobLauncherApplicationRunner(jobLauncher, jobExplorer, jobRepository);
    String jobName = properties.getJob().getName();
    if (StringUtils.hasText(jobName)) {
       runner.setJobName(jobName);
    }
    return runner;
}

BatchProperties

@ConfigurationProperties(prefix = "spring.batch")
public class BatchProperties {
    private final Job job = new Job(); // spring.batch.job.*

    public static class Job {
        private String name = ""; // spring.batch.job.name
    }
    /*...*/
}

JobExecutionExitCodeGenerator

@Bean
@ConditionalOnMissingBean(ExitCodeGenerator.class)
public JobExecutionExitCodeGenerator jobExecutionExitCodeGenerator() {
    return new JobExecutionExitCodeGenerator();
}

DefaultBatchConfiguration

@Configuration(proxyBeanMethods = false)
static class SpringBootBatchConfiguration extends DefaultBatchConfiguration {
    /*...*/
}
@Configuration(proxyBeanMethods = false)
@Import(ScopeConfiguration.class)
public class DefaultBatchConfiguration implements ApplicationContextAware {

    @Bean
    public JobRepository jobRepository() throws BatchConfigurationException { /*...*/ }

    @Bean
    public JobLauncher jobLauncher(JobRepository jobRepository) throws BatchConfigurationException { /*...*/ }

    @Bean
    public JobExplorer jobExplorer() throws BatchConfigurationException { /*...*/ }

    @Bean
    public JobRegistry jobRegistry() throws BatchConfigurationException { /*...*/ }

    @Bean
    public JobRegistrySmartInitializingSingleton jobRegistrySmartInitializingSingleton(JobRegistry jobRegistry) throws BatchConfigurationException { /*...*/ }
}

protected 메서드를 통한 커스터마이징

protected DataSource getDataSource() { /*...*/ }
protected PlatformTransactionManager getTransactionManager() { /*...*/ }
protected String getTablePrefix() { /*...*/ }
protected TaskExecutor getTaskExecutor() { /*...*/ }
@Configuration
public class KillBatchCustomConfiguration extends DefaultBatchConfiguration {
    @Override
    protected ExecutionContextSerializer getExecutionContextSerializer() {
        return new Jackson2ExecutionContextStringSerializer(); // 기본 직렬화 대신 JSON 직렬화
    }
}

@EnableBatchProcessing

// DefaultBatchConfiguration 상속 방식
@Configuration
public class BatchConfig extends DefaultBatchConfiguration { /*...*/ }

// @EnableBatchProcessing 방식
@Configuration
@EnableBatchProcessing
public class BatchConfig { /*...*/ }
@Import({ BatchRegistrar.class, ScopeConfiguration.class /*...*/ })
public @interface EnableBatchProcessing {
    String dataSourceRef() default "dataSource";
    String transactionManagerRef() default "transactionManager";
    String executionContextSerializerRef() default "executionContextSerializer";
    /*...*/
}
class BatchRegistrar implements ImportBeanDefinitionRegistrar {
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        EnableBatchProcessing batchAnnotation = importingClassMetadata.getAnnotations()
            .get(EnableBatchProcessing.class)
            .synthesize();

        registerJobRepository(registry, batchAnnotation);
        registerJobExplorer(registry, batchAnnotation);
        registerJobLauncher(registry, batchAnnotation);
        registerJobRegistry(registry);
        /*...*/
    }
}
@Configuration
@EnableBatchProcessing(executionContextSerializerRef = "jacksonExecutionContextSerializer")
public class BatchConfig {
    @Bean
    public ExecutionContextSerializer jacksonExecutionContextSerializer() {
        return new Jackson2ExecutionContextStringSerializer();
    }
}

ScopeConfiguration

@Configuration(proxyBeanMethods = false)
public class ScopeConfiguration {
    private static final StepScope stepScope;
    private static final JobScope jobScope;

    static {
       jobScope = new JobScope();
       jobScope.setAutoProxy(false);

       stepScope = new StepScope();
       stepScope.setAutoProxy(false);
    }

    @Bean
    public static StepScope stepScope() { return stepScope; }

    @Bean
    public static JobScope jobScope() { return jobScope; }
}
private StepContext getContext() {
    StepContext context = StepSynchronizationManager.getContext();
    if (context == null) {
       throw new IllegalStateException("No context holder available for step scope");
    }
    return context;
}

JobContext/StepContext - Late-Binding

public Map<String, Object> getStepExecutionContext() { /*...*/ }

public Map<String, Object> getJobExecutionContext() { /*...*/ }

public Map<String, Object> getJobParameters() { /*...*/ }
SpEL 표현식 StepContext 메서드
#{jobParameters} getJobParameters()
#{stepExecutionContext} getStepExecutionContext()
#{jobExecutionContext} getJobExecutionContext()
@Override
public Object resolveContextualObject(String key) {
    StepContext context = getContext();
    return new BeanWrapperImpl(context).getPropertyValue(key);
}
@Value("#{stepName}") String stepName,
@Value("#{jobName}") String jobName,
@Value("#{jobInstanceId}") Long jobInstanceId,
@Value("#{systemProperties['os.name']}") String osName

@JobScope 빈에서는 stepName, jobInstanceId를 조회할 수 없다. JobScopeStepContext가 아닌 JobContext를 사용하기 때문이다.

BatchAutoConfiguration 비활성화

@ConditionalOnMissingBean(value = DefaultBatchConfiguration.class, annotation = EnableBatchProcessing.class)
public class BatchAutoConfiguration

SpringBootBatchConfiguration

SpringBootBatchConfiguration(
        DataSource dataSource, 
        @BatchDataSource ObjectProvider<DataSource> batchDataSource,
       PlatformTransactionManager transactionManager,
       @BatchTransactionManager ObjectProvider<PlatformTransactionManager> batchTransactionManager,
       @BatchTaskExecutor ObjectProvider<TaskExecutor> batchTaskExecutor, BatchProperties properties,
       ObjectProvider<BatchConversionServiceCustomizer> batchConversionServiceCustomizers,
       ObjectProvider<ExecutionContextSerializer> executionContextSerializer) {
    // ...
}
// DefaultBatchConfiguration 직접 상속 없이 빈만 정의하면 자동 적용
@Configuration
public class KillBatchCustomConfiguration {
    @Bean
    public ExecutionContextSerializer executionContextSerializer() {
        return new Jackson2ExecutionContextStringSerializer();
    }
}

@BatchXXX 어노테이션

@Bean
@BatchTaskExecutor
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor();
}

별도 구성이 없을 경우 TaskExecutorJobLauncherSyncTaskExecutor(동기식)를 사용한다.

TransactionManager 분리

@Configuration
public class TransactionManagerConfig {
    @Bean
    @Primary
    public DataSource dataSource() { /*...*/ } // 비즈니스 데이터용

    @Bean
    @BatchDataSource
    public DataSource batchDataSource() { /*...*/ } // 배치 메타데이터용

    @Bean
    @Primary
    public PlatformTransactionManager transactionManager(EntityManagerFactory emf) {
        return new JpaTransactionManager(emf); // 비즈니스 데이터용
    }

    @Bean
    @BatchTransactionManager
    public PlatformTransactionManager batchTransactionManager(@BatchDataSource DataSource dataSource) {
        return new JdbcTransactionManager(dataSource); // 배치 메타데이터용
    }
}

프로세스 종료

ExitCode

// 기본 - Job 실패해도 exit code 0
public static void main(String[] args) {
    SpringApplication.run(KillBatchSystemApplication.class, args);
}

// 수정 - Job 실패 시 적절한 exit code 반환
public static void main(String[] args) {
    System.exit(SpringApplication.exit(SpringApplication.run(KillBatchSystemApplication.class, args)));
}
@Override
public int getExitCode() {
    for (JobExecution execution : this.executions) {
        if (execution.getStatus().ordinal() > 0) {
            return execution.getStatus().ordinal();
        }
    }
    return 0;
}
BatchStatus ordinal (종료 코드)
COMPLETED 0
STARTING 1
STARTED 2
STOPPING 3
STOPPED 4
FAILED 5
ABANDONED 6
UNKNOWN 7

ExitStatus를 활용한 종료 코드 제어

Tasklet에서 커스텀 ExitStatus 설정

// Tasklet 내에서 예외 타입별로 커스텀 ExitStatus 설정
catch (IllegalStateException e) {
    contribution.setExitStatus(new ExitStatus("SKULL_FRACTURE", e.getMessage()));
    throw e;
} catch (ValidationException e) {
    contribution.setExitStatus(new ExitStatus("SYSTEM_BRUTALIZED", e.getMessage()));
    throw e;
}

커스텀 ExitCodeGenerator 구현

@Component
public class BrutalizedSystemExitCodeGenerator implements JobExecutionListener, ExitCodeGenerator {
    private final SimpleJvmExitCodeMapper exitCodeMapper = new SimpleJvmExitCodeMapper();
    private int exitCode = 0;

    public BrutalizedSystemExitCodeGenerator() {
        exitCodeMapper.setMapping(Map.of(
                "SKULL_FRACTURE", 3,
                "SYSTEM_BRUTALIZED", 4,
                "UNKNOWN_CHAOS", 5));
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        String exitStatus = jobExecution.getExitStatus().getExitCode();
        this.exitCode = exitCodeMapper.intValue(exitStatus);
    }

    @Override
    public int getExitCode() { return exitCode; }
}
@Bean
public Job brutalizedSystemJob() {
    return new JobBuilder("brutalizedSystemJob", jobRepository)
            .start(brutalizedSystemStep())
            .listener(brutalizedSystemExitCodeGenerator) // 리스너 등록
            .build();
}