TIL

6장 비즈니스 로직 개발: 이벤트 소싱

6.1 이벤트 소싱 응용 비즈니스 로직 개발

6.1.1 기존 영속화의 문제점

객체-관계 임피던스 부정합

애그리거트 이력이 없다

감사 로깅은 구현하기 힘들고 오류도 자주 발생

이벤트 발행 로직이 비즈니스 로직에 추가된다

6.1.2 이벤트 소싱 개요

이벤트를 이용하여 애그리거트를 저장

event_id event_type entity_type entity_id event_data
102 OrderCreated Order 101 {…}
103 OrderApproved Order 101 {…}
104 OrderShipped Order 101 {…}
105 OrderDelivered Order 101 {…}
Class aggregateClass = ...;
Aggregate aggregate = aggregateClass.newInstance();
for (Event event : events) {
    aggregate = aggregate.applyEvent(event);
}
// 애그리거트 사용

이벤트는 곧 상태 변화

애그리거트 메서드의 관심사는 오직 이벤트

// 기존 로직
public class Order {
    public List<DomainEvent> revise(OrderRevision orderRevision) {
        switch (state) {
            case AUTHORIZED:
                LineItemQuantityChange change = 
                    orderLineItems.lineItemQuantityChange(orderRevision);
                if (change.newOrderTotal.isGreaterThanOrEqual(orderMinimum) {
                    throw new OrderMinimumNotMetException();
                }
                this.state = REVISION_PENDING;
                return ...;
            default:
                throw new UnsupportedStateTransitionException(state);
        }
    }
}

// 이벤트 소싱 로직
public class Order {
    
    public List<Event> process(ReviseOrder command) { // Order 업데이트 없이 이벤트 반환
        OrderRevision orderRevision = command.getOrderRevision();
        switch (state) {
            case AUTHORIZED:
                LineItemQuantityChange change = 
                    orderLineItems.lineItemQuantityChange(orderRevision);
                if (change.newOrderTotal.isGreaterThanOrEqual(orderMinimum) {
                    throw new OrderMinimumNotMetException();
                }
                return singletonList(
                    new OrderRevisionProposed(...));
            default:
                throw new UnsupportedStateTransitionException(state);
        }        
    }
    
    public void apply(OrderRevisionProposed event) { // 이벤트를 적용하여 Order 업데이트
        this.state = REVISION_PENDING;
    }
}

6.1.3 동시 업데이트: 낙관적 잠금

6.1.4 이벤트 소싱과 이벤트 발행

이벤트 발행: 폴링

이벤트 발행: 트랜잭션 로그 테일링

6.1.5 스냅샷으로 성능 개선

Class aggregateClass = ...;
Snapshot = snapshot = ...;
Aggregate aggregate = recreateFromSnapshot(aggregateClass, snapshot);
for (Event event : events) {
    aggregate = aggregate.applyEvent(event);
}
// 애그리거트 사용

6.1.6 멱등한 메시지 처리

RDBMS 이벤트 저장소 사용

NoSQL 이벤트 저장소 사용

6.1.7 도메인 이벤트 발전시키기

이벤트 스키마

수준 변경 하위 호환성
스키마 새 애그리거트 타입 정의
애그리거트 삭제 기존 애그리거트 삭제 아니요
애그리거트 개명 애그리거트 타입명 변경 아니요
애그리거트 새 이벤트 타입 추가
이벤트 삭제 이벤트 타입 삭제 아니요
이벤트 개명 이벤트 타입명 변경 아니요
이벤트 새 필드 추가
필드 삭제 필드 삭제 아니요
필드 개명 필드명 변경 아니요
필드 타입 변경 필드 타입 변경 아니요

업캐스팅을 통한 스키마 변화 관리

6.1.8 이벤트 소싱의 장점

6.1.9 이벤트 소싱의 단점

6.2 이벤트 저장소 구현

6.2.1 이벤추에이트 로컬 이벤트 저장소의 작동 원리

이벤추에이트 로컬의 이벤트 DB 스키마

create table events (
    event_id varchar(1000) PRIMARY KEY,
    event_type varchar(1000),
    event_data varchar(1000) NOT NULL,
    entity_type VARCHAR(1000) NOT NULL,
    entity_id VARCHAR(1000) NOT NULL,
    triggering_vent VARCHAR(1000) -- 중복 이벤트/메시지를 발견하는 용도
);

create table entities (
    entity_type VARCHAR(1000),
    entity_id VARCHAR(1000),
    entity_version VARCHAR(1000) NOT NULL,
    PRIMARY KEY(entity_type, entity_id)
);

create table snapshots (
    entity_type VARCHAR(1000),
    entity_id VARCHAR(1000),
    entity_version VARCHAR(1000),
    snapshot_type VARCHAR(1000) NOT NULL,
    snapshot_json VARCHAR(1000), NOT NULL,
    triggering_events VARCHAR(100),
    PRIMARY KEY(entity_type, entity_id, entity_version)
);

이벤추에이트 로컬의 이벤트 브로커를 구독하여 이벤트를 소비

이벤추에이트 로컬 이벤트 릴레이가 이벤트를 DB에서 메시지 브로커로 전파

6.2.2 자바용 이벤추에이트 클라이언트 프레임워크

애그리거트 정의: ReflectiveMutableCommandProcessingAggregate 클래스

public class Order extends ReflectiveMutableCommandProcessingAggregate<Order, OrderCommand> {

    public List<Event> process(CreateOrderCommand command) { ... }

    public void apply(OrderCreateEvent event) { ... }

    // ...    
}

애그리거트 커맨드 정의

public interface OrderCommand extends Command {}

public class CreateOrderCommand implements OrderCommand { ... }

도메인 이벤트 정의

public interface OrderEvent extends Event {}

public class OrderCreated implements OrderEvent { ... }

AggregateRepository 클래스로 애그리거트 생성, 검색, 수정

public class OrderService {
    private AggregateRepository<Order, OrderCommand> orderRepository;

    public EntityWithIdAndVersion<Order> createOrder(OrderDetails orderDetails) {
        return orderRepository.save(new CreateOrder(orderDetails));
    }
}

도메인 이벤트 구독

@EventSubscriber(id="orderServiceEventHandlers")
public class OrderServiceEventHandlers {
    
    @EventHandlerMethod
    public void creditReserved(EventHandlerContext<CreditReserved> ctx) {
        CreditReserved event = ctx.getEvent();
        // ...
    }
}

6.3 사가와 이벤트 소싱을 접목

6.3.1 코레오그래피 사가 구현: 이벤트 소싱

6.3.2 오케스트레이션 사가 생성

사가 오케스트레이터 작성: RDBMS 이벤트 저장소 사용 서비스

@Autowired
private SagaManager<CreateOrderSagaState> createOrderSagaManager;

@Transactional // 한 트랜잭션에서 수행
public EntityWithIdAndVersion<Order> createOrder(OrderDetails orderDetails) {
    EntityWithIdAndVersion<Order> order = orderRepository.save(new CreateOrder(orderDetails)); // 애그리거트 생성
    CreateOrderSagaState data = new CreateOrderSagaState(order.getId(), orderDetails); // 사가 생성
    createOrderSagaManager.create(data, Order.class, order.getId());
    return order;
}
// ...

사가 오케스트레이터 작성: NoSQL 이벤트 저장소 사용 서비스

6.3.3 이벤트 소싱 기반의 사가 참여자 구현

커맨드 메시지를 멱등하게 처리

응답 메시지를 원자적으로 전송

예제: 이벤트 소싱 기반의 사가 참여자

public class AccountingServiceCommandHandler {
    @Autowired
    private AggregateRepository<Account, AccountCommand> accountRepository;

    public void authorize(CommandMessage<AuthorizeCommand> cm) {
        AuthorizeCommand command = cm.getCommand();
        AccountRepository.update(command.getOrderId(), command, 
            replyingTo(cm)
                .catching(AccountDisabledException.class, 
                    () -> withFailure(new AccountDisabledReply()))
            .build());
    }
}

6.3.4 사가 오케스트레이터 구현: 이벤트 소싱

이벤트 소싱으로 사가 오케스트레이터 저장

커맨드 메시지를 확실하게 전송

응답을 꼭 한 번만 처리