TIL

ItemStream

// MultiResourceItemReader
private ResourceAwareItemReaderItemStream<? extends T> delegate;

// MultiResourceItemWriter
private ResourceAwareItemWriterItemStream<? super T> delegate;

// CompositeItemReader
private final List<ItemStreamReader<? extends T>> delegates;
public interface ItemStream {
  default void open(ExecutionContext executionContext) throws ItemStreamException {}

  default void update(ExecutionContext executionContext) throws ItemStreamException {}

  default void close() throws ItemStreamException {}
}

자원 초기화 및 해제

FlatFileItemReader.doOpen()

@Override
protected void doOpen() throws Exception {
  if (!resource.exists()) { // 읽을 파일 존재 여부 확인
    if (strict) {
      throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode): " + resource);
    }
    logger.warn("Input resource does not exist " + resource.getDescription());
    return;
  }
// ...
  reader = bufferedReaderFactory.create(resource, encoding); // 데이터를 읽어들일 BufferedReader 준비
}
@Override
protected void doClose() throws Exception {
    // ...
    reader.close(); // BufferedReader.close()를 호출하여 파일을 닫음
    // ...
}

JdbcCursorItemReader.doOpen

@Override
protected void doOpen() throws Exception {
   // ...
   // DB 연결 초기화 -> this.con = dataSource.getConnection();
   initializeConnection(); 
   openCursor(con); // 커서를 열어 데이터를 읽을 준비
   // ... 
}
protected void doClose() throws Exception {
  // ...
  JdbcUtils.closeResultSet(this.rs); // ResultSet 닫기
  rs = null;
  cleanupOnClose(con); // 커서 close
  // ...
  JdbcUtils.closeConnection(this.con); // DB 커넥션 닫기
}
@Override
protected void doOpen() throws Exception {
   // ...
   // DB 연결 초기화 -> this.con = dataSource.getConnection();
   initializeConnection(); 
   openCursor(con); // 커서를 열어 데이터를 읽을 준비
   // ... 
}
protected void doClose() throws Exception {
    // ...
    JdbcUtils.closeResultSet(this.rs); // ResultSet 닫기
    rs = null;
    cleanupOnClose(con); // 커서 close
    // ...
    JdbcUtils.closeConnection(this.con); // DB 커넥션 닫기
}

메타데이터 관리 및 상태 추적

open() (실행 정보 복원)

public void open(ExecutionContext executionContext) throws ItemStreamException {
   // ...
   if (itemCount > 0 && itemCount < maxItemCount) {
       try {
           jumpToItem(itemCount);
       }
       catch (Exception e) {
           throw new ItemStreamException("Could not move to stored position on restart", e);
       }
   }
   currentItemCount = itemCount;
}
@Override
protected void jumpToItem(int itemIndex) throws Exception {
   if (driverSupportsAbsolute) {
      //...
      rs.absolute(itemIndex); // ResultSet.absolute()
      //...
   }
   else {
      moveCursorToRow(itemIndex);
   }
}
@Override
protected void jumpToItem(int itemIndex) throws Exception {
   this.lock.lock();
   try {
      page = itemIndex / pageSize;
      current = itemIndex % pageSize;
   }
   finally {
      this.lock.unlock();
   }
   // ...
}

update() (상태 저장)

default void update(ExecutionContext executionContext) throws ItemStreamException {
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
    // ...
    executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);

    if (maxItemCount < Integer.MAX_VALUE) {
        executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount);
    }
    // ...
}

재시작 불가 사례: RedisItemReader

SCAN- 대량의 key를 효율적으로 탐색하기 위한 용도로, 서버를 블로킹하지 않고 점진적으로 키를 반환하는 명령어 (KEYS 명령어는 전체 키를 검색하지만 서버를 블로킹할 수 있다.)

ItemStream의 위임 구조

Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
    for (ItemStreamReader<? extends T> delegate : delegates) {
       delegate.open(executionContext);
    }
}

@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
    if (this.currentDelegate != null) {
        this.currentDelegate.update(executionContext);
    }
}

@Override
public void close() throws ItemStreamException {
    for (ItemStreamReader<? extends T> delegate : delegates) {
        delegate.close();
    }
}

다른 위임 패턴들의 ItemStream 구현

@Bean
public Step systemLogProcessingStep() {
    return new StepBuilder("systemLogProcessingStep", jobRepository)
            .<SystemLog, SystemLog>chunk(10, transactionManager)
            .reader(systemLogProcessingReader())
            // 위임 대상으로 criticalLogWrite와 normalLogWriter를 가지는 ClassifierCompositeItemWriter
            .writer(classifierWriter())
            .stream(criticalLogWriter()) // ItemStream 구현체 직접 전달
            .stream(normalLogWriter()) // ItemStream 구현체 직접 전달
            .build();
}

ItemStream 자동 등록 메커니즘

@StepScope 메서드 반환 타입에 따른 ItemStream 누락

@Bean
@StepScope
public ItemReader<SuspiciousDevice> cafeSquadReader() {
    // ...
}
@Bean
@StepScope
public MongoCursorItemReader<SuspiciousDevice> cafeSquadReader() {
    ...
}