FlatFileItemReaderFlatFileItemWriterread() 메서드 동작은 크게 두 단계로 이루어진다.
// FlatFileItemReader.doRead()
// ...
String line = readLine();
// ...
return lineMapper.mapLine(line, lineCount);
LineMapper라는 컴포넌트가 핵심을 담당한다.
RowMapper와 유사public interface LineMapper<T> {
T mapLine(String line, int lineNumber) throws Exception;
}
DefaultLineMapper를 제공한다.DefaultLineMapper 동작도 크게 두 단계로 이루어진다.
@Override
public T mapLine(String line, int lineNumber) throws Exception {
FieldSet fieldSet = tokenizer.tokenize(line); // 1단계: 토큰화
return fieldSetMapper.mapFieldSet(fieldSet); // 2단계: 객체 매핑
}
LineTokenizerLineTokenizer 컴포넌트가 사용된다.LineTokenizer 구현체는 대표적으로 다음 두 가지가 있다.
DelimitedLineTokenizer - 쉼표(,) 구분자로 데이터를 토큰화
ERR001,2024-01-19,CRITICAL -> ["ERR001", "2024-01-19", "CRITICAL"]FixedLengthTokenizer - 고정된 길이로 구분된 데이터를 토큰화
ERR00120240119CRITICAL -> ["ERR001", "20240119", "CRITICAL"] (각 6, 8, 8자리)FlatFileItemReader 구성 시점에 선택 가능하다.FieldSet 객체로 만들어 변환한다.public class DefaultFieldSet implements FieldSet {
private final String[] tokens; // 토큰화된 데이터
private List<String> names; // 각 데이터를 객체의 어떤 프로퍼티에 매핑할지 나타내는 프로퍼티 이름 목록
// ...
}
"ERR001,2024-01-19 10:15:23,CRITICAL,1234,SYSTEM_CRASH”
tokens: ["ERR001", "2024-01-19 10:15:23", "CRITICAL", "1234", "SYSTEM_CRASH"]names: ["errorId", "errorDateTime", "severity", "processId", "errorMessage"]FieldSetMapperFieldSetMapper가 사용된다.public interface FieldSetMapper<T> {
T mapFieldSet(FieldSet fieldSet) throws BindException;
}
BeanWrapperFieldSetMapper가 사용된다.
setter 메서드를 호출해서 데이터를 설정한다.FieldSet 필드 이름과 객체 프로퍼티 이름이 일치해야 한다.@Bean
fun systemFailureStep(
systemFailureItemReader: FlatFileItemReader<SystemFailure>,
systemFailureStdoutItemWriter: SystemFailureStdoutItemWriter,
): Step {
return StepBuilder("systemFailureStep", jobRepository)
.chunk<SystemFailure, SystemFailure>(10, transactionManager)
.reader(systemFailureItemReader)
.writer(systemFailureStdoutItemWriter)
.build()
}
chunk의 제네릭 타입을 SystemFailure, 즉 변환할 객체로 설정한다.// 기본 생성자가 필요하여 모두 기본값을 설정
// setter가 필요하여 var로 선언
data class SystemFailure(
var errorId: String = "",
var errorDateTime: String = "",
var severity: String = "",
var processId: Int = 0,
var errorMessage: String = "",
)
FlatFileItemReaderBuilder라는 전용 빌더 클래스를 사용해 구성@Bean
@StepScope
fun systemFailureItemReader(
@Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<SystemFailure> {
return FlatFileItemReaderBuilder<SystemFailure>()
.name("systemFailureItemReader")
.resource(ClassPathResource(inputFile))
.delimited()
.delimiter(",")
.names(
"errorId",
"errorDateTime",
"severity",
"processId",
"errorMessage"
)
.targetType(SystemFailure::class.java)
.linesToSkip(1)
.build()
}
name(): ItemReader 식별자 지정
resource(): 읽어 들일 Resource를 지정delimited() 파일 형식 지정
delimited를 호출하면 DefaultLineMapper가 사용되어 DelimitedLineTokenizer가 지정된다.delimiter(): 구분자 지정
names(): 프로퍼티 매핑targetTypes(): 매핑 대상 클래스 지정
FieldSetMapper 구현체인 BeanWrapperFieldSetMapper가 사용된다.BeanWrapperFieldSetMapper가 새 인스턴스를 생성하려면 tartetType 설정이 필요하다.linesToSkip(): 헤더 처리
linesToSkip(1))strict(): 파일 검증 강도 설정
true로 파일 누락 시 배치를 중단한다.false면 파일이 없어도 경고만 남기고 진행한다.
read() 메서드에서 null 반환@Bean
@StepScope
fun systemFailureItemReader(
@Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<SystemFailure> {
return FlatFileItemReaderBuilder<SystemFailure>()
.name("systemFailureItemReader")
.resource(ClassPathResource(inputFile))
.fixedLength()
.columns(
Range(1, 8), // errorId: ERR001 + 공백 2칸
Range(9, 29), // errorDateTime: 날짜시간 + 공백 2칸
Range(30, 39), // severity: CRITICAL/FATAL + 패딩
Range(40, 45), // processId: 1234 + 공백 2칸
Range(46, 66) // errorMessage: 메시지 + \n
)
.names("errorId", "errorDateTime", "severity", "processId", "errorMessage")
.targetType(SystemFailure::class.java)
.build()
}
fixedLength(): 고정 길이 형식임을 알리는 설정
LineTokenizer 구현체로 FixedLengthTokenizer가 지정된다.columns(): Range 배열로 각 필드의 시작과 끝을 정의strict()
names(), targetType()
BeanWrapperFieldSetMapper은 기본 타입 외엔 변환을 지원하지 않는다.customEditors() 메서드로 커스텀 PropertyEditor를 등록할 수 있다.@Bean
@StepScope
fun systemFailureItemReader(
@Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<SystemFailure> {
return FlatFileItemReaderBuilder<SystemFailure>()
// ...
.customEditors(mapOf(LocalDateTime::class.java to dateTimeEditor()))
.build()
private fun dateTimeEditor(): PropertyEditor {
return object : PropertyEditorSupport() {
override fun setAsText(text: String) {
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
setValue(LocalDateTime.parse(text, formatter))
}
}
}
[WARNING][Thread-156][CPU: 78%] Thread pool saturation detected - 45/50 threads in use…RegexLineTokenizer는 정규식을 활용한 토큰 파싱 도구다.@Bean
@StepScope
fun logItemReader(
@Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<LogEntry> =
FlatFileItemReaderBuilder<LogEntry>()
.name("logItemReader")
.resource(ClassPathResource(inputFile))
.lineTokenizer(
RegexLineTokenizer().apply {
setRegex("\\[\\w+\\]\\[Thread-(\\d+)\\]\\[CPU: \\d+%\\] (.+)")
}
)
.fieldSetMapper { fieldSet: FieldSet ->
LogEntry(
fieldSet.readString(0),
fieldSet.readString(1)
)
}
.build()
data class LogEntry(
var threadNum: String = "",
var message: String = "",
)
lineTokenizer 등록
.lineTokenizer()로 구성해주면 등록된다.filedSetMapper 등록
FlatFileItemReaderBuilder는 커스텀 filedSetMapper도 등록할 수 있다..targetType() 메서드를 호출하면 안 된다.RegexLineTokenizer는 하나의 형식만 지원하기에 파일의 모든 라인이 해당 정규식을 따르는 경우 유용하다.
PatternMatchingCompositeLineMapper
LineTokenizer와 FieldSetMapper를 적용할 수 있다.각 로그 타입별로 아래와 같은 형식을 지원한다고 가정하자
// ERROR,mysql-prod,OOM,2024-01-24T09:30:00,heap space killing spree,85%,/var/log/mysql
// ABORT,spring-batch,MemoryLeak,2024-01-24T10:15:30,forced termination,-1,/usr/apps/batch,TERMINATED
// COLLECT,heap-dump,PID-9012,2024-01-24T11:00:15,/tmp/heapdump
sealed interface SystemLog {
val type: String
val timestamp: String
data class Error(
override val type: String,
override val timestamp: String,
val application: String,
val errorType: String,
val message: String,
val resourceUsage: String,
val logPath: String,
) : SystemLog
data class Abort(
override val type: String,
override val timestamp: String,
val application: String,
val errorType: String,
val message: String,
val exitCode: String,
val processPath: String,
val status: String,
) : SystemLog
data class Collect(
override val type: String,
override val timestamp: String,
val dumpType: String,
val processId: String,
val dumpPath: String,
) : SystemLog
}
LineTokenizer, FieldSetMapper를 한 번에 설정한 LineMapper를 설정할 수 있다. @Bean
@StepScope
fun systemLogReader(
@Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<SystemLog> {
return FlatFileItemReaderBuilder<SystemLog>()
// ...
.lineMapper(systemLogLineMapper())
.build()
}
systemLogLineMapper 빈 정의는 아래와 같다.
setTokenizers, setFieldSetMappers을 통해 각 패턴별 필요한 구현체들을 설정한다.@Bean
fun systemLogLineMapper(): PatternMatchingCompositeLineMapper<SystemLog> =
PatternMatchingCompositeLineMapper<SystemLog>()
.apply {
setTokenizers(
mapOf(
"ERROR*" to errorLineTokenizer(),
"ABORT*" to abortLineTokenizer(),
"COLLECT*" to collectLineTokenizer(),
)
)
setFieldSetMappers(
mapOf(
"ERROR*" to ErrorFieldSetMapper(),
"ABORT*" to AbortFieldSetMapper(),
"COLLECT*" to CollectFieldSetMapper(),
)
)
}
@Bean
fun errorLineTokenizer(): DelimitedLineTokenizer =
DelimitedLineTokenizer(",").apply {
setNames("type", "application", "errorType", "timestamp", "message", "resourceUsage", "logPath")
}
// ...
class ErrorFieldSetMapper : FieldSetMapper<SystemLog> {
override fun mapFieldSet(fieldSet: FieldSet): SystemLog =
SystemLog.Error(
type = fieldSet.readString("type"),
timestamp = fieldSet.readString("timestamp"),
application = fieldSet.readString("application"),
errorType = fieldSet.readString("errorType"),
message = fieldSet.readString("message"),
resourceUsage = fieldSet.readString("resourceUsage"),
logPath = fieldSet.readString("logPath"),
)
}
// ...
MultiResourceItemReader
ItemReader 구현체MultiResourceItemReader가 실제로 파일 읽기를 수행하지 않는다.ItemReader가 수행한다.MultiResourceItemReaderBuilder 전용 빌더를 통해 생성할 수 있다. @Bean
@StepScope
fun multiSystemFailureItemReader(
@Value("#{jobParameters['inputFilePath']}") inputFilePath: String,
): MultiResourceItemReader<SystemFailure> =
MultiResourceItemReaderBuilder<SystemFailure>()
.name("multiSystemFailureItemReader")
.resources( // 읽을 대상 Resource 목록을 배열 형태로 구성
ClassPathResource("$inputFilePath/critical-failures.csv"),
ClassPathResource("$inputFilePath/normal-failures.csv"),
).delegate(systemFailureReader()) // 위임 ItemReader 지정
.build()
comparator를 활용할 수 있다..comparator { r1, r2 -> r2.filename!!.compareTo(r1.filename!!) }
.build()
delegate로 지정한 위임 대상 ItemReader는 다른 파일 읽기 ItemReader와 다르게 resource 정보를 파라미터로 받지 않는다.
MultiResourceItemReader가 직접 지정해주기 때문 @Bean
fun systemFailureReader(): FlatFileItemReader<SystemFailure> =
FlatFileItemReaderBuilder<SystemFailure>()
.name("systemFailureReader")
// ...