3.4 컨슈머의 기본 동작과 예제 맛보기
3.4.1 컨슈머의 기본 동작
- 컨슈머를 통해 토픽에 저장된 메시지를 가져올 수 있다.
- 컨슈머는 반드시 컨슈머 그룹에 속하게 된다.
- 컨슈머 그룹은 각 파티션의 리더에게 카프카 토픽의 메시지를 가져오기 위한 요청을 보낸다.
- 파티션 수와 (컨슈머 그룹 내의) 컨슈머 수는 일대일로 매핑되는 것이 이상적이다.
- 반드시 그래야 하는건 아니지만 파티션 수보다 컨슈머 수가 많다고 더 처리량이 좋아지는 것도 아니다.
- 매핑되지 않는 컨슈머는 대기 상태로만 존재하기 때문
3.4.2 컨슈머의 주요 옵션
- bootstrap.servers
- fetch.min.bytes
- 한 번에 가져올 최소 데이터 크기
- 데이터가 누적되다가 값에 달하면 가져온다.
- group.id
- 컨슈머 그룹을 식별하는 식별자로 그룹 내 컨슈머들에게 공유된다.
- heartbeat.interval.ms
- 하트비트가 있다는 것은 컨슈머가 액티브하다는 것
- session.timeout.ms보다 낮은 값으로 설정해야 한다. (일반적으로 1/3)
- max.partition.fetch.bytes
- session.timeout.ms
- 하트비트를 이 시간까지 받지 못하면 종료된 것으로 간주
- 컨슈머 그룹에서 제외하고 리밸런싱을 시작한다.
- enable.auto.commit
- auto.offset.reset
- 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않는 경우에 다음 옵션으로 reset
- earlist - 초기 오프셋값
- latest - 가장 마지막의 오프셋값으로 설정
- none - 이전 오프셋값을 찾지 못하면 에러를 낸다.
- fetch.max.bytes
- group.instance.id
- 컨슈머의 고유 식별자
- 설정한다면 static 멤버로 간주되어 불필요한 리밸런싱을 하지 않는다.
- isolation.level
- read_uncommitted - 기본값으로 모든 메시지를 읽음
- read_committed - 트랜잭션이 완료된 메시지만 읽는다.
- max.poll.records
- 한 번의 poll() 요청으로 가져오는 최대 메시지 수
- partition.assignment.strategy
- fetch.max.wait.ms
- fetch.min.bytes에 설정된 데이터 크기를 기다리는 최대 시간