TIL

JPA ItemReader / ItemWriter

JpaCursorItemReader

JpaCursorItemReader 해부

JpaCursorItemReader
    │
    ├────── queryString(JPQL) or JpaQueryProvider  
    │        └─ (Query 생성에 사용됨)  
    │
    ├────── EntityManager  
    │        └─ (JPA 핵심 엔진)
    │        
    └────── Query  
             └─ (EntityManager가 생성하는 실행 가능한 쿼리 인스턴스)

JpaCursorItemReader 구성 예제

@Bean
@StepScope
fun postBlockReader(
    @Value("#{jobParameters['startDateTime']}") startDateTime: LocalDateTime,
    @Value("#{jobParameters['endDateTime']}") endDateTime: LocalDateTime,
): JpaCursorItemReader<Post> =
    JpaCursorItemReaderBuilder<Post>()
        .name("postBlockReader")
        .entityManagerFactory(entityManagerFactory)
        .queryString(
            """
                SELECT p FROM Post p JOIN FETCH p.reports r
                WHERE r.reportedAt >= :startDateTime AND r.reportedAt < :endDateTime
            """.trimIndent()
        )
        .parameterValues(
            mapOf(
                "startDateTime" to startDateTime,
                "endDateTime" to endDateTime
            )
        )
        .build()

JpaQueryProvider를 사용한 쿼리 설정

// JpaNamedQueryProvider 예제

@Entity
@Table(name = "posts")
@NamedQuery(
    name = "Post.findByReportsReportedAtBetween",
    query = "SELECT p FROM Post p JOIN FETCH p.reports r WHERE r.reportedAt >= :startDateTime AND r.reportedAt < :endDateTime",
)
class Post() { ... }

@Bean
@StepScope
fun postBlockReader(
    @Value("#{jobParameters['startDateTime']}") startDateTime: LocalDateTime,
    @Value("#{jobParameters['endDateTime']}") endDateTime: LocalDateTime,
): JpaCursorItemReader<Post> =
    JpaCursorItemReaderBuilder<Post>()
        .name("postBlockReader")
        .entityManagerFactory(entityManagerFactory)
        .queryProvider(createQueryProvider())
        .parameterValues(
            mapOf(
                "startDateTime" to startDateTime,
                "endDateTime" to endDateTime
            )
        )
        .build()
        
private fun createQueryProvider(): JpaNamedQueryProvider<Post> {
    val queryProvider = JpaNamedQueryProvider<Post>()
    queryProvider.setEntityClass(Post::class.java)
    queryProvider.setNamedQuery("Post.findByReportsReportedAtBetween")
    return queryProvider
}

JpaPagingItemReader

offset 기반 페이징

JpaPagingItemReader 해부

JpaPagingItemReader 
    │ 
    ├────── queryString(JPQL) or JpaQueryProvider 
    │        └─ (Query 생성에 사용됨) 
    │ 
    ├────── EntityManager 
             └─ (JPA 핵심 엔진)

JpaPagingItemReader 예제

@Bean
@StepScope
fun postBlockReader(
    @Value("#{jobParameters['startDateTime']}") startDateTime: LocalDateTime,
    @Value("#{jobParameters['endDateTime']}") endDateTime: LocalDateTime,
): JpaPagingItemReader<Post> =
    JpaPagingItemReaderBuilder<Post>()
        .name("postBlockReader")
        .entityManagerFactory(entityManagerFactory)
        .queryString(
            """
            SELECT DISTINCT p FROM Post p
            JOIN p.reports r
            WHERE r.reportedAt >= :startDateTime AND r.reportedAt < :endDateTime
            ORDER BY p.id ASC
            """.trimIndent(),
        ).parameterValues(
            mapOf(
                "startDateTime" to startDateTime,
                "endDateTime" to endDateTime,
            ),
        ).pageSize(5)
        .build()

일대다 관계에서의 BatchSize 사용 시, FetchTypeEAGER로 변경해야 IN 절 쿼리를 통한 일괄 조회가 가능하다. 이는 JpaPagingItemReader에서 데이터를 읽고, 실제로 Report 객체가 필요한 ItemProcessor에 넘기기 전에 커밋을 해버리기 때문이다. BatchSize는 트랜잭션 범위 내에서만 동작하므로 이 시점에선 N+1이 발생한다. 때문에 EAGER 전략을 통해 ItemReader 트랜잭션 안에 있을 때 미리 페치를 끝내놔야 한다.

transacted 필드와 시스템 안정성 확보

if (transacted) {
    tx = entityManager.getTransaction();
    tx.begin();
    entityManager.flush(); // 잠재적 위험
    entityManager.clear();
} // end if

JpaItemWriter

@Bean
fun postBlockWriter(): JpaItemWriter<BlockedPost> {
    return JpaItemWriterBuilder<BlockedPost>()
        .entityManagerFactory(entityManagerFactory)
        .usePersist(true) // persist를 하도록 설정
        .build()
}

IDENTITY 전략 사용 시 배치 처리 제약 사항