Order
애그리거트를 이벤트 소싱으로 저장한다면 EVENTS
테이블의 여러 로우로 저장한다.event_id | event_type | entity_type | entity_id | event_data |
---|---|---|---|---|
102 | OrderCreated | Order | 101 | {…} |
103 | OrderApproved | Order | 101 | {…} |
104 | OrderShipped | Order | 101 | {…} |
105 | OrderDelivered | Order | 101 | {…} |
apply()
를 호출Class aggregateClass = ...;
Aggregate aggregate = aggregateClass.newInstance();
for (Event event : events) {
aggregate = aggregate.applyEvent(event);
}
// 애그리거트 사용
process()
, apply()
라는 이름으로 명명했다.// 기존 로직
public class Order {
public List<DomainEvent> revise(OrderRevision orderRevision) {
switch (state) {
case AUTHORIZED:
LineItemQuantityChange change =
orderLineItems.lineItemQuantityChange(orderRevision);
if (change.newOrderTotal.isGreaterThanOrEqual(orderMinimum) {
throw new OrderMinimumNotMetException();
}
this.state = REVISION_PENDING;
return ...;
default:
throw new UnsupportedStateTransitionException(state);
}
}
}
// 이벤트 소싱 로직
public class Order {
public List<Event> process(ReviseOrder command) { // Order 업데이트 없이 이벤트 반환
OrderRevision orderRevision = command.getOrderRevision();
switch (state) {
case AUTHORIZED:
LineItemQuantityChange change =
orderLineItems.lineItemQuantityChange(orderRevision);
if (change.newOrderTotal.isGreaterThanOrEqual(orderMinimum) {
throw new OrderMinimumNotMetException();
}
return singletonList(
new OrderRevisionProposed(...));
default:
throw new UnsupportedStateTransitionException(state);
}
}
public void apply(OrderRevisionProposed event) { // 이벤트를 적용하여 Order 업데이트
this.state = REVISION_PENDING;
}
}
process()
를 호출하여 새 이벤트 발생apply()
를 호출하여 애그리거트 수정apply()
호출process()
를 호출하여 새 이벤트를 발생시킴apply()
를 호출하여 애그리거트 수정OUTBOX
테이블에 일시적으로 저장하는 것이 아닌 영구 저장한다는 중요한 차이점이 있다.SELECT
문으로 새 이벤트를 조회해 메시지 브로커에 발행하게 된다.EVENT_ID
를 기록할 수 있지만 문제가 존재한다.
EVENTS
테이블에 이벤트 발행 여부를 추적하는 컬럼을 추가하면 이벤트 누락을 해결할 수 있다.
PUBLISHED
컬럼이 0
인 미발행 이벤트 검색PUBLISHED = 1
로 변경Order
같은) 애그리거트는 이벤트를 쿼리해서 재구성하는 것이 효율적이다.Account
(계좌)Class aggregateClass = ...;
Snapshot = snapshot = ...;
Aggregate aggregate = recreateFromSnapshot(aggregateClass, snapshot);
for (Event event : events) {
aggregate = aggregate.applyEvent(event);
}
// 애그리거트 사용
PROCESSED_MESSAGS
테이블에, 이벤트는 EVENTS
테이블에 삽입하여 중복을 솎아 낸다.수준 | 변경 | 하위 호환성 |
---|---|---|
스키마 | 새 애그리거트 타입 정의 | 예 |
애그리거트 삭제 | 기존 애그리거트 삭제 | 아니요 |
애그리거트 개명 | 애그리거트 타입명 변경 | 아니요 |
애그리거트 | 새 이벤트 타입 추가 | 예 |
이벤트 삭제 | 이벤트 타입 삭제 | 아니요 |
이벤트 개명 | 이벤트 타입명 변경 | 아니요 |
이벤트 | 새 필드 추가 | 예 |
필드 삭제 | 필드 삭제 | 아니요 |
필드 개명 | 필드명 변경 | 아니요 |
필드 타입 변경 | 필드 타입 변경 | 아니요 |
create table events (
event_id varchar(1000) PRIMARY KEY,
event_type varchar(1000),
event_data varchar(1000) NOT NULL,
entity_type VARCHAR(1000) NOT NULL,
entity_id VARCHAR(1000) NOT NULL,
triggering_vent VARCHAR(1000) -- 중복 이벤트/메시지를 발견하는 용도
);
create table entities (
entity_type VARCHAR(1000),
entity_id VARCHAR(1000),
entity_version VARCHAR(1000) NOT NULL,
PRIMARY KEY(entity_type, entity_id)
);
create table snapshots (
entity_type VARCHAR(1000),
entity_id VARCHAR(1000),
entity_version VARCHAR(1000),
snapshot_type VARCHAR(1000) NOT NULL,
snapshot_json VARCHAR(1000), NOT NULL,
triggering_events VARCHAR(100),
PRIMARY KEY(entity_type, entity_id, entity_version)
);
events
: 이벤트를 저장entities
: 엔티티당 로우 하나
snapshots
: 스냅샷을 저장
find
, create
, update
3개의 작업을 지원find
- 가장 최근 스냅샷을 조회 후 존재하면 events
테이블에서 event_id
가 스냅샷의 entity_version
보다 크거나 같은 이벤트를 모두 찾고, 스냅샷이 존재하지 않으면 주어진 엔티티 이벤트를 모두 조회하고 entity
테이블에서 엔티티 현재 버전을 가져온다.create
- entities
테이블에 새 로우를 삽입하고 events
테이블에 이벤트를 삽입update
- events
테이블에 이벤트 삽입 및 entities
테이블에 버전을 업데이트해 낙관적 잠금 체크를 수행ReflectiveMutableCommandProcessingAggregate
public class Order extends ReflectiveMutableCommandProcessingAggregate<Order, OrderCommand> {
public List<Event> process(CreateOrderCommand command) { ... }
public void apply(OrderCreateEvent event) { ... }
// ...
}
public interface OrderCommand extends Command {}
public class CreateOrderCommand implements OrderCommand { ... }
Event
를 상속public interface OrderEvent extends Event {}
public class OrderCreated implements OrderEvent { ... }
AggregateRepository
save()
, find()
, update()
메서드가 오버로드되어 있다.save()
process()
를 호출하여 커맨드를 처리apply()
를 호출하여 생성된 이벤트 적용update()
public class OrderService {
private AggregateRepository<Order, OrderCommand> orderRepository;
public EntityWithIdAndVersion<Order> createOrder(OrderDetails orderDetails) {
return orderRepository.save(new CreateOrder(orderDetails));
}
}
@EventSubscriber(id="orderServiceEventHandlers")
public class OrderServiceEventHandlers {
@EventHandlerMethod
public void creditReserved(EventHandlerContext<CreditReserved> ctx) {
CreditReserved event = ctx.getEvent();
// ...
}
}
@EventSubscriber
로 이벤트를 처리할 스프링 빈 지정@EventHandlerMethod
는 메서드를 이벤트 핸들러로 식별시킴EventHandlerContext
매개변수를 받는다.
@Autowired
private SagaManager<CreateOrderSagaState> createOrderSagaManager;
@Transactional // 한 트랜잭션에서 수행
public EntityWithIdAndVersion<Order> createOrder(OrderDetails orderDetails) {
EntityWithIdAndVersion<Order> order = orderRepository.save(new CreateOrder(orderDetails)); // 애그리거트 생성
CreateOrderSagaState data = new CreateOrderSagaState(order.getId(), orderDetails); // 사가 생성
createOrderSagaManager.create(data, Order.class, order.getId());
return order;
}
// ...
CreateOrderSaga
생성 과정
OrderCreated
이벤트를 이벤트 저장소에 저장CreateOrderSaga
생성SagaReplyRequested
를 모두 이벤트 저장소에 저장SagaReplyRequested
이벤트 핸들러는 이벤트의 데이터로 응답 메시지를 만들어 오케스트레이터 응답 채널에 출력SagaCommandDispatcher
가 AccountingServiceCommandHandler
호출
AccountingServiceCommandHandler
는 Accounting
애그리거트로 커맨드를 전송AccountAuthorizedEvent
와 SagaReplyRequestedEvent
두 이벤트 발행SagaReplyRequested
이벤트 핸들러는 주문 생성 사가에 응답 메시지를 전송하여 처리AccountingServiceCommandHandler
의 커맨드 메시지 처리 코드public class AccountingServiceCommandHandler {
@Autowired
private AggregateRepository<Account, AccountCommand> accountRepository;
public void authorize(CommandMessage<AuthorizeCommand> cm) {
AuthorizeCommand command = cm.getCommand();
AccountRepository.update(command.getOrderId(), command,
replyingTo(cm)
.catching(AccountDisabledException.class,
() -> withFailure(new AccountDisabledReply()))
.build());
}
}
update()
메서드의 세 번째 옵션은 UpdateOptions
SagaReplyRequestedEvent
를 추가하여 이벤트 저장소에 저장AccountDisabledException
을 던지면 기본 에러 응답 대신 AccountDisabledReply
를 전송SagaOrchestratorCreated
SagaOrchestratorUpdated
SagaCommandEvent
를 저장하는 방식으로 접근할 수 있다.
SagaCommandEvent
를 발행하여 이벤트 저장소에 저장SagaCommandEvent
처리 후 커맨드 메시지를 목적지 메시지 채널로 전송