Order 애그리거트를 이벤트 소싱으로 저장한다면 EVENTS 테이블의 여러 로우로 저장한다.| event_id | event_type | entity_type | entity_id | event_data |
|---|---|---|---|---|
| 102 | OrderCreated | Order | 101 | {…} |
| 103 | OrderApproved | Order | 101 | {…} |
| 104 | OrderShipped | Order | 101 | {…} |
| 105 | OrderDelivered | Order | 101 | {…} |
apply()를 호출Class aggregateClass = ...;
Aggregate aggregate = aggregateClass.newInstance();
for (Event event : events) {
aggregate = aggregate.applyEvent(event);
}
// 애그리거트 사용
process(), apply()라는 이름으로 명명했다.// 기존 로직
public class Order {
public List<DomainEvent> revise(OrderRevision orderRevision) {
switch (state) {
case AUTHORIZED:
LineItemQuantityChange change =
orderLineItems.lineItemQuantityChange(orderRevision);
if (change.newOrderTotal.isGreaterThanOrEqual(orderMinimum) {
throw new OrderMinimumNotMetException();
}
this.state = REVISION_PENDING;
return ...;
default:
throw new UnsupportedStateTransitionException(state);
}
}
}
// 이벤트 소싱 로직
public class Order {
public List<Event> process(ReviseOrder command) { // Order 업데이트 없이 이벤트 반환
OrderRevision orderRevision = command.getOrderRevision();
switch (state) {
case AUTHORIZED:
LineItemQuantityChange change =
orderLineItems.lineItemQuantityChange(orderRevision);
if (change.newOrderTotal.isGreaterThanOrEqual(orderMinimum) {
throw new OrderMinimumNotMetException();
}
return singletonList(
new OrderRevisionProposed(...));
default:
throw new UnsupportedStateTransitionException(state);
}
}
public void apply(OrderRevisionProposed event) { // 이벤트를 적용하여 Order 업데이트
this.state = REVISION_PENDING;
}
}
process()를 호출하여 새 이벤트 발생apply()를 호출하여 애그리거트 수정apply() 호출process()를 호출하여 새 이벤트를 발생시킴apply()를 호출하여 애그리거트 수정OUTBOX 테이블에 일시적으로 저장하는 것이 아닌 영구 저장한다는 중요한 차이점이 있다.SELECT 문으로 새 이벤트를 조회해 메시지 브로커에 발행하게 된다.EVENT_ID를 기록할 수 있지만 문제가 존재한다.
EVENTS 테이블에 이벤트 발행 여부를 추적하는 컬럼을 추가하면 이벤트 누락을 해결할 수 있다.
PUBLISHED 컬럼이 0인 미발행 이벤트 검색PUBLISHED = 1로 변경Order 같은) 애그리거트는 이벤트를 쿼리해서 재구성하는 것이 효율적이다.Account (계좌)Class aggregateClass = ...;
Snapshot = snapshot = ...;
Aggregate aggregate = recreateFromSnapshot(aggregateClass, snapshot);
for (Event event : events) {
aggregate = aggregate.applyEvent(event);
}
// 애그리거트 사용
PROCESSED_MESSAGS 테이블에, 이벤트는 EVENTS 테이블에 삽입하여 중복을 솎아 낸다.| 수준 | 변경 | 하위 호환성 |
|---|---|---|
| 스키마 | 새 애그리거트 타입 정의 | 예 |
| 애그리거트 삭제 | 기존 애그리거트 삭제 | 아니요 |
| 애그리거트 개명 | 애그리거트 타입명 변경 | 아니요 |
| 애그리거트 | 새 이벤트 타입 추가 | 예 |
| 이벤트 삭제 | 이벤트 타입 삭제 | 아니요 |
| 이벤트 개명 | 이벤트 타입명 변경 | 아니요 |
| 이벤트 | 새 필드 추가 | 예 |
| 필드 삭제 | 필드 삭제 | 아니요 |
| 필드 개명 | 필드명 변경 | 아니요 |
| 필드 타입 변경 | 필드 타입 변경 | 아니요 |
create table events (
event_id varchar(1000) PRIMARY KEY,
event_type varchar(1000),
event_data varchar(1000) NOT NULL,
entity_type VARCHAR(1000) NOT NULL,
entity_id VARCHAR(1000) NOT NULL,
triggering_vent VARCHAR(1000) -- 중복 이벤트/메시지를 발견하는 용도
);
create table entities (
entity_type VARCHAR(1000),
entity_id VARCHAR(1000),
entity_version VARCHAR(1000) NOT NULL,
PRIMARY KEY(entity_type, entity_id)
);
create table snapshots (
entity_type VARCHAR(1000),
entity_id VARCHAR(1000),
entity_version VARCHAR(1000),
snapshot_type VARCHAR(1000) NOT NULL,
snapshot_json VARCHAR(1000), NOT NULL,
triggering_events VARCHAR(100),
PRIMARY KEY(entity_type, entity_id, entity_version)
);
events: 이벤트를 저장entities: 엔티티당 로우 하나
snapshots: 스냅샷을 저장
find, create, update 3개의 작업을 지원find - 가장 최근 스냅샷을 조회 후 존재하면 events 테이블에서 event_id가 스냅샷의 entity_version보다 크거나 같은 이벤트를 모두 찾고, 스냅샷이 존재하지 않으면 주어진 엔티티 이벤트를 모두 조회하고 entity 테이블에서 엔티티 현재 버전을 가져온다.create - entities 테이블에 새 로우를 삽입하고 events 테이블에 이벤트를 삽입update - events 테이블에 이벤트 삽입 및 entities 테이블에 버전을 업데이트해 낙관적 잠금 체크를 수행ReflectiveMutableCommandProcessingAggregate
public class Order extends ReflectiveMutableCommandProcessingAggregate<Order, OrderCommand> {
public List<Event> process(CreateOrderCommand command) { ... }
public void apply(OrderCreateEvent event) { ... }
// ...
}
public interface OrderCommand extends Command {}
public class CreateOrderCommand implements OrderCommand { ... }
Event를 상속public interface OrderEvent extends Event {}
public class OrderCreated implements OrderEvent { ... }
AggregateRepository
save(), find(), update() 메서드가 오버로드되어 있다.save()
process()를 호출하여 커맨드를 처리apply()를 호출하여 생성된 이벤트 적용update()
public class OrderService {
private AggregateRepository<Order, OrderCommand> orderRepository;
public EntityWithIdAndVersion<Order> createOrder(OrderDetails orderDetails) {
return orderRepository.save(new CreateOrder(orderDetails));
}
}
@EventSubscriber(id="orderServiceEventHandlers")
public class OrderServiceEventHandlers {
@EventHandlerMethod
public void creditReserved(EventHandlerContext<CreditReserved> ctx) {
CreditReserved event = ctx.getEvent();
// ...
}
}
@EventSubscriber로 이벤트를 처리할 스프링 빈 지정@EventHandlerMethod는 메서드를 이벤트 핸들러로 식별시킴EventHandlerContext 매개변수를 받는다.
@Autowired
private SagaManager<CreateOrderSagaState> createOrderSagaManager;
@Transactional // 한 트랜잭션에서 수행
public EntityWithIdAndVersion<Order> createOrder(OrderDetails orderDetails) {
EntityWithIdAndVersion<Order> order = orderRepository.save(new CreateOrder(orderDetails)); // 애그리거트 생성
CreateOrderSagaState data = new CreateOrderSagaState(order.getId(), orderDetails); // 사가 생성
createOrderSagaManager.create(data, Order.class, order.getId());
return order;
}
// ...
CreateOrderSaga 생성 과정
OrderCreated 이벤트를 이벤트 저장소에 저장CreateOrderSaga 생성SagaReplyRequested를 모두 이벤트 저장소에 저장SagaReplyRequested 이벤트 핸들러는 이벤트의 데이터로 응답 메시지를 만들어 오케스트레이터 응답 채널에 출력SagaCommandDispatcher가 AccountingServiceCommandHandler 호출
AccountingServiceCommandHandler는 Accounting 애그리거트로 커맨드를 전송AccountAuthorizedEvent와 SagaReplyRequestedEvent 두 이벤트 발행SagaReplyRequested 이벤트 핸들러는 주문 생성 사가에 응답 메시지를 전송하여 처리AccountingServiceCommandHandler의 커맨드 메시지 처리 코드public class AccountingServiceCommandHandler {
@Autowired
private AggregateRepository<Account, AccountCommand> accountRepository;
public void authorize(CommandMessage<AuthorizeCommand> cm) {
AuthorizeCommand command = cm.getCommand();
AccountRepository.update(command.getOrderId(), command,
replyingTo(cm)
.catching(AccountDisabledException.class,
() -> withFailure(new AccountDisabledReply()))
.build());
}
}
update() 메서드의 세 번째 옵션은 UpdateOptions
SagaReplyRequestedEvent를 추가하여 이벤트 저장소에 저장AccountDisabledException을 던지면 기본 에러 응답 대신 AccountDisabledReply를 전송SagaOrchestratorCreatedSagaOrchestratorUpdatedSagaCommandEvent를 저장하는 방식으로 접근할 수 있다.
SagaCommandEvent를 발행하여 이벤트 저장소에 저장SagaCommandEvent 처리 후 커맨드 메시지를 목적지 메시지 채널로 전송