TIL

카프카 (Kafka)

메시지 큐란?

프로세스 간 데이터를 교환할 때 사용되는 통신 방법 중 하나로 메시지를 임시로 저장하는 간단한 버퍼를 생각하면 된다.

생산자(Producer)가 메시지를 메시지 큐에 추가하면 소비자(Consumer)가 메시지를 검색하고 이를 통해 어떤 작업을 수행한다.

메시지 큐의 이점

**Apache Kafka**

Apache Kafka는 고성능 데이터 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 수천 개의 회사에서 사용하는 오픈 소스 분산 이벤트 스트리밍 플랫폼입니다. https://kafka.apache.org/

Kafka 기본 구조

img.png

토픽과 파티션

img.png

파티션과 오프셋, 메시지 순서

여러 파티션과 컨슈머

img.png

성능

리플리카 - 복제

프로듀서(Producer)

Properties prop = new Properties();
prop.put("bootstrap.servers", "kafka01:9092,kafka01:9902,kafka01:9902");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProxucer<Integer, String> producer = new KafkaProxucerM<>(prop);

producer.send(new ProducerRecord<>("topicname", "key", "value"));
producer.send(new ProducerRecord<>("topicname", "value"));

producer.close();

프로듀서 기본 흐름

img.png

  1. send() 호출
  2. Serializer로 byte 배열로 변환
  3. Partitioner로 어떤 토픽의 파티션에 전할지 결정
  4. 보낼 메시지를 배치로 묶어서 저장한다.
  5. Sender가 배치를 차례대로 가져와 브로커에 전달한다.

Sender의 기본 동작

처리량 관련 주요 속성

전송 결과를 확인X

producer.send(new ProducerRecord>("topic", "value"));

전송 결과 확인 - Future

Futre<RecordMetadata> future = producer.send(new ProducerRecord<>("topic", "value"));
try {
    RecordMetadata meta = future.get(); // 블로킹
} catch (ExecutionException e) {
}

전송 결과 확인 - Callback

producer.send(new ProducerRecord<>("topic", "value"),
		new Callback() {
				@Override
				public void onCompletion(RecordMetadata medatada, Exception ex) {
				}
		});

전송 보장과 ack

ack + min.insync.replicas

에러 유형

실패 대응 1 - 재시도

실패 대응 2 - 기록

재시도와 메시지 중복 전송 가능성

재시도와 순서

컨슈머

Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost9902");
prop.put("group.id", "group1");
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
porp.put("value.deserializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscibe(Collections.singleton("simple")); //토픽 구독
while(조건) {
    ConsumerRecores<String, String> recores = consumer.poll(Duration.ofMills(100));
    for (ConsumerRecoreM<String, String> record : records) {
        // ...
    }
}

consumer.close();

토픽 파티션은 그룹 단위 할당

img.png

커밋과 오프셋

img.png

컨슈머 설정

자동 커밋/수동 커밋

수동 커밋 - 동기/비동기 커밋

재처리와 순서

주의 - 스레드 안전하지 않음

참고

https://www.youtube.com/watch?v=0Ssx7jJJADI

https://www.youtube.com/watch?v=geMtm17ofPY