ItemReaderbatchSize 설정에 따라 다음 데이터가 조회된다.MongoCursorItemReader의 최적화
read() 메서드 호출 시 내부 버퍼를 활용해 비어있을 때만 DB에 요청batchSize만큼의 도큐먼트를 확보MongoCursorItemReader
│
├────── MongoTemplate
│ └─ (MongoDB 작업의 핵심 엔진)
│
├────── Query (org.springframework.data.mongodb.core.query.Query)
│ └─ (MongoDB 쿼리 조건 정의)
│
└────── Cursor
└─ (MongoDB 데이터를 순차적으로 읽어오는 스트리밍 객체)
MongoTemplate
MongoTemplate으로 데이터를 스트리밍으로 조회한다.Query
doOpen()에 최초로 쿼리를 생성, 커서를 얻는다.Cursor
read() 메서드가 호출될 때마다 데이터를 순차적으로 반환MongoCursorItemReader 처리 절차
read() 호출Document 반환read() 호출Document 요청 → Document 일괄 전송 (batchSize 단위)Document 반환@Bean
@StepScope
public MongoCursorItemReader<SecurityLog> securityLogReader(
@Value("#{jobParameters['searchDate']}") LocalDate searchDate
) {
Date startOfDay = Date.from(searchDate.atStartOfDay(ZoneId.systemDefault()).toInstant());
Date endOfDay = Date.from(searchDate.plusDays(1).atStartOfDay(ZoneId.systemDefault()).toInstant());
return new MongoCursorItemReaderBuilder<SecurityLog>()
.name("securityLogReader")
.template(mongoTemplate)
.collection("security_logs")
.jsonQuery("""
{
"label": "PENDING_ANALYSIS",
"timestamp": {
"$gte": ?0,
"$lt": ?1
}
}
""")
.parameterValues(List.of(startOfDay, endOfDay))
.sorts(Map.of("timestamp", Sort.Direction.ASC))
.targetType(SecurityLog.class)
.batchSize(10)
.build();
}
ItemReaderskip()과 limit() 연산자를 사용MongoCursorItemReader와 달리 매 페이지마다 새로운 쿼리를 실행한다.@Bean
@StepScope
public MongoPagingItemReader<SecurityLog> securityLogReader(
@Value("#{jobParameters['searchDate']}") LocalDate searchDate
) {
Query query = new Query()
.addCriteria(Criteria.where("label").is("PENDING_ANALYSIS"))
.addCriteria(Criteria.where("timestamp")
.gte(Date.from(searchDate.atStartOfDay(ZoneId.systemDefault()).toInstant()))
.lt(Date.from(searchDate.plusDays(1).atStartOfDay(ZoneId.systemDefault()).toInstant())))
.with(Sort.by(Sort.Direction.ASC, "timestamp"));
return new MongoPagingItemReaderBuilder<SecurityLog>()
.name("securityLogReader")
.template(mongoTemplate)
.collection("security_logs")
.query(query)
.sorts(Map.of("timestamp", Sort.Direction.ASC)) // 빌더의 버그 때문에 필요
.targetType(SecurityLog.class)
.pageSize(10)
.build();
}
MongoTemplate을 사용해 청크의 아이템을 추가/수정/삭제한다.bulkWrite는 JDBC의 batchUpdate와 유사한 컨셉으로 작동한다.bulkWrite를 호출한다.// bulkWrite에는 몽고DB에서 지원하는 다양한 쓰기 연산을 사용할 수 있다.
// 하지만 MongoItemWriter는 아래 세 가지 연산만 사용한다.
db.collection.bulkWrite(
[
{ insertOne : <document> },
{ replaceOne : <document> },
{ deleteMany : <document> }
]
)
MongoItemWriter
│
├────── MongoTemplate
│ └─ (MongoDB 작업을 수행하는 핵심 컴포넌트)
│
├────── collection
│ └─ (데이터를 저장할 MongoDB 컬렉션명)
│
└────── mode
└─ (INSERT/UPSERT/REMOVE - 도큐먼트 처리 방식 - 쓰기 연산 결정에 사용)
mode - MongoItemWriter가 도큐먼트를 어떻게 처리할지 결정
INSERT: 도큐먼트 추가 (insertOne)UPSERT: 기존 도큐먼트 수정, 존재하지 않으면 추가 (기본값) (replaceOne)REMOVE: 도큐먼트 삭제 (deleteMany)bulkWrite 도중 하나의 작업이 실패하면 후속 작업들을 실행되지 않는다.
MongoTransactionManaager
private final PlatformTransactionManager mongoTransactionManager;
public HackerPatternDetectionJob(
JobRepository jobRepository,
MongoTemplate mongoTemplate)
{
...
this.mongoTransactionManager = new MongoTransactionManager(mongoTemplate.getMongoDatabaseFactory());
}
...
// Step의 트랜잭션 매니저로 MongoTransactionManager 사용
<SecurityLog, SecurityLog>chunk(10, mongoTransactionManager)