6장 레디스를 메시지 브로커로 사용하기
- 서비스 간 커넥션 실패는 언제든 발생할 수 있기에 느슨한 결합이 중요할 수 있다.
- 모듈 간 통신은 비동기 통신을 사용하는 것이 바람직
- 메시지 브로커는 크게 메시징 큐와 이벤트 스트림 두 가지 형태가 존재한다.
메시징 큐와 이벤트 스트림
- 메시징 큐
- 생산자(producer)와 소비자(consumer)
- 이벤트 스트림
- 발행자(publisher)와 구독자(subscriber)
메시징 큐와 이벤트 스트림의 차이
- 방향성
- 메시징 큐 생산자는 소비자 큐로 데이터를 직접 푸시한다.
- 스트림 발행자는 특정 저장소에 하나의 메시지를 보내고, 구독자들이 이 메시지를 pull한다.
- 데이터 영속성
- 메시징 큐는 소비자가 데이터를 읽으면 큐에서 삭제한다.
- 이벤트 스트림에선 구독자가 읽은 데이터는 바로 삭제되지 않고 저장소 설정에 따라 특정 기간 저장될 수 있다.
- 메시징 큐는 일대일 상황에서, 스트림은 다대다 상황에서 유리하다.
레디스를 메시지 브로커로 사용하기
- 레디스의 pub/sub을 사용하면 빠르고 간단하게 메시지를 전달할 수 있다.
- 발행자가 특정 채널에 데이터를 전송하면 모든 채널 전체에 전파된 뒤 삭제되는 일회성을 가진다.
- 메시지 전달 보장은 하지 않는다.
- fire-and-forget 패턴에서 간단한 알림에선 유용하다.
fire-and-forget - 비동기 프로그래밍에서 사용되는 디자인 패턴. 작업 실행 후 그 결과에 대한 처리가 필요하지 않은 경우 유용하다. 신뢰성이 필요한 경우엔 사용하지 않아야 한다. ex) 로깅, 이벤트 발행, 통계 데이터 수집 등.
- 레디스의 list 자료 구조는 메시징 큐로 알맞다.
- list의 데이터는 push와 pop이 가능
- list에 새 데이터가 들어오면 대기하다 읽어갈 수 있는 블로킹 기능 제공
- 레디스의 stream을 사용하면 레디스를 완벽한 스트림 플랫폼으로 사용할 수 있다.
- 데이터를 계속 추가하는 방식 (append-only)
- 소비자와 소비자 그룹을 통해 데이터 분산 처리를 구현할 수 있다.
- stream의 메시지를 실시간 리스닝 가능
- 메시지를 시간대별로 검색 가능
레디스의 pub/sub
- 레디스는 아주 가벼운 pub/sub을 제공한다.
- 모든 레디스 클라이언트는 발행자와 구독자가 될 수 있다.
- 레디서 pub/sub은 최소한의 전달 기능만 제공한다.
- 한 번 전파된 데이터는 레디스에 저장되지 않는다.
- 특정 구독자에게 장애로 메시지가 전달되지 못해도 그 사실을 알 수 없다.
메시지 publish하기
> PUBLISH hello world
(integer) 1
PUBLISH
커맨드는 hello라는 채널을 수신하는 모든 구독자에 world라는 메시지를 전파한다.
- 전파 후엔 구독자 수가 반환된다.
메시지 구독하기
> SUBSCRIBE event1 event2
Reading messages...
1) "subscribe"
2) "event1"
3) (integer) 1
1) "subscribe"
2) "event2"
3) (integer) 2
SUBSCRIBE
커맨드로 event1과 event2 채널을 동시에 구독하기 시작한다.
- 클라이언트가 구독자로 동작할 때는 pub/sub과 관련되지 않은 커맨드는 사용할 수 없다.
PSUBSCRIBE
커맨드로 특정 패턴에 해당하는 모든 채널을 한 번에 구독할 수 있다.
PSUBSCRIBE mail-*
mail-
로 시작하는 모든 채널에 전파된 메시지 모두 수신 가능
message
가 아닌 pmessage
타입으로 전달됨
SUBSCRIBE
로 구독하는 방식과 구분되기에 같이 사용하면 2개의 메시지를 받게 된다.
클러스터 구조에서의 pub/sub
- 레디스 클러스터에서 pub/sub을 사용하면 해당 메시지는 클러스터의 모든 노드에 전달된다.
- 클러스터의 아무 노드에 연결해 구독하면 데이터를 수신할 수 있다.
- 하나의 노드에 메시지를 발행하면 메시지는 모든 노드에 전파된다.
- 하지만 대규모 서비스에서 클러스터 모든 노드에 데이터를 복제하는 것은 비효율적이다.
sharded pub/sub
- 레디스 클러스터의 pub/sub 비효율을 해결하기 위해 레디스 7.0에서 sharded pub/sub이 도입되었다.
- sharded 환경에서 각 채널은 슬롯에 매핑된다.
- 같은 슬롯을 가진 노드 간에만 pub/sub 메시지를 전파한다.
- 모든 노드에 메시지를 전파하지 않아 불필요한 복제를 줄여 자원을 절약할 수 있다.
레디스 list를 메시징 큐로 사용하기
- 레디스에는 list 큐의 tail과 head에서 데이터를 넣고 뺄 수 있는 명령어를 제공한다.
list의 EX 기능
- 트위터는 각 유저 타임라인을 레디스 list에서 관리한다.
- SNS에는 각 유저별 타임라인이 존재한다.
- 유저를 팔로우한 다른 유저들은 그 타임라인을 볼 수 있다.
- 모든 유저는 제각기 다른 타임라인을 가짐을 뜻한다.
- ex) 유저 A가 새 트윗을 작성하면 A를 팔로우하는 유저 B의 타임라인 캐시 list에 새 아이템을 추가한다.
- 각 타임라인 캐시에 데이터 저장 시
RPUSH
가 아닌 RPUSHX
커맨드를 사용한다.
RPUSHX
: list가 이미 존재할 때만 아이템을 추가하는 커맨드
- 자주 트위터를 들어오지 않는 유저에 대한 캐시를 굳이 관리할 필요가 없기 때문
- 사용자 캐시가 이미 존재하는지의 유무를 애플리케이션에서 확인하지 않아도 되서 성능을 향상시킬 수 있다.
list의 블로킹 기능
- list의 블로킹 기능을 통해 불필요한 폴링 프로세스로 리소스를 소모할 필요 없이 이벤트를 처리할 수 있다.
BRPOP
, BLPOP
명령어로 블로킹 기능을 사용 가능
- 데이터 요청 시 데이터가 있다면 즉시 반환하고, 없으면 대기 후 반환하거나 타임아웃만큼 대기하고 nill을 반환한다.
# 데이터가 입력될 때까지 5초 동안 대기
> BRPOP queue:a 5
1) "queue:a"
2) "data"
- 여러 클라이언트가 동시에 블로킹될 수 있으며 데이터가 입력되면 가장 먼저 온 클라이언트가 데이터를 가져간다.
BRPOP
은 BPOP
과 달리 데이터 키와 값을 둘 다 반환한다.
- 동시에 여러 리스트에서 대기할 수 있기 때문이다.
BRPOP queue:a queue:b queue:c
list를 이용한 원형 큐
RPOPLPUSH
명령어로 원형 큐를 사용할 수 있다.
- 특정 아이템을 계속 반복 접근해야 하거나
- 여러 클라이언트가 병렬적으로 같은 아이템에 접근해야 하는 경우
> LPUSH clist A
(integer) 1
> LPUSH clist B
(integer) 2
> LPUSH clist C
(integer) 3
> LRANGE clist 0 -1
1) "C"
2) "B"
3) "A"
# clist에서 RPOP해서 clist에 LPUSH
> RPOPLPUSH clist clist
> LRANGE clist 0 -1
1) "A"
2) "C"
3) "B"
Stream
레디스의 Stream과 아파치 카프카
- Stream
- 레디스 5.0에 새로 추가된 append-only 자료 구조
- 대규모 메시징 데이터를 빠르게 처리 가능
- Stream 사용 목적
- 대량의 데이터를 효율적으로 처리하는 플랫폼
- 여러 생산자가 생성한 데이터를 다양한 소비자가 처리할 수 있게 지원하는 저장소 및 큐잉 시시스템
- 레디스 Stream은 카프카의 영향을 많이 받았다.
- 카프카와 유사한 점이 많음
- 일부 기능은 카프카보다 뛰어난 처리가 가능
스트림이란?
- 스트림이란
- 연속적인 데이터 흐름, 일정한 데이터 조각의 연속을 의미
- 카프카나 레디스의 stream과 같은 서비스를 통해 스트림 데이터 처리를 더 쉽고 정확하게 할 수 있다.
데이터의 저장
- 메시지의 저장과 식별
- 카프카
- 스트림 데이터는 토픽이라는 개념에 저장되는데 이는 같은 데이터를 관리하는 하나의 그룹을 의미
- 각 메시지는 0부터 시작해 증가하는 시퀸스 넘버로 식별한다.
- 시퀸스 넘버는 토픽 내 파티션 안에서만 유니크하기에 토픽 내 1개 이상 파티션이 있다면 유니크하게 식별되지 않는다.
- 레디스 stream
- 레디스 다른 자료 구조와 카찬가지로 하나의 키에 연결된 자료 구조
- 각 메시지는 시간과 관련된 유니크한 ID를 가진다. (
<millisecondsTime>-<sequenceNumber>
)
millsecondsTime
은 저장 시점의 레디스 노드 로컬 시간이다
sequenceNumber
는 같은 시간에 저장된 데이터들의 순서를 의미
- 레디스 stream의 모든 데이터는 유니크한 ID를 가지며 이 ID가 곧 시간이기에 시간을 통한 검색이 가능
- 스트림 생성과 데이터 입력
- 카프카에선 데이터를 저장하기 위해 토픽을 먼저 생성한 후 프로듀서를 이용해 메시지를 보낼 수 있다.
- 레디스에선
XADD
커맨드로 stream 자료 구조를 생성할 수 있다.
- 기존에 존재하던 키를 사용하면 기존 stream에 데이터를 추가한다.
# '*'은 레디스가 자동으로 생성하는 타임스탬프 ID를 사용하겠다는 것을 의미
# Email stream에 subject 키에 first, body 키에 hello? 값을 저장
> XADD Email * subject "first" body "hello?"
"1659114481311-0" # ID 반환
- 데이터의 조회
- 카프카의 실시간 리스닝
- 소비자는 특정 토픽에 저장되는 데이터를 실시간으로 전달받는다.
- 기본적으로는 리스닝을 시작한 시점부터 토픽에 새로 저장되는 메시지를 반환 받는다.
- 더 이상 읽을 데이터가 없다면 기다린다.
- 레디스 stream은 실시간 리스닝도 가능하지만 ID로 특정 데이터 검색하는 방식을 이용할 수 있다.
- ID로 원하는 시간대의 데이터를 조회 가능
- 메시지가 저장된 시점을 이용해 데이터를 조회할 수도 있다.
소비자
- 카프카에선 같은 토픽을 여러 소비자가 읽어가게 할 수 있다.
- 팬아웃(fan-out) - 같은 데이터를 여러 소비자에게 전달하는 것
- 레디스 stream에서도
XREAD
커맨드로 팬아웃이 가능하다.
- 여러 소비자가 stream에 저장된 똑같은 데이터를 읽어간다.
- 레디스 stream은 한 번에 여러 이벤트를 여러 소비자가 병렬적으로 처리하도록 구성할 수 있다.
- 메시지 순서가 중요한 경우에도 stream은 데이터마다 고유한 ID를 부여하여 저장하기 때문에 순서를 보장할 수 있다.
- 반면 카프카의 유니크 키는 파티션 내에서만 보장되기에 여러 파티션에서 토픽을 읽어갈 때는 데이터 순서를 보장할 수 없다.
- 소비자가 데이터를 소비할 땐 토픽 내 전체 파티션에서 데이터를 읽어 오기 때문
- 카프카에서 순서를 보장하려면 소비자 그룹을 사용해야 한다.
소비자 그룹
- 카프카에선 소비자 그룹에 여러 소비자를 추가할 수 있다.
- 소비자는 토픽 내 파티션과 일대일로 연결된다.
- 파티션 내부에선 메시지 순서가 보장되기 때문에 한 파티션과 연결된 소비자 그룹 안의 한 소비자는 순서가 보장된 메시지를 읽을 수 있다.
- 레디스 stream에도 소비자 그룹 개념이 존재하지만 카프카와는 다르다.
- stream에선 메시지 전달 순서를 신경쓰지 않아도 되기 때문
- stream의 소비자 그룹 내 한 소비자는 다른 소비자가 아직 읽지 않은 데이터만을 읽어 간다.
- 부하 분산 관점에서 카프카와 레디스 stream의 비교
- 카프카는 파티션 개념을 통해 소비자의 부하 분산을 관리
- stream은 파티션 없이 소비자 그룹 개념을 이용해 여러 소비자에게 stream 데이터를 분산
- stream과 소비자 그룹은 독립적으로 동작한다.
- stream 메시지를 읽는 소비자 그룹은 다수 존재 가능
- 한 소비자 그룹 안의 소비자들은 서로 같은 데이터를 읽지 않지만, 각 소비자 그룹들은 같은 데이터를 읽어간다.
- 한 소비자 그룹에서 여러 stream을 리스닝하는 것도 가능하다.
ACK와 보류 리스트
- 메시지 브로커는 각 소비자에게 어떤 메시지까지 전달했고, 전달된 메시지의 처리 유무를 알고 있어야한다.
- 시스템이 장애로 종료됐을 경우 이를 인지하고 재처리할 수 있야 하기 때문
- 레디스 stream의 보류 리스트 (pending list)
- 소비자 그룹의 소비자가 메시지를 읽으면 소비자별 읽어간 메시지 리스트를 새로 생성 (보류 리스트)
- 소비자 그룹에서 마지막으로 읽어간 데이터의 ID로
last_delivered_id
값을 업데이트
- 소비자 그룹의 소비자가 데이터를 처리하고 ACK를 전송하면 stream은 해당 소비자의 보류 리스트에서 해당 메시지를 삭제
- 시스템 장애 후 재부팅했을 때 보류 리스트에 남아 있는 데이터를 먼저 선행적으로 수행하게 된다.
- 카프카도 레디스 stream처럼 파티션별 오프셋을 관리한다.
- 소비자가 토픽의 특정 파티션의 메시지를 읽으면 소비자 그룹, 토픽, 파티션 내용이 통합돼 저장된다.
메시지의 재할당
- 만약 소비자 서버에 장애가 발생해 복구되지 않는다면 다른 소비자에게 보류 리스트의 메시지를 재할당할 수 있다.
XCLAIM
커맨드 사용
- 최소 대기 시간을 설정해 해당 대기 시간을 초과한 경우에만 소유권을 변경하도록 할 수 있다.
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> …
메시지의 자동 재할당
- 매번 보류 중인 메시지를 확인하고 특정 소비자에게 직접 소유권을 재할당하는 작업은 번거롭다.
XAUTOCLAIM
커맨드로 자동 재할당이 가능하다.
- 지정한 소비자 그룹에서 최소 대기 시간을 만족하는 보류 메시지가 있다면 지정한 소비자에 소유권을 재할당
XCLAIM
커맨드와 달리 직접 재할당 메시지를 입력해주지 않아도 된다.
XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]
메시지의 수동 재할당
- stream 내 각 메시지는 counter 값을 각각 가진다.
- 소비자에게 할당되거나 재할당될 경우 1씩 증가한다.
- 메시지에 문제가 있어 재할당이 반복된다면 counter 값이 계속 증가할 것이다.
- counter가 특정 값에 도달한 메시지는 특수한 다른 stream에 보내 추후 관리하는 것이 현명하다.
- 이런 메시지를 dead letter라 부른다.
stream 상태 확인
- 레디스 stream의 여러 상태를 확인할 수 있다.
- 어떤 소비자가 활성화 됐는지
- 보류된 메시지는 어떤 건지
- 어떤 소비자 그룹이 메시지를 처리하고 있는지 등
XINFO
커맨드로 사용 가능하며 help
커맨드로 기능을 확인할 수 있다.
- XINFO consumers <소비자 그룹="" 이름=""> - 소비자 그룹의 소비자 정보소비자>
- XINFO GROUPS - 전체 소비자 그룹 list
- XINFO STREAM - stream 자체의 정보 (인코딩 방법, 첫 번째와 마지막 메시지 ID 등)