프로세스 간 데이터를 교환할 때 사용되는 통신 방법 중 하나로 메시지를 임시로 저장하는 간단한 버퍼를 생각하면 된다.
생산자(Producer)가 메시지를 메시지 큐에 추가하면 소비자(Consumer)가 메시지를 검색하고 이를 통해 어떤 작업을 수행한다.
Apache Kafka는 고성능 데이터 파이프라인, 스트리밍 분석, 데이터 통합 및 미션 크리티컬 애플리케이션을 위해 수천 개의 회사에서 사용하는 오픈 소스 분산 이벤트 스트리밍 플랫폼입니다. https://kafka.apache.org/
- 아파치 재단이 스칼라로 개발한 오픈 소스 메시지 브로커 프로젝트다.
- 높은 처리량을 요구하는 실시간 데이터 처리를 목표로 하며 TCP 프로토콜을 사용한다.



Properties prop = new Properties();
prop.put("bootstrap.servers", "kafka01:9092,kafka01:9902,kafka01:9902");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProxucer<Integer, String> producer = new KafkaProxucerM<>(prop);
producer.send(new ProducerRecord<>("topicname", "key", "value"));
producer.send(new ProducerRecord<>("topicname", "value"));
producer.close();
Properties - 프로듀서가 사용할 속성을 지정할 수 있다.
KafkaProducer - send() 제공
ProducerRecord를 통해 메시지 전달

send() 호출Serializer로 byte 배열로 변환Partitioner로 어떤 토픽의 파티션에 전할지 결정Sender가 배치를 차례대로 가져와 브로커에 전달한다.Sender는 별도 스레드로 동작한다.
Sender가 메시지를 보내는 동안 send()로 들어오는 레코드들은 배치에 누적된다.batchSize - 배치 최대 크기를 설정, 크기만큼 차면 바로 전송
liners.ms - 전송 대기 시간
대기 시간이 있으면 시간만큼 기다렸다가 배치를 전송
→ 한 번에 많은 메시지를 보낼 수 있게 되고 처리량이 높이지게 된다.
producer.send(new ProducerRecord>("topic", "value"));
Futre<RecordMetadata> future = producer.send(new ProducerRecord<>("topic", "value"));
try {
RecordMetadata meta = future.get(); // 블로킹
} catch (ExecutionException e) {
}
producer.send(new ProducerRecord<>("topic", "value"),
new Callback() {
@Override
public void onCompletion(RecordMetadata medatada, Exception ex) {
}
});
min.insync.replicas 설정에 따라 달라짐min.insync.replicas
min.insync.replicas = 2
min.insync.replicas = 1
min.insync.replicas = 3
send() 재호출 (catch)send() 재호출Future.get()이나 콜백에서의 예외를 받아서 기록enable.idempotence 속성으로 중복 전송 가능성을 줄일 수 있음max.in.flight.requests.per.connection

Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost9902");
prop.put("group.id", "group1");
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
porp.put("value.deserializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
consumer.subscibe(Collections.singleton("simple")); //토픽 구독
while(조건) {
ConsumerRecores<String, String> recores = consumer.poll(Duration.ofMills(100));
for (ConsumerRecoreM<String, String> record : records) {
// ...
}
}
consumer.close();
Properties
Deserializer 등록KafkaConsumer
subscribe() 메서드로 토픽 구독poll() 메서드로 ConsumerRecords 받아옴 (100ms 동안 대기)

poll()을 반복하여 순서대로 읽어나간다.auto.offset.reset 설정 사용
fetch.min.bytes - 조회 시 브로커가 전송할 최소 데이터 크기
fetch.max.wait.ms - 데이터가 최소 크기가 될 때까지 기다릴 시간
poll() 메서드의 대기 시간과 다름max.partition.fetch.bytes - 파티션 당 브로커가 리턴할 수 있는 최대 크기
enable.auto.commit
auto.commit.interval.ms - 자동 커밋 주기
poll(), close() 메서드 호출 시 자동 커밋 실행consumer.commitSync();
consumer.commitAsync();
Callback 사용