프로세스 간 데이터를 교환할 때 사용되는 통신 방법 중 하나로 메시지를 임시로 저장하는 간단한 버퍼를 생각하면 된다.
생산자(Producer)가 메시지를 메시지 큐에 추가하면 소비자(Consumer)가 메시지를 검색하고 이를 통해 어떤 작업을 수행한다.
Apache Kafka는 고성능 데이터 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 수천 개의 회사에서 사용하는 오픈 소스 분산 이벤트 스트리밍 플랫폼입니다. https://kafka.apache.org/
- 아파치 재단이 스칼라로 개발한 오픈 소스 메시지 브로커 프로젝트다.
- 높은 처리량을 요구하는 실시간 데이터 처리를 목표로 하며 TCP 프로토콜을 사용한다.
Properties prop = new Properties();
prop.put("bootstrap.servers", "kafka01:9092,kafka01:9902,kafka01:9902");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProxucer<Integer, String> producer = new KafkaProxucerM<>(prop);
producer.send(new ProducerRecord<>("topicname", "key", "value"));
producer.send(new ProducerRecord<>("topicname", "value"));
producer.close();
Properties
- 프로듀서가 사용할 속성을 지정할 수 있다.
KafkaProducer
- send()
제공
ProducerRecord
를 통해 메시지 전달
send()
호출Serializer
로 byte 배열로 변환Partitioner
로 어떤 토픽의 파티션에 전할지 결정Sender
가 배치를 차례대로 가져와 브로커에 전달한다.Sender
는 별도 스레드로 동작한다.
Sender
가 메시지를 보내는 동안 send()
로 들어오는 레코드들은 배치에 누적된다.batchSize
- 배치 최대 크기를 설정, 크기만큼 차면 바로 전송
liners.ms
- 전송 대기 시간
대기 시간이 있으면 시간만큼 기다렸다가 배치를 전송
→ 한 번에 많은 메시지를 보낼 수 있게 되고 처리량이 높이지게 된다.
producer.send(new ProducerRecord>("topic", "value"));
Futre<RecordMetadata> future = producer.send(new ProducerRecord<>("topic", "value"));
try {
RecordMetadata meta = future.get(); // 블로킹
} catch (ExecutionException e) {
}
producer.send(new ProducerRecord<>("topic", "value"),
new Callback() {
@Override
public void onCompletion(RecordMetadata medatada, Exception ex) {
}
});
min.insync.replicas
설정에 따라 달라짐min.insync.replicas
min.insync.replicas
= 2
min.insync.replicas
= 1
min.insync.replicas
= 3
send()
재호출 (catch)send()
재호출Future.get()
이나 콜백에서의 예외를 받아서 기록enable.idempotence
속성으로 중복 전송 가능성을 줄일 수 있음max.in.flight.requests.per.connection
Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost9902");
prop.put("group.id", "group1");
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
porp.put("value.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscibe(Collections.singleton("simple")); //토픽 구독
while(조건) {
ConsumerRecores<String, String> recores = consumer.poll(Duration.ofMills(100));
for (ConsumerRecoreM<String, String> record : records) {
// ...
}
}
consumer.close();
Properties
Deserializer
등록KafkaConsumer
subscribe()
메서드로 토픽 구독poll()
메서드로 ConsumerRecords
받아옴 (100ms 동안 대기)poll()
을 반복하여 순서대로 읽어나간다.auto.offset.reset
설정 사용
fetch.min.bytes
- 조회 시 브로커가 전송할 최소 데이터 크기
fetch.max.wait.ms
- 데이터가 최소 크기가 될 때까지 기다릴 시간
poll()
메서드의 대기 시간과 다름max.partition.fetch.bytes
- 파티션 당 브로커가 리턴할 수 있는 최대 크기
enable.auto.commit
auto.commit.interval.ms
- 자동 커밋 주기
poll()
, close()
메서드 호출 시 자동 커밋 실행consumer.commitSync();
consumer.commitAsync();
Callback
사용