TIL

NoSQL 읽고 쓰기 - MongoDB

Document DB

MonogoDB 개요

MongoCursorItemReader

MongoCursorItemReader 해부

MongoCursorItemReader 
  │ 
  ├────── MongoTemplate 
  │         └─ (MongoDB 작업의 핵심 엔진)
  │ 
  ├────── Query (org.springframework.data.mongodb.core.query.Query) 
  │         └─ (MongoDB 쿼리 조건 정의) 
  │
  └────── Cursor 
             └─ (MongoDB 데이터를 순차적으로 읽어오는 스트리밍 객체)
@Bean
@StepScope
public MongoCursorItemReader<SecurityLog> securityLogReader(
        @Value("#{jobParameters['searchDate']}") LocalDate searchDate
) {
    Date startOfDay = Date.from(searchDate.atStartOfDay(ZoneId.systemDefault()).toInstant());
    Date endOfDay = Date.from(searchDate.plusDays(1).atStartOfDay(ZoneId.systemDefault()).toInstant());

    return new MongoCursorItemReaderBuilder<SecurityLog>()
            .name("securityLogReader")
            .template(mongoTemplate)
            .collection("security_logs")
            .jsonQuery("""
            {
                "label": "PENDING_ANALYSIS",
                "timestamp": {
                    "$gte": ?0,
                    "$lt": ?1
                }
            }
            """)
            .parameterValues(List.of(startOfDay, endOfDay))
            .sorts(Map.of("timestamp", Sort.Direction.ASC))
            .targetType(SecurityLog.class)
            .batchSize(10)
            .build();
}

MongoPagingItemReader

MongoPagingItemReader 구성 요소

@Bean
@StepScope
public MongoPagingItemReader<SecurityLog> securityLogReader(
    @Value("#{jobParameters['searchDate']}") LocalDate searchDate
) {
    Query query = new Query()
            .addCriteria(Criteria.where("label").is("PENDING_ANALYSIS"))
            .addCriteria(Criteria.where("timestamp")
                    .gte(Date.from(searchDate.atStartOfDay(ZoneId.systemDefault()).toInstant()))
                    .lt(Date.from(searchDate.plusDays(1).atStartOfDay(ZoneId.systemDefault()).toInstant())))
            .with(Sort.by(Sort.Direction.ASC, "timestamp"));

    return new MongoPagingItemReaderBuilder<SecurityLog>()
            .name("securityLogReader")
            .template(mongoTemplate)
            .collection("security_logs")
            .query(query)
            .sorts(Map.of("timestamp", Sort.Direction.ASC))  // 빌더의 버그 때문에 필요
            .targetType(SecurityLog.class)
            .pageSize(10)
            .build();
}

MongoItemWriter

bulkWrite

// bulkWrite에는 몽고DB에서 지원하는 다양한 쓰기 연산을 사용할 수 있다. 
// 하지만 MongoItemWriter는 아래 세 가지 연산만 사용한다.
db.collection.bulkWrite(
   [
      { insertOne : <document> },
      { replaceOne : <document> },
      { deleteMany : <document> }
   ]
)

MongoItemWriter 해부

MongoItemWriter
│
├────── MongoTemplate
│       └─ (MongoDB 작업을 수행하는 핵심 컴포넌트)
│
├────── collection
│       └─ (데이터를 저장할 MongoDB 컬렉션명)
│
└────── mode
        └─ (INSERT/UPSERT/REMOVE - 도큐먼트 처리 방식 - 쓰기 연산 결정에 사용)

MongoTransactionManager

private final PlatformTransactionManager mongoTransactionManager;

public HackerPatternDetectionJob(
        JobRepository jobRepository,
        MongoTemplate mongoTemplate) 
{
    ...
    this.mongoTransactionManager = new MongoTransactionManager(mongoTemplate.getMongoDatabaseFactory());
}

...
// Step의 트랜잭션 매니저로 MongoTransactionManager 사용
<SecurityLog, SecurityLog>chunk(10, mongoTransactionManager)