FlatFileItemReader
FlatFileItemWriter
read()
메서드 동작은 크게 두 단계로 이루어진다.
// FlatFileItemReader.doRead()
// ...
String line = readLine();
// ...
return lineMapper.mapLine(line, lineCount);
LineMapper
라는 컴포넌트가 핵심을 담당한다.
RowMapper
와 유사public interface LineMapper<T> {
T mapLine(String line, int lineNumber) throws Exception;
}
DefaultLineMapper
를 제공한다.DefaultLineMapper
동작도 크게 두 단계로 이루어진다.
@Override
public T mapLine(String line, int lineNumber) throws Exception {
FieldSet fieldSet = tokenizer.tokenize(line); // 1단계: 토큰화
return fieldSetMapper.mapFieldSet(fieldSet); // 2단계: 객체 매핑
}
LineTokenizer
LineTokenizer
컴포넌트가 사용된다.LineTokenizer
구현체는 대표적으로 다음 두 가지가 있다.
DelimitedLineTokenizer
- 쉼표(,
) 구분자로 데이터를 토큰화
ERR001,2024-01-19,CRITICAL
-> ["ERR001", "2024-01-19", "CRITICAL"]
FixedLengthTokenizer
- 고정된 길이로 구분된 데이터를 토큰화
ERR00120240119CRITICAL
-> ["ERR001", "20240119", "CRITICAL"]
(각 6, 8, 8자리)FlatFileItemReader
구성 시점에 선택 가능하다.FieldSet
객체로 만들어 변환한다.public class DefaultFieldSet implements FieldSet {
private final String[] tokens; // 토큰화된 데이터
private List<String> names; // 각 데이터를 객체의 어떤 프로퍼티에 매핑할지 나타내는 프로퍼티 이름 목록
// ...
}
"ERR001,2024-01-19 10:15:23,CRITICAL,1234,SYSTEM_CRASH”
tokens
: ["ERR001", "2024-01-19 10:15:23", "CRITICAL", "1234", "SYSTEM_CRASH"]
names
: ["errorId", "errorDateTime", "severity", "processId", "errorMessage"]
FieldSetMapper
FieldSetMapper
가 사용된다.public interface FieldSetMapper<T> {
T mapFieldSet(FieldSet fieldSet) throws BindException;
}
BeanWrapperFieldSetMapper
가 사용된다.
setter
메서드를 호출해서 데이터를 설정한다.FieldSet
필드 이름과 객체 프로퍼티 이름이 일치해야 한다.@Bean
fun systemFailureStep(
systemFailureItemReader: FlatFileItemReader<SystemFailure>,
systemFailureStdoutItemWriter: SystemFailureStdoutItemWriter,
): Step {
return StepBuilder("systemFailureStep", jobRepository)
.chunk<SystemFailure, SystemFailure>(10, transactionManager)
.reader(systemFailureItemReader)
.writer(systemFailureStdoutItemWriter)
.build()
}
chunk
의 제네릭 타입을 SystemFailure
, 즉 변환할 객체로 설정한다.// 기본 생성자가 필요하여 모두 기본값을 설정
// setter가 필요하여 var로 선언
data class SystemFailure(
var errorId: String = "",
var errorDateTime: String = "",
var severity: String = "",
var processId: Int = 0,
var errorMessage: String = "",
)
FlatFileItemReaderBuilder
라는 전용 빌더 클래스를 사용해 구성@Bean
@StepScope
fun systemFailureItemReader(
@Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<SystemFailure> {
return FlatFileItemReaderBuilder<SystemFailure>()
.name("systemFailureItemReader")
.resource(ClassPathResource(inputFile))
.delimited()
.delimiter(",")
.names(
"errorId",
"errorDateTime",
"severity",
"processId",
"errorMessage"
)
.targetType(SystemFailure::class.java)
.linesToSkip(1)
.build()
}
name()
: ItemReader
식별자 지정
resource()
: 읽어 들일 Resource
를 지정delimited()
파일 형식 지정
delimited
를 호출하면 DefaultLineMapper
가 사용되어 DelimitedLineTokenizer
가 지정된다.delimiter()
: 구분자 지정
names()
: 프로퍼티 매핑targetTypes()
: 매핑 대상 클래스 지정
FieldSetMapper
구현체인 BeanWrapperFieldSetMapper
가 사용된다.BeanWrapperFieldSetMapper
가 새 인스턴스를 생성하려면 tartetType
설정이 필요하다.linesToSkip()
: 헤더 처리
linesToSkip(1)
)strict()
: 파일 검증 강도 설정
true
로 파일 누락 시 배치를 중단한다.false
면 파일이 없어도 경고만 남기고 진행한다.
read()
메서드에서 null
반환@Bean
@StepScope
fun systemFailureItemReader(
@Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<SystemFailure> {
return FlatFileItemReaderBuilder<SystemFailure>()
.name("systemFailureItemReader")
.resource(ClassPathResource(inputFile))
.fixedLength()
.columns(
Range(1, 8), // errorId: ERR001 + 공백 2칸
Range(9, 29), // errorDateTime: 날짜시간 + 공백 2칸
Range(30, 39), // severity: CRITICAL/FATAL + 패딩
Range(40, 45), // processId: 1234 + 공백 2칸
Range(46, 66) // errorMessage: 메시지 + \n
)
.names("errorId", "errorDateTime", "severity", "processId", "errorMessage")
.targetType(SystemFailure::class.java)
.build()
}
fixedLength()
: 고정 길이 형식임을 알리는 설정
LineTokenizer
구현체로 FixedLengthTokenizer
가 지정된다.columns()
: Range
배열로 각 필드의 시작과 끝을 정의strict()
names()
, targetType()
BeanWrapperFieldSetMapper
은 기본 타입 외엔 변환을 지원하지 않는다.customEditors()
메서드로 커스텀 PropertyEditor
를 등록할 수 있다.@Bean
@StepScope
fun systemFailureItemReader(
@Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<SystemFailure> {
return FlatFileItemReaderBuilder<SystemFailure>()
// ...
.customEditors(mapOf(LocalDateTime::class.java to dateTimeEditor()))
.build()
private fun dateTimeEditor(): PropertyEditor {
return object : PropertyEditorSupport() {
override fun setAsText(text: String) {
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
setValue(LocalDateTime.parse(text, formatter))
}
}
}
[WARNING][Thread-156][CPU: 78%] Thread pool saturation detected - 45/50 threads in use…
RegexLineTokenizer
는 정규식을 활용한 토큰 파싱 도구다.@Bean
@StepScope
fun logItemReader(
@Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<LogEntry> =
FlatFileItemReaderBuilder<LogEntry>()
.name("logItemReader")
.resource(ClassPathResource(inputFile))
.lineTokenizer(
RegexLineTokenizer().apply {
setRegex("\\[\\w+\\]\\[Thread-(\\d+)\\]\\[CPU: \\d+%\\] (.+)")
}
)
.fieldSetMapper { fieldSet: FieldSet ->
LogEntry(
fieldSet.readString(0),
fieldSet.readString(1)
)
}
.build()
data class LogEntry(
var threadNum: String = "",
var message: String = "",
)
lineTokenizer
등록
.lineTokenizer()
로 구성해주면 등록된다.filedSetMapper
등록
FlatFileItemReaderBuilder
는 커스텀 filedSetMapper
도 등록할 수 있다..targetType()
메서드를 호출하면 안 된다.RegexLineTokenizer
는 하나의 형식만 지원하기에 파일의 모든 라인이 해당 정규식을 따르는 경우 유용하다.
PatternMatchingCompositeLineMapper
LineTokenizer
와 FieldSetMapper
를 적용할 수 있다.각 로그 타입별로 아래와 같은 형식을 지원한다고 가정하자
// ERROR,mysql-prod,OOM,2024-01-24T09:30:00,heap space killing spree,85%,/var/log/mysql
// ABORT,spring-batch,MemoryLeak,2024-01-24T10:15:30,forced termination,-1,/usr/apps/batch,TERMINATED
// COLLECT,heap-dump,PID-9012,2024-01-24T11:00:15,/tmp/heapdump
sealed interface SystemLog {
val type: String
val timestamp: String
data class Error(
override val type: String,
override val timestamp: String,
val application: String,
val errorType: String,
val message: String,
val resourceUsage: String,
val logPath: String,
) : SystemLog
data class Abort(
override val type: String,
override val timestamp: String,
val application: String,
val errorType: String,
val message: String,
val exitCode: String,
val processPath: String,
val status: String,
) : SystemLog
data class Collect(
override val type: String,
override val timestamp: String,
val dumpType: String,
val processId: String,
val dumpPath: String,
) : SystemLog
}
LineTokenizer
, FieldSetMapper
를 한 번에 설정한 LineMapper
를 설정할 수 있다. @Bean
@StepScope
fun systemLogReader(
@Value("#{jobParameters['inputFile']}") inputFile: String,
): FlatFileItemReader<SystemLog> {
return FlatFileItemReaderBuilder<SystemLog>()
// ...
.lineMapper(systemLogLineMapper())
.build()
}
systemLogLineMapper
빈 정의는 아래와 같다.
setTokenizers
, setFieldSetMappers
을 통해 각 패턴별 필요한 구현체들을 설정한다.@Bean
fun systemLogLineMapper(): PatternMatchingCompositeLineMapper<SystemLog> =
PatternMatchingCompositeLineMapper<SystemLog>()
.apply {
setTokenizers(
mapOf(
"ERROR*" to errorLineTokenizer(),
"ABORT*" to abortLineTokenizer(),
"COLLECT*" to collectLineTokenizer(),
)
)
setFieldSetMappers(
mapOf(
"ERROR*" to ErrorFieldSetMapper(),
"ABORT*" to AbortFieldSetMapper(),
"COLLECT*" to CollectFieldSetMapper(),
)
)
}
@Bean
fun errorLineTokenizer(): DelimitedLineTokenizer =
DelimitedLineTokenizer(",").apply {
setNames("type", "application", "errorType", "timestamp", "message", "resourceUsage", "logPath")
}
// ...
class ErrorFieldSetMapper : FieldSetMapper<SystemLog> {
override fun mapFieldSet(fieldSet: FieldSet): SystemLog =
SystemLog.Error(
type = fieldSet.readString("type"),
timestamp = fieldSet.readString("timestamp"),
application = fieldSet.readString("application"),
errorType = fieldSet.readString("errorType"),
message = fieldSet.readString("message"),
resourceUsage = fieldSet.readString("resourceUsage"),
logPath = fieldSet.readString("logPath"),
)
}
// ...
MultiResourceItemReader
ItemReader
구현체MultiResourceItemReader
가 실제로 파일 읽기를 수행하지 않는다.ItemReader
가 수행한다.MultiResourceItemReaderBuilder
전용 빌더를 통해 생성할 수 있다. @Bean
@StepScope
fun multiSystemFailureItemReader(
@Value("#{jobParameters['inputFilePath']}") inputFilePath: String,
): MultiResourceItemReader<SystemFailure> =
MultiResourceItemReaderBuilder<SystemFailure>()
.name("multiSystemFailureItemReader")
.resources( // 읽을 대상 Resource 목록을 배열 형태로 구성
ClassPathResource("$inputFilePath/critical-failures.csv"),
ClassPathResource("$inputFilePath/normal-failures.csv"),
).delegate(systemFailureReader()) // 위임 ItemReader 지정
.build()
comparator
를 활용할 수 있다..comparator { r1, r2 -> r2.filename!!.compareTo(r1.filename!!) }
.build()
delegate
로 지정한 위임 대상 ItemReader
는 다른 파일 읽기 ItemReader
와 다르게 resource
정보를 파라미터로 받지 않는다.
MultiResourceItemReader
가 직접 지정해주기 때문 @Bean
fun systemFailureReader(): FlatFileItemReader<SystemFailure> =
FlatFileItemReaderBuilder<SystemFailure>()
.name("systemFailureReader")
// ...