TIL

관계형 데이터베이스 읽고 쓰기

Spring Batch의 두 가지 전략

JdbcCursorItemReader

JdbcCursorItemReader 해부

JdbcCursorItemReader
    │
    ├────── DataSource  
    │        └─ (DB 연결 관리)  
    │
    ├────── SQL  
    │        └─ (데이터 조회 쿼리)  
    │
    ├────── RowMapper  
    │        └─ (ResultSet → Java 객체 변환)  
    │
    ├────── PreparedStatement  
    │        └─ (쿼리 실행 및 결과 조회)  
    │
    └────── PreparedStatementSetter (optional)  
             └─ (파라미터 동적 바인딩) 
graph TD
    subgraph "read()"
        rsNext["rs.next()"]

        subgraph "RowMapper"
            mapRow["mapRow(rs, rowNum)"]
        end
        returnVictim["return "]

        rsNext -->|"readCursor()"| mapRow
        mapRow --> returnVictim
    end

    subgraph "openCursor()"
        SQL["SQL<br/>'SELECT * FROM victims ...'"]
        PreparedStatementSetter["PreparedStatementSetter"]

        SQL -- "sql 전달" --> executeQuery
        PreparedStatementSetter -- "파라미터 바인딩" --> executeQuery

        subgraph "PreparedStatement"
            executeQuery["executeQuery()"]
        end

        subgraph "ResultSet 초기화"
            rsInit["this.rs = executeQuery()"]
        end
        executeQuery --> rsInit
    end

JdbcCursorItemReader 구성하기

@Bean
fun terminatedVictimReader(): JdbcCursorItemReader<Victim> =
    JdbcCursorItemReaderBuilder<Victim>()
        .name("terminatedVictimReader")
        .dataSource(dataSource)
        .sql("SELECT * FROM victims WHERE status = ? AND terminated_at <= ?")
        .queryArguments(listOf("TERMINATED", LocalDateTime.now()))
        .dataRowMapper(Victim::class.java)
        .build()

JDBC 드라이버의 내부 최적화

MySQL 드라이버는 useCursorFetch connection property 설정이 없으면 기본적으로 모든 쿼리 결과를 메모리에 가져온다. 대량 데이터를 다루려면 userCursorFetch=true를 설정해야 분할 로딩이 가능하다.

커서 연속성

스냅샷 읽기

JdbcCursorItemReader의 ORDER BY 설정

JdbcPagingItemReader

JdbcPagingItemReader 해부

JdbcPagingItemReader
    |
    - DataSource   
    |        └── (데이터베이스 연결)
    |
    - RowMapper
    |        └── (ResultSet → Java 객체 변환) 
    |
    - NamedParameterJdbcTemplate
    |        └── (SQL 실행 및 파라미터 바인딩)
    |
    - PagingQueryProvider
					   ├── (쿼리 생성 및 페이징 전략)
             └── (DB별 SQL 최적화)

JdbcPagingItemReader 구성하기

@Bean
fun terminatedVictimReader(): JdbcPagingItemReader<Victim> =
    JdbcPagingItemReaderBuilder<Victim>()
        .name("terminatedVictimReader")
        .dataSource(dataSource)
        .pageSize(5)
        .selectClause("SELECT id, name, process_id, terminated_at, status")
        .fromClause("FROM victims")
        .whereClause("WHERE status = :status AND terminated_at <= :terminatedAt")
        .sortKeys(mapOf("id" to Order.ASCENDING))
        .parameterValues(
            mapOf(
                "status" to "TERMINATED",
                "terminatedAt" to LocalDateTime.now(),
            ),
        ).dataRowMapper(Victim::class.java)
        .build()

JdbcBatchItemWriter

일반 INSERT vs Multi-value INSERT vs Batch Update

-- 패킷1
INSERT INTO victims (id, name) VALUES (?, ?)
-- 첫번째 레코드
ps.setLong(1, 1);
ps.setString(2, "김배치");
ps.addBatch();

-- 두번째 레코드
ps.setLong(1, 2);
ps.setString(2, "사불링");
ps.addBatch();
## application.yml

# MYSQL
url: jdbc:mysql://localhost:3306/mysql?rewriteBatchedStatements=true 
#POSTGRESQL
url: jdbc:postgresql://localhost:5432/postgres?reWriteBatchedInserts=true 

JdbcBatchItemWriter 해부

JdbcBatchItemWriter
    │
    ├────── NamedParameterJdbcTemplate  
    │        └─ (쿼리 실행)  
    │
    ├────── SQL  
    │        └─ (데이터 삽입/수정을 위한 쿼리)  
    │
    ├──────  ItemSqlParameterSourceProvider or ItemPreparedStatementSetter
             └─ (Java 객체 → SQL 파라미터 매핑)
public interface ItemPreparedStatementSetter<T> {
		void setValues(T item, PreparedStatement ps) throws SQLException;
}