テラバイト級のログデータを処理する際、最も恐ろしいのはジョブが99%の進捗で数時間停止する現象だ。これは典型的なSparkチューニングの問題であり、特定のパーティションにデータが集中するData Skew(データの偏り)が原因であるケースが9割を占める。本稿では、プロダクション環境で発生した深刻な遅延を、Salting(ソルティング)手法とBroadcast Joinの最適化によって解決した実録を公開する。
現象:なぜSparkジョブは最後の1タスクで詰まるのか
ビッグデータ処理において、均等分散は理想郷に過ぎない。現実のデータセット(特にユーザーIDや地域コードをキーにする場合)は、ロングテール分布に従う。特定のキー(例: null値や特定の大口ユーザー)にレコードが集中すると、そのキーを担当するExecutorだけが過負荷となり、全体のボトルネックとなる。
Spark UIを確認し、ステージ内の他のタスクが数秒で終わっているのに、1つのタスクだけが何時間も実行中の場合、それは間違いなくSkewの問題だ。Executorのメモリ不足(OOM)エラーが頻発する場合も同様である。
解決策1:Salting(ソルティング)による強制分散
Spark 3.0以降のAQE(Adaptive Query Execution)でもある程度のSkew緩和は可能だが、極端な偏りがある場合は手動でのSaltingが唯一の解となる。これは結合キーにランダムな接尾辞(Salt)を付与し、巨大なパーティションを強制的に分割する手法だ。
実装コード:PySparkによるSalting結合
以下は、偏りのある大規模テーブル(df_skew)と小規模テーブル(df_small)を結合する際のSalting実装例である。
from pyspark.sql.functions import col, lit, rand, floor, explode, array
# 設定:Saltの分割数(偏りの度合いに応じて調整。通常はExecutor数の倍数)
SALT_NUMBER = 100
# 1. 左側(偏っているテーブル)にランダムなSalt IDを付与
# キーを "key_0" 〜 "key_99" のように分散させる
df_skew_salted = df_skew.withColumn(
"salt_id",
(rand() * SALT_NUMBER).cast("int")
)
# 2. 右側(小さいテーブル)をSaltの数だけ複製(Explode)
# 各レコードを0〜99の全てのSalt IDと紐付ける
df_small_salted = df_small.withColumn(
"salt_id_list",
array([lit(i) for i in range(SALT_NUMBER)])
).select(
"*",
explode(col("salt_id_list")).alias("salt_id")
).drop("salt_id_list")
# 3. 本来のキー + Salt ID で結合
# これにより、特定のキーに集中していた処理が100個のタスクに分散される
df_result = df_skew_salted.join(
df_small_salted,
on=["join_key", "salt_id"],
how="inner"
).drop("salt_id")
この手法を適用することで、単一Executorで処理されていた1,000万件のレコードが100個のExecutorに分散される。データエンジニアリングの現場では、これにより処理時間が4時間から15分に短縮された事例がある。
解決策2:Broadcast Joinの戦略的活用
結合する片方のテーブルがメモリに乗るサイズである場合、シャッフル(SortMergeJoin)を発生させるのは無駄だ。Broadcast Joinを使用すれば、小規模テーブルを全Executorにコピーして配布することで、ネットワークI/Oコストの高いシャッフルを回避できる。
しきい値の調整と強制ヒント
Sparkはデフォルトでspark.sql.autoBroadcastJoinThreshold(デフォルト10MB)以下のテーブルを自動的にブロードキャストする。しかし、昨今のサーバスペックを考慮すれば、この値は保守的すぎる。
- 設定変更: メモリに余裕がある場合、しきい値を引き上げる(例:100MB〜1GB)。
- 強制ヒント: Sparkのオプティマイザがサイズ推定を誤る場合、明示的に
broadcast()ヒントを使用する。
from pyspark.sql.functions import broadcast
# 明示的にブロードキャストを指定
# 注意:ドライバおよびExecutorのメモリに収まるサイズであることを確認すること
df_optimized = df_large.join(
broadcast(df_small_ref),
on="user_id",
how="left"
)
Broadcast対象のテーブルが大きすぎる(数GB以上)と、Driver側で
OutOfMemoryErrorが発生したり、ブロードキャストパケットの転送コストがシャッフルコストを上回る場合がある。Sparkチューニングにおいては、データサイズの見極めが肝心だ。
| 結合方式 | メリット | デメリット | 適用シナリオ |
|---|---|---|---|
| SortMergeJoin | 大規模データ同士の結合に堅牢 | シャッフルによるネットワーク/ディスクI/Oが大 | 両方のテーブルが巨大な場合 |
| BroadcastJoin | シャッフルなし、超高速 | メモリ制限あり、Driver負荷増 | 片方が小さい(<1GB目安)場合 |
| Skew Join (Salting) | データ偏りを物理的に解消 | データ爆発(Explode)による計算コスト増 | 極端なキー偏りがある場合 |
結論:攻めのチューニングを実施せよ
Sparkのデフォルト設定は「安全に動く」こと重視されているが、「速く動く」ようには設定されていない。Data Skewに遭遇した際は、まずAQEが有効か確認し、それでも解消しない場合はSaltingを実装すべきだ。また、Broadcast Joinは最も手軽で効果の高いチューニング手法の一つである。
インフラコストを削減し、SLAを遵守するために、これらのテクニックをコードレビューの標準チェックリストに組み込むことを推奨する。
Post a Comment