Pasé 3 días depurando un job de Procesamiento Big Data que tardaba 4 horas en ejecutarse y fallaba sistemáticamente en el último 1%. El síntoma era clásico: 199 tareas terminaban en segundos, pero 1 tarea se quedaba colgada eternamente hasta lanzar un error de memoria (OOM). No era un problema de clúster; era un problema de física.
En este artículo, documento la solución exacta que utilizamos para arreglar el Sesgo de datos (Data Skew) en producción y cómo usamos estrategias de Broadcast Join para reducir el tiempo de ejecución de horas a minutos. Sin teoría innecesaria, solo código y configuración.
1. El Enemigo: Data Skew (Sesgo de Datos)
El "Data Skew" ocurre cuando una partición de datos es significativamente más grande que las demás. En Ingeniería de datos, esto suele suceder cuando una clave de join (ej. null, o un user_id muy popular) concentra el 90% de los registros.
Solución A: Adaptive Query Execution (AQE) - La vía rápida
Si usas Spark 3.x, habilita AQE. Esta funcionalidad detecta particiones sesgadas en tiempo de ejecución y las divide automáticamente.
# Configuración para activar en SparkSession
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Ajuste fino (Opcional)
# Define qué tan grande debe ser una partición para considerarse sesgada (ej. 5x la mediana)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
Solución B: Salting (La opción nuclear)
Si AQE no es suficiente (o estás en una versión antigua), debes usar "Salting". Esta técnica consiste en añadir un número aleatorio a la clave sesgada para redistribuirla entre múltiples particiones.
El truco está en "explotar" la tabla pequeña para que coincida con las nuevas claves saladas.
from pyspark.sql.functions import col, lit, rand, floor, explode, array
# 1. Definir el factor de sal (ej. dividir la data en 10 partes)
SALT_FACTOR = 10
# 2. Tabla Grande (Skewed): Añadir columna 'salt' aleatoria (0 a 9)
df_big_salted = df_big.withColumn(
"salt",
floor(rand() * SALT_FACTOR)
)
# 3. Tabla Pequeña: Replicarla para cada posible valor de salt
df_small_salted = df_small.withColumn(
"salt_array",
array([lit(i) for i in range(SALT_FACTOR)]) # Crea [0, 1, ..., 9]
).select(
"*",
explode(col("salt_array")).alias("salt") # Explota en 10 filas por cada 1 original
).drop("salt_array")
# 4. Join usando la clave original + la sal
df_result = df_big_salted.join(
df_small_salted,
on=["join_key", "salt"],
how="inner"
).drop("salt")
2. La Estrategia: Broadcast Join
El Broadcast Join es la técnica más efectiva para Tuning Spark cuando unes una tabla gigante con una pequeña (ej. Tabla de Hechos vs Tabla de Dimensiones). En lugar de mover gigabytes de datos por la red (Shuffle), Spark envía una copia de la tabla pequeña a cada ejecutor.
Configuración Automática
Spark hace esto automáticamente si la tabla es menor a 10MB. En producción, suelo aumentar este umbral, pero con cuidado.
# Aumentar umbral a 100MB (Cuidado con la memoria del Driver)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "104857600")
Forzar el Broadcast (Broadcast Hint)
A veces Spark estima mal el tamaño de la tabla. Puedes forzar el comportamiento usando broadcast().
from pyspark.sql.functions import broadcast
# Forzamos que 'df_small' sea enviada a todos los nodos
# Esto elimina el Shuffle de la tabla grande 'df_large'
final_df = df_large.join(broadcast(df_small), "id")
OutOfMemoryError. Úsalo solo cuando estés seguro del tamaño.
Comparativa de Rendimiento (Caso Real)
| Estrategia | Tiempo Ejecución | Uso de Shuffle | Resultado |
|---|---|---|---|
| SortMerge Join (Default) | 4h 12m | 3.2 TB | Fallido (Skew OOM) |
| AQE Activado | 45m | 3.2 TB | Éxito (Lento) |
| Salting Manual | 28m | 3.5 TB | Éxito (Estable) |
| Broadcast Join | 8m | 0 GB | Óptimo |
Conclusión
El rendimiento en Spark no se trata de añadir más servidores, se trata de entender cómo se mueven los datos. Para jobs críticos de producción:
- Siempre revisa si puedes usar Broadcast Join primero (es la victoria más fácil).
- Activa AQE por defecto en Spark 3.0+.
- Usa Salting solo cuando detectes un stage bloqueado por una sola tarea masiva.
Aplicar estas técnicas no solo reduce costos de nube, sino que estabiliza tus pipelines de datos para que puedas dormir tranquilo.
Post a Comment