TIL

FlatFileItemReader

파일 기반 배치 처리

파일 처리의 실체

파일 기반 배치 처리

Flat 파일이란

FlatFileItemReader

// FlatFileItemReader.doRead()
// ...
String line = readLine();
// ...
return lineMapper.mapLine(line, lineCount); 
public interface LineMapper<T> {
    T mapLine(String line, int lineNumber) throws Exception;
}

DefaultLineMapper

@Override
public T mapLine(String line, int lineNumber) throws Exception {
    FieldSet fieldSet = tokenizer.tokenize(line);  // 1단계: 토큰화
    return fieldSetMapper.mapFieldSet(fieldSet);  // 2단계: 객체 매핑	 
}

1단계: 토큰화 - LineTokenizer

public class DefaultFieldSet implements FieldSet {
   private final String[] tokens; // 토큰화된 데이터
   private List<String> names; // 각 데이터를 객체의 어떤 프로퍼티에 매핑할지 나타내는 프로퍼티 이름 목록
   // ...
}

2단계: 객체 매핑 - FieldSetMapper

public interface FieldSetMapper<T> {
    T mapFieldSet(FieldSet fieldSet) throws BindException;
}

구분자로 분리된 형식의 파일 읽기

Step 구성

@Bean
fun systemFailureStep(
    systemFailureItemReader: FlatFileItemReader<SystemFailure>,
    systemFailureStdoutItemWriter: SystemFailureStdoutItemWriter,
): Step {
    return StepBuilder("systemFailureStep", jobRepository)
        .chunk<SystemFailure, SystemFailure>(10, transactionManager)
        .reader(systemFailureItemReader)
        .writer(systemFailureStdoutItemWriter)
        .build()
}
// 기본 생성자가 필요하여 모두 기본값을 설정
// setter가 필요하여 var로 선언
data class SystemFailure(
    var errorId: String = "",
    var errorDateTime: String = "",
    var severity: String = "",
    var processId: Int = 0,
    var errorMessage: String = "",
)

FlatFileItemReader 구성 해부

@Bean
@StepScope
fun systemFailureItemReader(
    @Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<SystemFailure> {
    return FlatFileItemReaderBuilder<SystemFailure>()
        .name("systemFailureItemReader")
        .resource(ClassPathResource(inputFile))
        .delimited()
        .delimiter(",")
        .names(
            "errorId",
            "errorDateTime",
            "severity",
            "processId",
            "errorMessage"
        )
        .targetType(SystemFailure::class.java)
        .linesToSkip(1)
        .build()
}

고정 길이 형식 파일 읽기

@Bean
@StepScope
fun systemFailureItemReader(
    @Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<SystemFailure> {
    return FlatFileItemReaderBuilder<SystemFailure>()
        .name("systemFailureItemReader")
        .resource(ClassPathResource(inputFile))
        .fixedLength()
        .columns(
            Range(1, 8),  // errorId: ERR001 + 공백 2칸
            Range(9, 29),  // errorDateTime: 날짜시간 + 공백 2칸
            Range(30, 39),  // severity: CRITICAL/FATAL + 패딩
            Range(40, 45),  // processId: 1234 + 공백 2칸
            Range(46, 66) // errorMessage: 메시지 + \n
        )
        .names("errorId", "errorDateTime", "severity", "processId", "errorMessage")
        .targetType(SystemFailure::class.java)
        .build()
}

프로퍼티 타입에 LocalDateTime 사용

@Bean
@StepScope
fun systemFailureItemReader(
    @Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<SystemFailure> {
    return FlatFileItemReaderBuilder<SystemFailure>()
        // ...
        .customEditors(mapOf(LocalDateTime::class.java to dateTimeEditor()))
        .build()
        
private fun dateTimeEditor(): PropertyEditor {
    return object : PropertyEditorSupport() {
        override fun setAsText(text: String) {
            val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
            setValue(LocalDateTime.parse(text, formatter))
        }
    }
}       

RegexLineTokenizer

RegexLineTokenizer 구성

@Bean
@StepScope
fun logItemReader(
    @Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<LogEntry> =
    FlatFileItemReaderBuilder<LogEntry>()
        .name("logItemReader")
        .resource(ClassPathResource(inputFile))
        .lineTokenizer(
            RegexLineTokenizer().apply {
                setRegex("\\[\\w+\\]\\[Thread-(\\d+)\\]\\[CPU: \\d+%\\] (.+)")
            }
        )
        .fieldSetMapper { fieldSet: FieldSet ->
            LogEntry(
                fieldSet.readString(0),
                fieldSet.readString(1)
            )
        }
        .build()

data class LogEntry(
    var threadNum: String = "",
    var message: String = "",
)    

PatternMatchingCompositeLineMapper

LineMapper

    @Bean
    @StepScope
    fun systemLogReader(
        @Value("#{jobParameters['inputFile']}") inputFile: String,
    ): FlatFileItemReader<SystemLog> {
        return FlatFileItemReaderBuilder<SystemLog>()
            // ...
            .lineMapper(systemLogLineMapper())
            .build()
    }

systemLogLineMapper 정의

@Bean
fun systemLogLineMapper(): PatternMatchingCompositeLineMapper<SystemLog> =
    PatternMatchingCompositeLineMapper<SystemLog>()
        .apply {
            setTokenizers(
                mapOf(
                    "ERROR*" to errorLineTokenizer(),
                    "ABORT*" to abortLineTokenizer(),
                    "COLLECT*" to collectLineTokenizer(),
                )
            )
            setFieldSetMappers(
                mapOf(
                    "ERROR*" to ErrorFieldSetMapper(),
                    "ABORT*" to AbortFieldSetMapper(),
                    "COLLECT*" to CollectFieldSetMapper(),
                )
            )
        }

@Bean
fun errorLineTokenizer(): DelimitedLineTokenizer =
    DelimitedLineTokenizer(",").apply {
        setNames("type", "application", "errorType", "timestamp", "message", "resourceUsage", "logPath")
    }
// ...

class ErrorFieldSetMapper : FieldSetMapper<SystemLog> {
    override fun mapFieldSet(fieldSet: FieldSet): SystemLog =
        SystemLog.Error(
            type = fieldSet.readString("type"),
            timestamp = fieldSet.readString("timestamp"),
            application = fieldSet.readString("application"),
            errorType = fieldSet.readString("errorType"),
            message = fieldSet.readString("message"),
            resourceUsage = fieldSet.readString("resourceUsage"),
            logPath = fieldSet.readString("logPath"),
        )
}

// ...

MultiResourceItemReader: 여러 파일 읽기

    @Bean
    @StepScope
    fun multiSystemFailureItemReader(
        @Value("#{jobParameters['inputFilePath']}") inputFilePath: String,
    ): MultiResourceItemReader<SystemFailure> =
        MultiResourceItemReaderBuilder<SystemFailure>()
            .name("multiSystemFailureItemReader")
            .resources( // 읽을 대상 Resource 목록을 배열 형태로 구성
                ClassPathResource("$inputFilePath/critical-failures.csv"),
                ClassPathResource("$inputFilePath/normal-failures.csv"),
            ).delegate(systemFailureReader()) // 위임 ItemReader 지정
            .build()
.comparator { r1, r2 -> r2.filename!!.compareTo(r1.filename!!) }
.build()
    @Bean
    fun systemFailureReader(): FlatFileItemReader<SystemFailure> =
        FlatFileItemReaderBuilder<SystemFailure>()
            .name("systemFailureReader")
            // ...