들어가며
카프카를 도입해서 서비스 간 결합도를 낮추는 작업을 진행했다.
전송 방식은 데이터 유실 없이 안정적인 전송을 위해 At-Least-Once(적어도 한 번 전송) 방식으로 구현했다. 그래서 중복 전송에 대한 대비가 필요했다.
프로듀서가 메시지를 보내고 브로커로부터 ACK을 받지 못하면, 프로듀서는 메시지가 유실됐다고 판단하고 재전송을 한다. 이때 실제로는 브로커에 저장이 되었지만 ACK만 유실된 경우라면? 컨슈머는 똑같은 메시지를 두 번 받게 된다.
그래서 컨슈머 쪽에 멱등성 처리를 해주어야 한다.
즉, 카프카가 "정확히 한 번 전송"을 보장해주기 어렵다면, 컨슈머가 "여러 번 받아도 한 번만 처리"하도록 만들면 된다. 결과적으로 시스템 전체는 Exactly-Once 효과를 낼 수 있다.
이 글에서는 이커머스 플랫폼에서 상품 메트릭스(좋아요 수, 조회 수, 판매 수)를 실시간으로 집계하는 기능을 구현하면서 적용한 Outbox 패턴과 event_handled 테이블을 활용한 Kafka Consumer 멱등 처리 방식을 공유하고자 한다.
Kafka 메시지 전달 방식
Kafka의 메시지 전달 보장
카프카는 세 가지 메시지 전달 보장을 제공한다.
- At Most Once (최대 한 번) : 속도는 빠르지만 메시지가 유실될 수 있다.
- At Least Once (적어도 한 번) : ACK을 받을 때까지 보낸다. 그래서 유실은 없지만 중복이 발생할 수 있다.
- Exactly Once (정확히 한 번) : 가장 이상적이지만, 카프카와 외부 DB 간의 트랜잭션을 완벽히 맞추기는 어려울 수 있다.
왜 At-Least-Once를 선택했는가
비즈니스적으로 중요한 이벤트는 절대 유실되어서는 안 된다.
그래서 유실을 방지하기 위해 메시지가 최소 한 번은 전달되는 At-Least-Once 방식을 선택했고,
이 방식을 선택함으로써 발생할 수 있는 중복 메시지는 컨슈머 측에서 멱등 처리를 구현하여 중복 메시지를 한 번만 처리할 수 있도록 처리했다.
Producer 측 구현 : Outbox 패턴
Outbox 패턴의 흐름은 다음과 같다.
- 이벤트 저장
- 비즈니스 로직이 실행되는 트랜잭션 안에서, 보낼 메시지를 바로 카프카로 쏘지 않고 DB의 Outbox 테이블에 PENDING 상태로 저장한다.
- 비동기 발행
- 스케줄러가 주기적으로 Outbox 테이블에서 PENDING 상태인 메시지를 조회한다.
- 카프카 발행 및 상태 업데이트
- 조회된 메시지들을 카프카로 전송한다.
- 성공 시 : 해당 Outbox 상태를 PUBLISHED로 변경한다.
- 실패 시 : 상태를 FAILED로 변경하고 재시도 횟수를 증가시킨다.
- 재시도
- 스케줄러는 FAILED 상태이면서, 재시도 횟수가 남은 메시지들을 조회해서 재전송을 시도한다.
1. Outbox 엔티티
@Entity
@Table(name = "outbox")
@Getter
public class Outbox extends BaseEntity {
@Column(nullable = false, length = 100)
private String topic;
@Column(nullable = false, length = 100)
private String partitionKey; // 이벤트 순서 보장을 위한 키
@Column(nullable = false, columnDefinition = "TEXT")
private String payload; // JSON 직렬화된 이벤트
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 20)
private OutboxStatus status; // PENDING, PUBLISHED, FAILED
@Column(nullable = false)
private Integer retryCount; // 재시도 횟수
...
}
2. 이벤트 저장 : 트랜잭션 내에서 Outbox 저장
@Component
public class KafkaOutboxEventListener {
private final OutboxService outboxService;
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void handleOrderCreated(OrderEvent.OrderCreated event) {
KafkaEvent.OrderEvent.OrderCreated kafkaEvent =
KafkaEvent.OrderEvent.OrderCreated.from(
event.orderKey(),
event.userId(),
event.orderId(),
event.originalTotalPrice(),
event.discountPrice()
);
// 비즈니스 트랜잭션과 동일한 트랜잭션에서 Outbox에 저장
outboxService.saveOutbox("order-created-events", event.orderKey(), kafkaEvent);
}
}
3. 비동기 발행 : Scheduler를 통한 주기적 발행
@Service
public class OutboxService {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final OutboxRepository outboxRepository;
public void publishPendingOutboxes(int batchSize) {
List<Outbox> pendingOutboxes = outboxRepository.findPendingOutboxes(batchSize);
for (Outbox outbox : pendingOutboxes) {
kafkaTemplate
.send(
outbox.getTopic(),
outbox.getPartitionKey(), // Partition Key로 순서 보장
outbox.getPayload()
)
.whenComplete((result, exception) -> {
if (exception == null) {
outboxRepository.markAsPublished(outbox.getId());
} else {
outboxRepository.markAsFailed(outbox.getId(), exception.getMessage());
}
});
}
}
}
@Component
public class OutboxRelayScheduler {
private final OutboxService outboxService;
@Scheduled(fixedDelay = 1000) // 1초마다 실행
public void publishPendingOutboxEvents() {
outboxService.publishPendingOutboxes(100);
}
}
4. Outbox 재시도 로직
@Service
public class OutboxService {
private static final int MAX_RETRIES = 3;
public void retryFailedOutboxes(int batchSize) {
List<Outbox> failedOutboxes =
outboxRepository.findFailedAndRetryableOutboxes(batchSize, MAX_RETRIES);
for (Outbox outbox : failedOutboxes) {
if (outbox.hasExceededMaxRetries(MAX_RETRIES)) {
log.warn("Outbox 재시도 횟수 초과: outboxId={}, retryCount={}",
outbox.getId(), outbox.getRetryCount());
continue; // 최대 재시도 횟수 초과 시 스킵
}
kafkaTemplate
.send(outbox.getTopic(), outbox.getPartitionKey(), outbox.getPayload())
.whenComplete((result, exception) -> {
if (exception == null) {
outboxRepository.markAsPublished(outbox.getId());
} else {
outboxRepository.markAsFailed(outbox.getId(), exception.getMessage());
}
});
}
}
}
이벤트 순서 보장 : Partition Key
카프카는 토픽 전체의 순서를 보장하지 않는다.
토픽이 아닌, 파티션 내에서만 순서를 보장한다.
예를 들어, 하나의 토픽이 3개의 파티션으로 나뉘어 있다면
- 주문 생성 이벤트가 파티션 1번으로 가고,
- 주문 취소 이벤트가 파티션 2번으로 간다면?
- 컨슈머가 파티션 2번을 먼저 읽어버릴 경우, 생성되기도 전에 취소하려는 오류가 발생할 수 있다.
그래서 파티션 키를 사용해서 순서를 보장하려고 한다.
상품 관련 이벤트는 productId, 주문 관련 이벤트는 orderKey를 키로 사용한다.
따라서 특정 주문에 대한 모든 이벤트는 항상 같은 파티션에 순서대로 쌓이고, 컨슈머도 순서대로 처리할 수 있게 된다.
// 상품 관련 이벤트: productId를 Partition Key로 사용
outboxService.saveOutbox("product-liked-events", event.productId().toString(), kafkaEvent);
// 주문 관련 이벤트: orderKey를 Partition Key로 사용
outboxService.saveOutbox("order-created-events", event.orderKey(), kafkaEvent);
Consumer 측 구현 : event_handled 테이블로 멱등 처리
프로듀서가 메시지를 중복해서 보내더라도, 컨슈머가 중복을 걸러낼 수 있다면 시스템은 안전하다.
이를 위해 event_handled 테이블을 도입했다.
event_handled 테이블에 대해 소개를 하자면 다음과 같다.
이미 처리된 이벤트를 추적하는 일종의 처리 이력 테이블이다. 각 이벤트는 고유한 eventId를 가지고, 이 ID를 기준으로 중복 처리를 방지한다.
eventId에 UNIQUE 제약조건을 걸어 데이터베이스 레벨에서 중복 저장을 방지해주고,
각 이벤트가 처리되었는지 여부를 eventId로 확인한다.
그리고 eventType이랑 aggregateKey를 저장해서 로깅이나 모니터링, 디버깅에 활용할 수 있다.
1. event_handled 엔티티
@Entity
@Table(name = "event_handled")
@Getter
public class EventHandled extends BaseEntity {
@Column(nullable = false, unique = true, length = 200)
private String eventId; // 이벤트 고유 ID (PK 역할)
@Column(nullable = false, length = 100)
private String eventType; // 이벤트 타입 (로깅/모니터링용)
@Column(nullable = false, length = 100)
private String aggregateKey; // 집계 키 (productId, orderId 등)
...
}
2. 멱등 처리 로직
@Component
public class MetricsConsumer {
private final EventHandledService eventHandledService;
private final ProductMetricsService productMetricsService;
@KafkaListener(
topics = {"product-liked-events"},
containerFactory = KafkaConfig.BATCH_LISTENER,
groupId = "metrics-consumer-group"
)
@Transactional
public void handleProductLikedEvents(
List<ConsumerRecord<String, Object>> messages,
Acknowledgment acknowledgment
) {
for (ConsumerRecord<String, Object> record : messages) {
KafkaEvent.ProductEvent.ProductLiked event =
(KafkaEvent.ProductEvent.ProductLiked) record.value();
// 1. 멱등성 체크: 이미 처리된 이벤트인지 확인
if (eventHandledService.isAlreadyHandled(event.eventId())) {
log.debug("이미 처리된 이벤트 스킵: eventId={}", event.eventId());
continue; // 중복 메시지는 스킵
}
// 2. 비즈니스 로직 실행
productMetricsService.incrementLikeCount(event.productId());
// 3. 처리 완료 표시 (트랜잭션 내에서)
eventHandledService.markAsHandled(
event.eventId(),
"ProductLiked",
event.productId().toString()
);
}
// 4. 모든 메시지 처리 완료 후 수동 커밋
acknowledgment.acknowledge();
}
}
중복 테스트 검증
테스트 시나리오는 다음과 같다.
- 상품 조회 이벤트 중복 처리
- 동일한 ProductViewed 이벤트를 2번 연속 수신했을 때, 조회 수가 한 번만 증가하는지 확인한다.
- 주문 결제 이벤트 중복 처리
- 동일한 OrderPaid 이벤트를 2번 연속 수신했을 때, 여러 상품의 판매량이 한 번만 증가하는지 확인한다.
- 여러 이벤트 타입 혼합 처리
- 같은 상품에 대한 서로 다른 이벤트 타입이 중복 수신되었을 때, 각 이벤트가 독립적으로 멱등하게 처리되는지 확인한다.
- 상품 좋아요 이벤트 중복 처리
- 동일한 ProductLiked 이벤트를 3번 연속 수신했을 때, 좋아요 수가 한 번만 증가하는지 확인한다.
