들어가며
매일 쌓이는 대량의 메트릭 데이터를 집계해서 주간/월간 랭킹을 실시간으로 산출하는 과정을 쉽지 않다.
단순히 "데이터를 읽어서 계산한다"는 직관적인 접근으로 시작한다면, 대량의 데이터를 안정적으로 처리할 수 없을 것이다.
이 글에서는 Spring Batch를 사용한 랭킹 배치 시스템 구현 과정과, 메모리를 지키기 위한 리팩토링 경험을 공유하고자 한다.
요구사항과 초기 설계
사용자들에게 인기 상품을 추천하기 위해 주간/월간 랭킹 기능을 제공한다.
요구사항은 다음과 같다.
- 주간 랭킹 : 매주 월요일부터 일요일까지의 기간 동안 Top 100 상ㅍ품
- 월간 랭킹 : 매월 1일부터 말일까지의 기간 동안 Top 100 상품
- 배치 실행 : 매일 새벽 3시에 전날 데이터를 기반으로 주간/월간 랭킹 계산
데이터 구조를 보면, product_metrics 테이블에는 각 상품의 일별 집계(좋아요 수, 조회 수, 판매 수)가 저장된다.
이 데이터를 주간/월간으로 집계해서 조회 전용 테이블인 MV 테이블에 저장한다.
일별 상품 집계 테이블, 주간 랭킹 mv 테이블, 월간 랭킹 mv 테이블의 구조는 다음과 같다.
-- 일별 메트릭 테이블
CREATE TABLE product_metrics (
id BIGINT PRIMARY KEY,
product_id BIGINT NOT NULL,
metrics_date DATE NOT NULL,
like_count BIGINT NOT NULL,
view_count BIGINT NOT NULL,
sales_count BIGINT NOT NULL,
UNIQUE KEY uk_product_metrics (product_id, metrics_date)
);
-- 주간 랭킹 Materialized View
CREATE TABLE mv_product_rank_weekly (
id BIGINT PRIMARY KEY,
product_id BIGINT NOT NULL,
ranking INT,
score DOUBLE NOT NULL,
period_start_date DATE NOT NULL,
period_end_date DATE NOT NULL,
like_count BIGINT NOT NULL,
view_count BIGINT NOT NULL,
sales_count BIGINT NOT NULL,
UNIQUE KEY uk_weekly_rank (product_id, period_start_date, period_end_date)
);
왜 Spring Batch인가?
랭킹을 계산하는 부분을 단순히 스케줄러와 서비스 레이어만으로 구현할 수 있다. 하지만 몇 가지 문제가 있다.
1. 가장 큰 문제는 대량 데이터 처리의 안정성이다.
상품이 100만 건이 있다고 가정해보자.
100만 건의 데이터를 한 번에 메모리에 올리면 OutOfMemory가 발생할 수 있다.
그리고 트랜잭션도 문제가 된다. 모든 데이터를 처리한 후 한 번에 커밋하면, 중간에 실패했을 때 전체를 다시 처리해줘야 한다.
물론 상품이 소량이라면,, 배치 따윈 필요 없을 수 있다.
2. 다음 문제는 중간에 실패했을 때 발생한다.
일반적인 스케줄러와 서비스 레이어 만으로 구현한다면 단순히 '실행'만 하고, 상태를 기록하지 않는다.
그래서 중간에 실패했을 때 처음부터 다시 시작해줘야 한다. 즉, 어디까지 처리가 완료되었고, 어디부터 다시 시작해야 하는지 알 수 없다.
이런 것들을 고려했을 때 Spring Batch가 적합한 선택이었다.
Spring Batch는 대량 데이터를 청크 단위로 처리할 수 있는 Chunk-Oriented Processing을 제공하고, JobRepository를 통해 실행 이력을 자동으로 관리해준다. 그리고 실패 지점부터 재시작할 수 있는 기능을 제공한다.
Tasklet vs Chunk-Oriented
Spring Batch에는 두 가지 처리 방식이 있다.
Tasklet과 Chunk-Oriented 방식이다.
처음에는 Tasklet이 더 간단해 보였다. 하나의 메서드에서 모든 로직을 처리하면 되기 때문이다.
// Tasklet 방식 예시
@Component
public class RankingTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
// 모든 데이터를 한 번에 메모리에 로드
List<ProductMetrics> allMetrics = productMetricsRepository.findAll();
// 메모리에서 모든 데이터 처리
Map<Long, ProductRankingAggregate> aggregateMap = new HashMap<>();
for (ProductMetrics metric : allMetrics) {
// 처리 로직...
}
// 모든 데이터를 한 번에 저장
rankingRepository.saveAll(aggregateMap.values());
return RepeatStatus.FINISHED;
}
}
하지만 Tasklet 방식에는 한계가 있다. 모든 데이터를 한 번에 메모리에 로드해야 했고, 대량의 데이터를 처리할 때 OOM이 발생할 위험이 있었다. 그리고 트랜잭션도 전체를 한 번에 커밋해야 했기 때문에, 중간에 실패하면 전체를 다시 처리해야 했다.
이 방식은 단순히 스케줄러와 서비스 레이어에서 구현하는 방식과 별반 다르지 않아 보였다.
Chunk-Oriented 방식은 데이터를 청크 단위로 나눠서 처리했다.
만약 청크 단위를 100이라고 한다면
100건씩 읽고, 처리하고, 저장하고, 커밋하는 방식이다.
이렇게 하면 각 청크마다 트랜잭션을 커밋하니까 부분 실패 시에도 해당 청크만 재처리하면 됐다.
그리고 나중에 알게 된 것은, Chunk-Oriented를 사용한다고 해서 자동으로 메모리 문제가 해결되는 것은 아니였다.
Reader에서 청크 단위로 읽는 것만으로는 부족했고, Writer에서도 청크 단위로 처리하고 메모리에서 해제해야 했다.
초기 구현과 문제점
초기 구현은 Reader에서 전날 데이터를 읽고, Processor에서 점수를 계산하고, Writer에서 집계해서 최종 랭킹을 계산하면 된다고 생각했다.
초기 구현의 데이터 흐름은 다음과 같다.
[새벽 3시] 스케줄러 트리거
↓
[Step 시작] @BeforeStep: aggregateMap 초기화
↓
[Reader] 전날 데이터 읽기 (상품 100만 건, 청크 단위 100)
- DB: SELECT * FROM product_metrics WHERE metrics_date = '2024-01-14'
- 청크 1: 100건
- 청크 2: 100건
- ...
- 청크 10,000: 100건
↓
[Processor]
- 메트릭 집계
- 점수 계산: (viewCount × 0.1) + (likeCount × 0.2) + (salesCount × 0.7)
↓
[Writer] 청크 단위로 aggregateMap에 누적
- 청크 1: aggregateMap에 100개 추가
- 청크 2: aggregateMap에 100개 추가 (merge)
- ...
- 최종: aggregateMap에 100만 개 객체 저장
↓
[@AfterStep] 최종 처리
1. 기간 계산: weekStart = 2024-01-08, weekEnd = 2024-01-14
2. 기존 주간 랭킹 조회 (mv_product_rank_weekly)
3. 기존 랭킹 + 전날 데이터 합산
4. 점수 재계산
5. 점수 기준 정렬
6. TOP 100 추출
7. UPSERT
8. TOP 100에서 밀려난 데이터 DELETE
↓
[Step 종료] 주간 랭킹 저장 완료
이 로직의 문제점은 다음과 같다.
Reader에서 청크 단위로 읽고 Processor에서 처리했지만, Writer의 aggregateMap에 모든 데이터를 누적하고 있다.
Step이 시작될 때부터 종료될 때까지 100만 개의 객체가 메모리에 있는 것이다.
그래서 상품이 많아진다면 OOM이 발생할 수 있다.
OOM을 피하기 위해 배치를 선택했고, Chunk-Oriented 방식을 선택했지만, 여기서도 OOM이 발생할 수 있다는 것이다.
하지만 Writer에서 처리한 모든 데이터를 누적해야 하는 이유는 있다.
계산한 점수를 통해 각 상품의 랭킹을 정하기 위해선 결국 모든 상품 데이터(상품과 상품의 점수)가 있어야 하기 때문이다.
해결 방법
메모리 문제를 해결하기 위한 생각은
"메모리에 누적하지 말고, Step이 완료된 후에 다시 가져와서 랭킹을 계산하자"이다.
첫 번째 Step에서는 청크 단위로 읽고, 처리하고, DB에 저장하고, 메모리에서 즉시 해제했다.
두 번째 Step에서는 mv 테이블에서 필요한 데이터를 조회해서 최종 랭킹을 계산했다.
수정한 로직의 데이터 흐름은 다음과 같다.
[새벽 3시] 스케줄러
↓
[Step 1: rankingStep] Chunk-Oriented Processing
↓
[Reader] 전날 데이터 읽기 (상품 100만 건, 청크 단위 100)
- DB: SELECT * FROM product_metrics WHERE metrics_date = '2024-01-14'
↓
[Processor]
- 메트릭 집계
- 점수 계산
↓
[Writer] 청크 단위로 MV 테이블에 UPSERT (ranking = null)
- 청크 1: 100건 UPSERT
- 청크 2: 100건 UPSERT
- ...
- 청크 10,000: 100건 UPSERT
- 메모리에서 즉시 해제!
↓
[Step 2: rankingCalculationStep] Tasklet
↓
[Tasklet] 최종 랭킹 계산
1. MV 테이블에서 period에 해당하는 모든 데이터 조회
2. score 기준으로 정렬
3. TOP 100 추출
4. ranking 필드 업데이트
5. TOP 100에서 밀려난 데이터 DELETE
↓
[Job 종료] 주간 랭킹 저장 완료
아키텍처는 2-Step 구조로, 각 Step의 책임을 명확히 분리했다.
마치며
이 경험을 콩해 배운 것은, Chunk-Oriented Processing을 사용한다고 해서 자동으로 메모리 문제가 해결되는 것은 아니라는 점이다. Reader에서 청크 단위로 읽는 것만으로는 부족했고, Writer에서도 청크 단위로 처리하고 즉시 메모리에서 해제해야 했다.
그리고 2-Step 구조를 활용해서 각 Step의 책임을 명확히 분리하는 것이 중요했던 것 같다.