JpaCursorItemReader는 내부적으로 entityManager를 통해 데이터를 읽는다.JpaCursorItemReader
│
├────── queryString(JPQL) or JpaQueryProvider
│ └─ (Query 생성에 사용됨)
│
├────── EntityManager
│ └─ (JPA 핵심 엔진)
│
└────── Query
└─ (EntityManager가 생성하는 실행 가능한 쿼리 인스턴스)
queryString
JpaCursorItemReaderBuilder.queryString() 메서드를 통해 생성JpaCursorItemReaderBuilder.queryProvider()로 커스텀 JpaQueryProvider 전달 가능EntityManager
JpaCursorItemReaderBuilder.entityManagerFactory()로 EntityManagerFactory를 전달하면 내부적으로 EntityManager를 생성해 사용된다.Query
JpaCursorItemReader가 EntityManager로 Query 객체를 생성getResultStream() 메서드를 호출JpaCursortemReader 초기화
doOpen() 호출Query 객체 생성getResultStream() 호출하여 커서를 순회할 Iterator 준비Iterator를 통해 실제 데이터를 한 건씩 읽어온다.
iterator.hasNext()로 데이터가 있다면 iterator.next()로 데이터 반환null 반환, 읽기 종료@Bean
@StepScope
fun postBlockReader(
@Value("#{jobParameters['startDateTime']}") startDateTime: LocalDateTime,
@Value("#{jobParameters['endDateTime']}") endDateTime: LocalDateTime,
): JpaCursorItemReader<Post> =
JpaCursorItemReaderBuilder<Post>()
.name("postBlockReader")
.entityManagerFactory(entityManagerFactory)
.queryString(
"""
SELECT p FROM Post p JOIN FETCH p.reports r
WHERE r.reportedAt >= :startDateTime AND r.reportedAt < :endDateTime
""".trimIndent()
)
.parameterValues(
mapOf(
"startDateTime" to startDateTime,
"endDateTime" to endDateTime
)
)
.build()
entityManagerFactory()
EntityManagerFactory 구성queryString()
parameterValues로 받은 파라미터와 함께 쿼리 정의JpaQueryProvider 구현체가 사용된다.JpaQueryProvider 구현체로 더 유연한 쿼리 처리가 가능하다.
JpaNamedQueryProvider: 엔티티 등에 미리 정의된 Named Query를 사용JpaNativeQueryProvider: Native SQL을 사용하여 데이터를 조회// JpaNamedQueryProvider 예제
@Entity
@Table(name = "posts")
@NamedQuery(
name = "Post.findByReportsReportedAtBetween",
query = "SELECT p FROM Post p JOIN FETCH p.reports r WHERE r.reportedAt >= :startDateTime AND r.reportedAt < :endDateTime",
)
class Post() { ... }
@Bean
@StepScope
fun postBlockReader(
@Value("#{jobParameters['startDateTime']}") startDateTime: LocalDateTime,
@Value("#{jobParameters['endDateTime']}") endDateTime: LocalDateTime,
): JpaCursorItemReader<Post> =
JpaCursorItemReaderBuilder<Post>()
.name("postBlockReader")
.entityManagerFactory(entityManagerFactory)
.queryProvider(createQueryProvider())
.parameterValues(
mapOf(
"startDateTime" to startDateTime,
"endDateTime" to endDateTime
)
)
.build()
private fun createQueryProvider(): JpaNamedQueryProvider<Post> {
val queryProvider = JpaNamedQueryProvider<Post>()
queryProvider.setEntityClass(Post::class.java)
queryProvider.setNamedQuery("Post.findByReportsReportedAtBetween")
return queryProvider
}
ItemReaderJpaPagingItemReader는 JdbcPagingItemReader와 달리 offset 기반으로 페이징을 수행한다.
SELECT * FROM victims ORDER BY id LIMIT 10 OFFSET 0JpaPagingItemReader
│
├────── queryString(JPQL) or JpaQueryProvider
│ └─ (Query 생성에 사용됨)
│
├────── EntityManager
└─ (JPA 핵심 엔진)
JpaCursorItemReader와 비슷하지만 읽기 방식에 근본적인 차이가 있다.
JpaCursorItemReader와 달리 매 doReadPage() 메서드마다 새로운 쿼리를 생성하고 실행한다.getResultList()를 통해 페이지 데이터를 가져온다.@Bean
@StepScope
fun postBlockReader(
@Value("#{jobParameters['startDateTime']}") startDateTime: LocalDateTime,
@Value("#{jobParameters['endDateTime']}") endDateTime: LocalDateTime,
): JpaPagingItemReader<Post> =
JpaPagingItemReaderBuilder<Post>()
.name("postBlockReader")
.entityManagerFactory(entityManagerFactory)
.queryString(
"""
SELECT DISTINCT p FROM Post p
JOIN p.reports r
WHERE r.reportedAt >= :startDateTime AND r.reportedAt < :endDateTime
ORDER BY p.id ASC
""".trimIndent(),
).parameterValues(
mapOf(
"startDateTime" to startDateTime,
"endDateTime" to endDateTime,
),
).pageSize(5)
.build()
pageSize()
LIMIT 절의 값으로 사용)queryString() 수정
JpaCursorItemReader 때와 다르게 fetch join 제거했다.
Post -* Report 관계처럼 일대다 관계에서 페치 조인과 limit/offset 페이징을 함께 사용하면 메모리 낭비가 발생한다.@BatchSize를 적용하면 지정된 수만큼 IN 절로 조회하는 최적화를 사용할 수 있다. (@BatchSize(size = 5))ORDER BY 추가
ORDER BY가 없으면 읽을 때마다 데이터 순서가 보장되지 않아 누락과 중복이 발생할 수 있다.일대다 관계에서의
BatchSize사용 시,FetchType을EAGER로 변경해야IN절 쿼리를 통한 일괄 조회가 가능하다. 이는JpaPagingItemReader에서 데이터를 읽고, 실제로Report객체가 필요한ItemProcessor에 넘기기 전에 커밋을 해버리기 때문이다.BatchSize는 트랜잭션 범위 내에서만 동작하므로 이 시점에선 N+1이 발생한다. 때문에 EAGER 전략을 통해ItemReader트랜잭션 안에 있을 때 미리 페치를 끝내놔야 한다.
BatchSize가 무효화되는 문제는 JpaPagingItemReader에서 transacted = true이기 때문에 발생한다.transacted가 true일 때 JpaPagingItemReader는 페이지를 읽기 전에 entityManager.flush()를 호출한다.
ItemProcessor에서 엔티티 변경이 생긴다면 다음 데이터를 읽어들이는 doReadPage() 시점에 의도치 않은 DB 변경이 발생할 수 있다.if (transacted) {
tx = entityManager.getTransaction();
tx.begin();
entityManager.flush(); // 잠재적 위험
entityManager.clear();
} // end if
JpaPagingItemReaderBuilder.transacted(false) 설정으로 해당 설정을 끌 수 있다.
detach) ItemPrcoessor에서의 lazy loading이 불가능해진다FetchType.EAGER가 더 적절할 때가 많다.PlatformTransactionManager의 구현체가 JpaTransactionManager로 변경된다.
JpaItemWriter가 동작할 수 있게 된다.JpaItemWriter는 단순하다.
@Bean
fun postBlockWriter(): JpaItemWriter<BlockedPost> {
return JpaItemWriterBuilder<BlockedPost>()
.entityManagerFactory(entityManagerFactory)
.usePersist(true) // persist를 하도록 설정
.build()
}
JpaItemWriterBuilder로 간단하게 구성할 수 있다.
persistmerge를 사용하면 된다. (userPsersist(false), false가 기본값이다.)INSERT 쿼리의 배치 처리를 할 수 없다.
INSERT를 먼저 실행해야 하기 때문