TIL

5.4 정확히 한 번 전송

5.4.1 디자인

5.4.2 프로듀서 예제 코드

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class ExactlyOnceProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        
        // 정확히 한 번 전송을 위한 설정
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id-001");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); 
        props.put(ProducerConfig.RETRIES_CONFIG, "5");

        Producer<String, String> producer = new KafkaProducer<>(props);

        producer.initTransactions(); // 프로듀서 트랜잭션 초기화

        try {
            producer.beginTransaction(); // 프로듀서 트랜잭션 시작
            producer.send(new ProducerRecord<>("test-topic", "key1", "value1"));
            producer.send(new ProducerRecord<>("test-topic", "key2", "value2"));
            producer.flush();
            producer.commitTransaction(); // 프로듀서 트랜잭션 커밋
        } catch (Exception e) {
            producer.abortTransaction(); // 프로듀서 트랜잭션 중단
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

5.4.3 단계별 동작

트랜잭션 코디네이터 찾기

graph LR
    Producer -- FindCoordinatorRequest --> Broker

프로듀서 초기화

graph LR
    Producer -- InitPidRequest --> TransactionCoordinator

    subgraph Broker
        TransactionCoordinator
        TID_PID_Mapping["트랜잭션 로그"]
        TransactionCoordinator -- "TID ↔ PID 매핑" --> TID_PID_Mapping
    end

트랜잭션 시작

트랜잭션 상태 추가

메시지 전송

graph LR
    Producer -- 메시지 --> Blocker

    subgraph Broker
        TransactionCoordinator
    end

트랜잭션 종료

graph LR
    Producer -- EndTxnRequest --> TransactionCoordinator

    subgraph Broker
        TransactionCoordinator
    end

graph LR
		Producer
		
    TransactionCoordinator -- WriteTxnMarkerRequest --> Blocker

    subgraph Broker
        TransactionCoordinator
    end

트랜잭션 완료