JdbcCursorItemReader)
JdbcPagingItemReader)
ResultSet으로 데이터를 순차적으로 가져오는 방식
JdbcCursorItemReader가 초기화되며 SQL을 실행read()가 호출될 때마다 ResultSet.next()를 호출JdbcCursorItemReader
│
├────── DataSource
│ └─ (DB 연결 관리)
│
├────── SQL
│ └─ (데이터 조회 쿼리)
│
├────── RowMapper
│ └─ (ResultSet → Java 객체 변환)
│
├────── PreparedStatement
│ └─ (쿼리 실행 및 결과 조회)
│
└────── PreparedStatementSetter (optional)
└─ (파라미터 동적 바인딩)
DataSource
rowMapper
ResultSet)을 객체로 변환FiledSet과 유사RowMapper
BeanPropertyRowMapper - 전통적인 자바빈 규약 setter 기반 매핑DataClassRowMapper - Java Record나 Kotlin Data Class 같은 불변 객체를 위한 구현체로 생성자 파라미터 기반 매핑을 수행RowMapper - 별도의 복잡한 변환 로직 필요 시 직접 구현PreparedStatement
ResultSet으로 가져오는 JDBC의 핵심 컴포넌트PreparedStatementSetter (선택 사항)
PreparedStatement에 동적으로 파라미터 값을 주입하는 역할을 수행graph TD
subgraph "read()"
rsNext["rs.next()"]
subgraph "RowMapper"
mapRow["mapRow(rs, rowNum)"]
end
returnVictim["return "]
rsNext -->|"readCursor()"| mapRow
mapRow --> returnVictim
end
subgraph "openCursor()"
SQL["SQL<br/>'SELECT * FROM victims ...'"]
PreparedStatementSetter["PreparedStatementSetter"]
SQL -- "sql 전달" --> executeQuery
PreparedStatementSetter -- "파라미터 바인딩" --> executeQuery
subgraph "PreparedStatement"
executeQuery["executeQuery()"]
end
subgraph "ResultSet 초기화"
rsInit["this.rs = executeQuery()"]
end
executeQuery --> rsInit
end
@Bean
fun terminatedVictimReader(): JdbcCursorItemReader<Victim> =
JdbcCursorItemReaderBuilder<Victim>()
.name("terminatedVictimReader")
.dataSource(dataSource)
.sql("SELECT * FROM victims WHERE status = ? AND terminated_at <= ?")
.queryArguments(listOf("TERMINATED", LocalDateTime.now()))
.dataRowMapper(Victim::class.java)
.build()
.dataSource()- 빈 주입 받은 DataSource를 주입.sql()- 쿼리를 전달
?로 마스킹.queryArguments()
sql() 메서드에 지정한 쿼리의 바인딩 파라미터값을 전달ArgumentPreparedStatementSetter가 사용됨.dataRowMapper()
RowMapper는 ResultSet을 객체로 변환하는 역할을 수행dataRowMapper를 사용setter 기반 객체 매핑에는 .beanRowMapper()를 사용하면 된다.ResultSet 내부 버퍼링
ResultSet 내부 버퍼에 저장해 둔다.IteamReader.read() → ResultSet.next() → 버퍼에서 데이터 확인 후 반환 또는 네트워크 통신 후 반환MySQL 드라이버는
useCursorFetchconnection property 설정이 없으면 기본적으로 모든 쿼리 결과를 메모리에 가져온다. 대량 데이터를 다루려면userCursorFetch=true를 설정해야 분할 로딩이 가능하다.
fetchSize - JDBC 드라이버가 한 번에 가져올 row 개수를 지정JdbcCursorItemReaderBuilder.fetchSize로 설정 가능JdbcCursorItemReader는 기본적으로 Step 트랜잭션과 별도의 데이터베이스 커넥션을 사용한다.
JdbcCursorItemReader가 조회하는 데이터가 중간에 변경되어도 reader는 그 변화를 보지 못한다.JdbcCursorItemReader는 100개 그대로 읽어버린다.JdbcCursorItemReader에서 ORDER BY는 필수다.Step이 실패 시 JdbcCursorItemReader는 재시도를 위해 jumpToItem() 메서드를 통해 실패 지점을 찾는다.
ResultSet.next를 호출하여 실패 지점까지 커서를 이동시킨다.ORDER BY가 필요한 것
ORDER BY 절에는 유니크한 값(보통 PK)을 포함하도록 해야 한다.ItemReader 구현체JdbcPagingItemReader는 Keyset 기반 페이징을 수행한다.
SELECT * FROM victims ORDER BY id LIMIT 10 OFFSET 20
OFFSET + LIMIT만큼의 데이터를 먼저 스캔한 뒤, 앞의OFFSET만큼을 버리는 방식SELECT * FROM victims WHERE id > 1000 ORDER BY id LIMIT 10
id값을 기준으로 다음 데이터를 조회JdbcPagingItemReader
|
- DataSource
| └── (데이터베이스 연결)
|
- RowMapper
| └── (ResultSet → Java 객체 변환)
|
- NamedParameterJdbcTemplate
| └── (SQL 실행 및 파라미터 바인딩)
|
- PagingQueryProvider
├── (쿼리 생성 및 페이징 전략)
└── (DB별 SQL 최적화)
DataSource, RowMapper는 JdbcCursorItemReader와 비슷한 역할을 수행한다.NamedParameterJdbcTemplate
JdbcTemplate에서 한 단계 발전한 이름 기반 파라미터 매핑을 사용
WHERE status = :status?와 위치 기반 매칭을 하지 않아도 된다.PagingQueryProvider - 페이징 쿼리 생성
PostgresPagingQueryProvider, MySqlPagingQueryProviderJdbcPagingItemReaderBuilder로 JdbcPagingItemReader를 구성하는 코드다.@Bean
fun terminatedVictimReader(): JdbcPagingItemReader<Victim> =
JdbcPagingItemReaderBuilder<Victim>()
.name("terminatedVictimReader")
.dataSource(dataSource)
.pageSize(5)
.selectClause("SELECT id, name, process_id, terminated_at, status")
.fromClause("FROM victims")
.whereClause("WHERE status = :status AND terminated_at <= :terminatedAt")
.sortKeys(mapOf("id" to Order.ASCENDING))
.parameterValues(
mapOf(
"status" to "TERMINATED",
"terminatedAt" to LocalDateTime.now(),
),
).dataRowMapper(Victim::class.java)
.build()
pageSize() - 페이지 크기 지정
chunkSize와 동일한 값으로 하는 것을 권장한다.chunk 크기와 pageSize가 같으면 한 번의 청크 처리마다 한 번의 조회 쿼리가 호출된다.selectClause(), fromClause(), whereClause(), sortKeys()
PagingQueryProvider 구현체가 지정된다.queryProvider를 직접 따로 지정하면 위 설정들은 무시되기에 하나만 사용해야 한다.JdbcPagingItemReader이기에 정렬 키 지정이 필수다.JdbcBatchItemWriter는 Spring Batch에서 제공하는 가장 기본적인 쓰기 도구
NamedParameterJdbcTemplate 사용JdbcTemplate의 batchUpdate 사용INSERT INTO victims (id, name) VALUES (1, '김배치'), (2, '사불링');PreparedStatement를 재사용하여 쿼리 템플릿 하나와 여러 파라미터 세트를 함께 전송-- 패킷1
INSERT INTO victims (id, name) VALUES (?, ?)
-- 첫번째 레코드
ps.setLong(1, 1);
ps.setString(2, "김배치");
ps.addBatch();
-- 두번째 레코드
ps.setLong(1, 2);
ps.setString(2, "사불링");
ps.addBatch();
## application.yml
# MYSQL
url: jdbc:mysql://localhost:3306/mysql?rewriteBatchedStatements=true
#POSTGRESQL
url: jdbc:postgresql://localhost:5432/postgres?reWriteBatchedInserts=true
JdbcBatchItemWriter
│
├────── NamedParameterJdbcTemplate
│ └─ (쿼리 실행)
│
├────── SQL
│ └─ (데이터 삽입/수정을 위한 쿼리)
│
├────── ItemSqlParameterSourceProvider or ItemPreparedStatementSetter
└─ (Java 객체 → SQL 파라미터 매핑)
NamedParameterJdbcTemplate
JdbcTemplateItemSqlParameterSourceProvider
JdbcBatchItemWriterBuilder.beaMapped()를 사용하면 BeanPropertyItemSqlParameterSourceProvider 구현체가 사용된다. (자바빈)itemSqlParameterSourceProvider() 메서드를 통해 커스텀 구현체를 지정할 수도 있다.PreparedStatement 파라미터에 설정하는 담당public interface ItemPreparedStatementSetter<T> {
void setValues(T item, PreparedStatement ps) throws SQLException;
}
JdbcBatchItemWriter에 전달된다.NamedParameterJdbcTemplate이 각 도구를 사용하여 PreparedStatement에 파라미터를 설정, 설정된 PreparedStatement는 배치에 차례대로 추가된다.PreparedStatement가 단일 네트워크 호출로 DB에 전송된다.