ItemStream
- 앞서 다룬
ItemReader, ItemWriter들 중에선 실제 데이터 처리를 다른 컴포넌트에게 위임하는 것들이 있었다.
MultiResourceItemReader, MultiResourceItemWriter, CompositeItemReader, CompositeItemWriter
- 각 위임자 내부 코드엔
delegate 필드가 존재한다.
// MultiResourceItemReader
private ResourceAwareItemReaderItemStream<? extends T> delegate;
// MultiResourceItemWriter
private ResourceAwareItemWriterItemStream<? super T> delegate;
// CompositeItemReader
private final List<ItemStreamReader<? extends T>> delegates;
- 위임 대상들은 모두 단순
ItemReader/ItemWriter가 아닌 ItemStream을 사용하고 있다.
- 실제로 Spring Batch 대부분의 구현체들은
ItemStream 인터페이스를 공통적으로 구현하고 있다.
- 대부분의
ItemReader는 ItemStream
- 하지만
ItemWriter의 경우 파일 기반 구현체를 제외하고는 대부분 ItemStream이 아니다.
public interface ItemStream {
default void open(ExecutionContext executionContext) throws ItemStreamException {}
default void update(ExecutionContext executionContext) throws ItemStreamException {}
default void close() throws ItemStreamException {}
}
ItemStream은 다음 두 역할을 담당한다.
- 자원 초기화 및 해제
- 메타데이터 관리 및 상태 추적
자원 초기화 및 해제
- 자원 관리의 중요성
- 배치 작업은 파일, db 커넥션, 메모리 버퍼 등 다양한 자원을 다룬다.
- 이러한 자원들은 반드시 적절한 사전 준비와 사용 후 정리가 필요하다.
- 자원을 적절히 해제하지 않으면 치명적 문제가 발생한다.
- 파일 핸들 누수
- db 커넥션 누수
- 메모리 누수
- 이러한 자원 초기화와 해제는
ItemStream의 open(), close() 메서드를 통해 처리된다.
- Spring Batch에선 스텝이
open, close 메서드를 호출한다.
FlatFileItemReader.doOpen()
- 부모 클래스인
AbstractItemCountingItemStreamItemReader의 open() 메서드에서 doOpen 호출
@Override
protected void doOpen() throws Exception {
if (!resource.exists()) { // 읽을 파일 존재 여부 확인
if (strict) {
throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode): " + resource);
}
logger.warn("Input resource does not exist " + resource.getDescription());
return;
}
// ...
reader = bufferedReaderFactory.create(resource, encoding); // 데이터를 읽어들일 BufferedReader 준비
}
@Override
protected void doClose() throws Exception {
// ...
reader.close(); // BufferedReader.close()를 호출하여 파일을 닫음
// ...
}
JdbcCursorItemReader.doOpen
- 부모 클래스인
AbstractItemCountingItemStreamItemReader의 open() 메서드에서 doOpen 호출
@Override
protected void doOpen() throws Exception {
// ...
// DB 연결 초기화 -> this.con = dataSource.getConnection();
initializeConnection();
openCursor(con); // 커서를 열어 데이터를 읽을 준비
// ...
}
protected void doClose() throws Exception {
// ...
JdbcUtils.closeResultSet(this.rs); // ResultSet 닫기
rs = null;
cleanupOnClose(con); // 커서 close
// ...
JdbcUtils.closeConnection(this.con); // DB 커넥션 닫기
}
JdbcCursorItemReader.doOpen
- 부모 클래스인
AbstractItemCountingItemStreamItemReader의 open() 메서드에서 doOpen 호출
@Override
protected void doOpen() throws Exception {
// ...
// DB 연결 초기화 -> this.con = dataSource.getConnection();
initializeConnection();
openCursor(con); // 커서를 열어 데이터를 읽을 준비
// ...
}
protected void doClose() throws Exception {
// ...
JdbcUtils.closeResultSet(this.rs); // ResultSet 닫기
rs = null;
cleanupOnClose(con); // 커서 close
// ...
JdbcUtils.closeConnection(this.con); // DB 커넥션 닫기
}
메타데이터 관리 및 상태 추적
ItemStream은 스텝의 실행 정보를 관리(저장, 복구)하는 역할도 맡는다.
- 메타데이터 관리가 필요한 이유
- 배치 작업의 예상치 못한 실패 시 적절한 재처리를 위해 필요
- 메타데이터가 없다면 실패 시 처음부터 다시 처리해야할 것이다.
open() (실행 정보 복원)
open() 메서드는 작업을 실패한 지점부터 이어할 수 있도록 상태를 복원한다.
ExecutionContext를 입력으로 받는다.
- 이전 스텝 실행 정보 포함
- 처음 실행되는 경우엔 빈 상태로 전달된다.
FlatFileItemReader의 부모 클래스인 AbstractItemCountingItemStreamItemReader의 open()
ExecutionContext에서 maxItemCount와 itemCount를 읽어온다.
maxItemCount - 최대 몇 개 아이템을 읽을지 설정 값
itemCount - 이전 실행에서 몇 개 읽었는지에 대한 정보
- 이전 읽었던
itemCount만큼 jumpToItem() 메서드로 읽기 위치를 이동시킨다.
jumpToItem() 호출 시엔 readLine()을 호출하여 순차적으로 읽어 나간다.
public void open(ExecutionContext executionContext) throws ItemStreamException {
// ...
if (itemCount > 0 && itemCount < maxItemCount) {
try {
jumpToItem(itemCount);
}
catch (Exception e) {
throw new ItemStreamException("Could not move to stored position on restart", e);
}
}
currentItemCount = itemCount;
}
JdbcCursorItemReader 또한 AbstractItemCountingItemStreamItemReader을 상속한다.
- 따라서
ExecutionContext에서 itemCount와 maxItemCount를 읽어오는 방식이 동일하다.
- 아래는
JdbcUrsorItemReader의 부모 클래스인 AbstractCursorItemReader의 jumpToItem() 구현이다.
@Override
protected void jumpToItem(int itemIndex) throws Exception {
if (driverSupportsAbsolute) {
//...
rs.absolute(itemIndex); // ResultSet.absolute()
//...
}
else {
moveCursorToRow(itemIndex);
}
}
driverSupportsAbsolute
JdbcCursorItemReaderBuilder에서 설정 가능하며 기본값은 false
true → ResultsSet.absolute()를 사용해 즉시 원하는 위치로 커서를 이동시킬 수 있다.
false → itemIndex만큼 ResultSet.next()를 순차적으로 호출하여 커서까지 이동시킨다.
- 하지만
JdbcCursorItemReader는 ResultSet.TYPE_FORWARD_ONLY를 사용한다.
ResultSet.TYPE_FORWARD_ONLY - 커서의 순방향 이동만 지원하는 설정
- JDBC 드라이버 지원 여부와 관계없이
ResultSet.absolute()를 사용할 수 없다.
- 따라서
driverSupportsAbsolute는 false로 유지하는 것이 안전하다.
JdbcPagingItemReader의 부모 클래스인 AbstractPagingItemReader의 jumpToItem은 효율적인 구현을 제공한다.
- 간단한 수식으로 어떤 위치부터 읽어야 하는지 즉시 계산할 수 있다.
@Override
protected void jumpToItem(int itemIndex) throws Exception {
this.lock.lock();
try {
page = itemIndex / pageSize;
current = itemIndex % pageSize;
}
finally {
this.lock.unlock();
}
// ...
}
update() (상태 저장)
update() 메서드는 현재 작업이 어디까지 진행되었는지를 저장한다.
- Spring batch의
Step은 여기서 메타데이터 저장소에 실행 정보를 저장한다.
- 트랜잭션 커밋 직전에
update()가 호출된다.
default void update(ExecutionContext executionContext) throws ItemStreamException {
}
- 아래는
AbstractItemCountingItemStreamItemReader의 update() 코드이다.
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
// ...
executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);
if (maxItemCount < Integer.MAX_VALUE) {
executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount);
}
// ...
}
currentItemCount, maxItemCount를 저장하여 open() 메서드에서 복원 시 사용한다.
재시작 불가 사례: RedisItemReader
RedisItemReader의 경우 재시작을 지원하지 않는다.
SCAN- 대량의 key를 효율적으로 탐색하기 위한 용도로, 서버를 블로킹하지 않고 점진적으로 키를 반환하는 명령어 (KEYS 명령어는 전체 키를 검색하지만 서버를 블로킹할 수 있다.)
ItemStream의 위임 구조
CompositeItemReader에 대해 복습하자면 데이터 읽기를 다른 ItemReader에 위임하는 구현체였다.
CompositeItemReader는 자신의 위임 대상 ItemReader에게 ItemStream의 open(), update(), close()를 전달(bypass)한다.
- 이 같은 위임이 가능한 이유는
ItemReader, ItemStream 모두 구현했기 때문
Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
for (ItemStreamReader<? extends T> delegate : delegates) {
delegate.open(executionContext);
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
if (this.currentDelegate != null) {
this.currentDelegate.update(executionContext);
}
}
@Override
public void close() throws ItemStreamException {
for (ItemStreamReader<? extends T> delegate : delegates) {
delegate.close();
}
}
- 사실 위 같은
close() 메서드는 문제가 존재한다.
delegates 중 하나의 close()라도 실패하면 즉시 중단되어 나머지 delegates의 자원이 정리되지 않는다.
- Spring Batch 5.2.2에서 이 문제가 수정되었다.
다른 위임 패턴들의 ItemStream 구현
- 다른 위임 패턴을 가지는
ItemReader나 ItemWriter들도 ItemStream을 구현하여 자원 관리를 위임 대상에게 전달(bypass)한다.
- ex)
CompositeItemWriter, MultiResourceItemReader, MultiResourceItemWriter
- 다만
ClassifierCompositeItemWriter는 ItemStream울 구현하지 않았다.
- 이 경우 Spring Batch가 위임 대상들의
open(), update(), close()를 호출하도록 별도 구성을 해줘야 한다.
- 아래 구성을 통해 Step에 의해
open(), update(), close()가 호출된다.
@Bean
public Step systemLogProcessingStep() {
return new StepBuilder("systemLogProcessingStep", jobRepository)
.<SystemLog, SystemLog>chunk(10, transactionManager)
.reader(systemLogProcessingReader())
// 위임 대상으로 criticalLogWrite와 normalLogWriter를 가지는 ClassifierCompositeItemWriter
.writer(classifierWriter())
.stream(criticalLogWriter()) // ItemStream 구현체 직접 전달
.stream(normalLogWriter()) // ItemStream 구현체 직접 전달
.build();
}
ItemStream 자동 등록 메커니즘
- 위 경우와 달리
ItemStream 구현체라면 별도로 stream() 메서드에 등록하지 않아도 된다.
- 스텝 빌더는
reader(), writer()에 컴포넌트를 전달하면 build 시점에 ItemStream을 구현하는지 검사한다.
- 컴포넌트가
ItemStream이라면 스텝 빌더는 관리할 ItemStream 목록에 자동 등록한다.
- 등록된
ItemStream 목록을 사용해 open(), update(), close()를 호출한다.
@StepScope 메서드 반환 타입에 따른 ItemStream 누락
@Bean
@StepScope
public ItemReader<SuspiciousDevice> cafeSquadReader() {
// ...
}
- 위 코드처럼
@StepScope를 사용할 때 인터페이스 타입을 반환하는 경우 예외가 발생한다.
- 실제로는
MongoCursorItemReader 구현체를 반환했다고 가정
java.lang.NullPointerException: Cannot invoke "org.springframework.data.util.CloseableIterator.hasNext()" because "this.cursor" is null
- 이는 Spring이 인터페이스 타입의 프록시를 만들 때 내부적으로 JDK Dynamic Proxy를 사용하기 때문이다.
- JDK Dynamic Proxy는 인터페이스에서 바로 프록시 객체를 생성한다.
MongoCursorItemReader가 ItemStream을 구현하더라도 프록시는 별개의 객체가 되어버린다.
- 즉 Spring Batch의 스텝이 이를
ItemStream으로 인식하지 않아 open(), update(), close()를 호출하지 않게 된다.
- 결과적으로
open()이 호출되지 않은채 doRead()가 호출되며 cursor에 NPE가 발생한 것이다.
@StepScope를 적용할 때 반드시 구체 클래스를 반환 타입으로 지정해야 한다.
@Bean
@StepScope
public MongoCursorItemReader<SuspiciousDevice> cafeSquadReader() {
...
}