Spark 작업 모니터링 중 가장 고통스러운 순간은 200개의 태스크 중 199개가 5분 만에 끝났는데, 마지막 1개의 태스크가 4시간 동안 도는 것을 지켜볼 때다. 이는 전형적인 Data Skew(데이터 편향) 현상이다. 수십 TB 규모의 빅데이터 처리 파이프라인에서 이 병목을 해결하지 못하면 클러스터 리소스 비용은 기하급수적으로 늘어난다. 본 가이드에서는 현업 데이터 엔지니어링 과정에서 겪은 Skew 문제의 확실한 해결책인 'Salting' 기법과 네트워크 I/O를 획기적으로 줄이는 'Broadcast Join' 전략을 다룬다.
1. Data Skew의 주범과 Salting 기법
Data Skew는 특정 Key에 데이터가 몰릴 때 발생한다. 예를 들어, 전자상거래 데이터에서 '비회원(Null Key)' 주문이나 특정 'Power User'의 로그가 압도적으로 많을 때, 해당 Key를 처리하는 파티션 하나가 메모리 한계(OOM)에 부딪히거나 무한 루프에 빠진다.
Production Fix: Salting 기법 적용
해결책은 'Salting'이다. 쏠림이 발생한 Key에 랜덤한 접미사(Salt)를 붙여 강제로 파티션을 쪼개는 방식이다.
from pyspark.sql.functions import col, lit, rand, concat, floor
# 1. 대용량 Fact 테이블 (Skew가 발생한 쪽)
# Join Key에 0~N 사이의 랜덤 숫자를 붙여 파티션을 분산시킨다.
SALT_NUMBER = 20
df_skewed = big_table_df.withColumn(
"salted_key",
concat(col("join_key"), lit("_"), floor(rand() * SALT_NUMBER))
)
# 2. 소형 Dimension 테이블
# Join 대상이 되는 테이블은 모든 Salt Key와 매칭되도록 데이터를 N배 뻥튀기(Explode)한다.
df_dim = small_table_df.withColumn("salt_array", array([lit(i) for i in range(SALT_NUMBER)]))
df_dim_exploded = df_dim.select(
col("*"),
explode(col("salt_array")).alias("salt_id")
).withColumn(
"salted_key",
concat(col("join_key"), lit("_"), col("salt_id"))
)
# 3. Salted Key로 Join 수행 (Skew 해소)
result = df_skewed.join(df_dim_exploded, on="salted_key", how="inner")
이 코드를 적용하면 하나의 파티션에 몰려있던 1,000만 건의 데이터가 20개의 파티션으로 분산되어 병렬 처리된다. Spark 튜닝의 핵심은 결국 '병렬성을 얼마나 균일하게 유지하느냐'에 달려있다.
2. 셔플을 없애는 마법: Broadcast Join
일반적인 Join(Sort-Merge Join)은 두 테이블의 데이터를 네트워크를 통해 이동시키는 'Shuffle' 과정을 동반한다. 이는 디스크 I/O와 네트워크 병목의 주원인이다. 한쪽 테이블이 충분히 작다면, 해당 테이블을 모든 Executor의 메모리에 복제(Broadcast)하여 Shuffle을 아예 제거할 수 있다.
Broadcast Join 발동 조건
Spark는 기본적으로 spark.sql.autoBroadcastJoinThreshold(기본값 10MB)보다 작은 테이블을 자동으로 브로드캐스팅한다. 하지만 운영 환경에서는 이 값을 명시적으로 제어하거나 힌트를 사용하는 것이 안전하다.
Driver OOM이 발생하여 애플리케이션 전체가 셧다운된다. 테이블 크기가 1GB를 넘는다면 신중해야 한다.
from pyspark.sql.functions import broadcast
# 명시적 힌트를 사용하여 강제로 Broadcast Join 유도
# 100GB Fact Table과 500MB Dim Table 조인 시 셔플 없이 수행됨
start_time = time.time()
joined_df = big_fact_table.join(
broadcast(small_dim_table),
on="user_id",
how="left"
)
# 실행 계획 확인 (BroadcastHashJoin이 보여야 함)
joined_df.explain()
3. 성능 비교: Sort-Merge vs Broadcast
실제 500GB 로그 데이터와 100MB 메타 데이터를 조인했을 때의 성능 차이는 다음과 같다.
| Join Strategy | Shuffle Write | 실행 시간 | 비고 |
|---|---|---|---|
| Sort-Merge Join (Default) | 120 GB | 24분 | 네트워크 I/O 과다 발생 |
| Broadcast Join | 0 B | 3분 | Map-Side Join으로 처리됨 |
Broadcast Join을 활용하면 셔플 단계(Exchange)가 사라지므로, 네트워크 비용이 '0'에 수렴한다. 이는 클라우드 환경(AWS EMR, Databricks)에서 비용 절감과 직결된다.
Conclusion
Apache Spark 성능 최적화는 '균형'의 싸움이다. Data Skew를 방치하면 클러스터 자원의 90%가 낭비되고, 적절하지 않은 Join 전략은 불필요한 네트워크 트래픽을 유발한다. 데이터가 쏠릴 때는 주저 없이 Salting을 적용하고, 작은 테이블 조인에는 Broadcast Join을 강제하여 셔플을 제거해야 한다. 이 두 가지 패턴만 확실히 적용해도 대부분의 Spark 성능 문제는 해결된다.
Post a Comment