Spark Performance Tuning: A Practical Guide for Data Engineers
Spark is powerful, but untouched default configurations will kill your query performance at scale. Whether you're running a 10-node cluster or a 200-node behemoth, the same performance principles apply. This guide covers the practical tuning techniques that make the difference between a 2-hour job and a 10-minute job.
The Mental Model: Why Spark Jobs Are Slow
Before tuning, you need to understand the three main sources of Spark slowness:
- Shuffle — Moving data across the network between stages (caused by joins, groupBys, repartitions)
- Memory pressure — Spilling to disk when executors can't hold data in RAM
- Skew — A few partitions being 100x larger than others, making the job wait on the stragglers
Every tuning decision addresses one or more of these. Let's go through each.
1. Understanding Your Job with Spark UI
Before tuning anything, profile first. Open the Spark UI (accessible from the Databricks cluster page) and look for:
- Stage duration: Which stages take the longest?
- Shuffle read/write: High shuffle = potential for optimization
- Spill: Memory spill to disk is a major slowdown indicator
- Skew: Task duration variance within a stage (some tasks 100x slower = skew)
```python
# Enable event logging for detailed profiling. These are static configs —
# set them in the cluster config (e.g. spark.eventLog.enabled=true,
# spark.eventLog.dir=dbfs:/spark-events), not with spark.conf.set() at runtime.

# Check partition size distribution
from pyspark.sql.functions import spark_partition_id

df = spark.table("prod.gold.events")
df.groupBy(spark_partition_id()).count().orderBy("count", ascending=False).show(20)
```
2. Memory Configuration
Executor memory is split into regions. Getting this wrong causes spill.
```python
# Executor and driver memory are static configs: set them at cluster creation
# or via spark-submit, not with spark.conf.set() at runtime.
#
#   --conf spark.executor.memory=16g
#   --conf spark.executor.memoryOverhead=4g    # off-heap for native memory
#
# Memory fraction split: execution vs storage
#   --conf spark.memory.fraction=0.8           # 80% of (heap - 300MB reserved) for Spark
#   --conf spark.memory.storageFraction=0.3    # 30% of that protected for caching
#
# Driver memory (for collect(), toPandas(), large broadcasts)
#   --conf spark.driver.memory=8g
#   --conf spark.driver.memoryOverhead=2g
```
Detecting memory spill:

```python
# Check for spill in the SQL metrics (Spark UI > SQL tab),
# or programmatically via the metrics system.
# High "Spill (Memory)" and "Spill (Disk)" values = increase executor memory
```
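To make the region split concrete, here is the unified-memory arithmetic for the 16g executor configured above. Spark reserves roughly 300MB of heap before `spark.memory.fraction` is applied; the numbers below follow from that documented model:

```python
# Unified memory model arithmetic for a 16g executor heap
RESERVED_MB = 300            # Spark's fixed reserved memory

heap_mb = 16 * 1024          # spark.executor.memory = 16g
memory_fraction = 0.8        # spark.memory.fraction
storage_fraction = 0.3       # spark.memory.storageFraction

unified_mb = (heap_mb - RESERVED_MB) * memory_fraction
storage_mb = unified_mb * storage_fraction    # protected from eviction
execution_mb = unified_mb - storage_mb        # execution can also borrow idle storage

print(round(unified_mb), round(storage_mb), round(execution_mb))  # 12867 3860 9007
```

If your stage's per-task working set exceeds the execution share divided by concurrent tasks, you will see spill.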
3. Shuffle Optimization
Shuffle is the most expensive operation in Spark. Reducing shuffle data size is always the highest-ROI optimization.
Tune shuffle partitions
The default is 200 shuffle partitions — almost always wrong for your workload:
```python
# Rule of thumb: target 128MB–256MB per partition after shuffle.
# Check your shuffle output size in Spark UI → Stages → Shuffle Write,
# then: num_partitions = total_shuffle_bytes / 200_000_000

# For small datasets (< 10GB after shuffle):
spark.conf.set("spark.sql.shuffle.partitions", "50")

# For large datasets (> 100GB after shuffle):
spark.conf.set("spark.sql.shuffle.partitions", "1000")

# Or let AQE handle it automatically (recommended on Databricks):
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
```
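The rule of thumb above can be turned into a quick helper. The function name is illustrative; plug in the Shuffle Write bytes you read off the Spark UI:

```python
import math

def shuffle_partition_count(total_shuffle_bytes: int,
                            target_bytes: int = 200_000_000) -> int:
    """Partition count targeting ~200MB per shuffle partition."""
    return max(1, math.ceil(total_shuffle_bytes / target_bytes))

# e.g. a stage writing 50GB of shuffle data
print(shuffle_partition_count(50 * 10**9))  # 250
```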
Broadcast joins — eliminate shuffle for small tables
The most impactful single optimization for join-heavy jobs:
```python
from pyspark.sql.functions import broadcast

# Manual broadcast hint
result = large_orders_df.join(
    broadcast(small_country_codes_df),
    on="country_code",
    how="left",
)

# Or raise the auto-broadcast threshold (default: 10MB — often too conservative)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))  # 100MB

# Disable auto-broadcast entirely when it causes OOM on the driver
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```
Reduce shuffle data size
```python
# Filter and project BEFORE joining — push down predicates
result = (
    spark.table("prod.gold.events")
    .filter("event_date >= '2024-01-01'")        # filter early!
    .select("user_id", "event_type", "amount")   # only needed columns
    .join(users_df, "user_id")
)

# Avoid Python UDFs where possible — use Spark SQL built-ins

# BAD: Python UDF (serialization overhead)
from pyspark.sql.functions import udf

@udf("string")
def slow_udf(x):
    return x.upper()

# GOOD: native Spark function
from pyspark.sql.functions import upper

df.withColumn("name_upper", upper("name"))
```
4. Adaptive Query Execution (AQE)
AQE is Spark 3.x's automatic runtime optimizer. Enable it — it handles many tuning decisions automatically:
```python
# Enable AQE (default ON in Databricks Runtime 8+)
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Automatic partition coalescing (merges small partitions after shuffle)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "256MB")

# Automatic skew join handling
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

# Use local shuffle readers when AQE converts a sort-merge join to a
# broadcast join at runtime (avoids an extra shuffle exchange)
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
```
5. Handling Data Skew
Skew happens when one key has vastly more data than others (e.g., a customer_id that accounts for 40% of your orders table). One partition processes millions of rows while the other 999 finish in seconds.
```python
# Detect skew: row counts per partition
from pyspark.sql.functions import spark_partition_id, count

df = spark.table("prod.gold.orders")
(df.groupBy(spark_partition_id().alias("partition_id"))
   .agg(count("*").alias("row_count"))
   .orderBy("row_count", ascending=False)
   .show(20))

# Fix 1: salting technique for skewed joins
from pyspark.sql.functions import array, col, concat, explode, lit, rand

SALT_FACTOR = 50

# Skewed side: append a random salt in [0, SALT_FACTOR) to the join key
skewed_df = (
    spark.table("prod.gold.large_orders")
    .withColumn("salt", (rand() * SALT_FACTOR).cast("int"))
    .withColumn("customer_id_salted", concat(col("customer_id"), lit("_"), col("salt")))
    .drop("salt")
)

# Small side: replicate each row once per salt value
small_df = spark.table("prod.gold.customers")
small_df_salted = (
    small_df
    .withColumn("salt", explode(array([lit(i) for i in range(SALT_FACTOR)])))
    .withColumn("customer_id_salted", concat(col("customer_id"), lit("_"), col("salt")))
    .drop("salt")
)

# Join on the salted key; drop one copy of customer_id to avoid ambiguity
result = skewed_df.join(small_df_salted.drop("customer_id"), "customer_id_salted")
```
AQE automatic skew handling (simpler when it works):
```python
# AQE splits skewed partitions automatically at runtime —
# works for sort-merge joins where one side is skewed
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```
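To see why salting works, here is a toy sketch in plain Python (not Spark code): a single hot join key fans out into up to `SALT_FACTOR` distinct salted keys, so its rows hash to many shuffle partitions instead of landing on one straggler:

```python
import random

random.seed(0)
SALT_FACTOR = 50

# 10,000 rows that all share one hot join key
rows = ["customer_42"] * 10_000

# After salting, the hot key fans out into up to SALT_FACTOR distinct keys
salted = [f"{key}_{random.randrange(SALT_FACTOR)}" for key in rows]

print(len(set(rows)), "->", len(set(salted)))  # 1 -> 50
```

The cost is replicating the small side `SALT_FACTOR` times, which is why salting only pays off when the other side is genuinely small.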
6. Caching Strategy
Cache DataFrames that are reused multiple times within the same job. But caching the wrong things wastes memory that execution needs.
```python
# Cache only what's reused
customer_lookup = spark.table("prod.gold.customers").cache()
customer_lookup.count()  # trigger cache materialization

# Multiple downstream uses — cache pays off
orders_with_customers = orders_df.join(customer_lookup, "customer_id")
returns_with_customers = returns_df.join(customer_lookup, "customer_id")

# Explicitly unpersist when done
customer_lookup.unpersist()

# Use DISK_ONLY for very large DataFrames you can't afford in memory
from pyspark import StorageLevel
huge_df.persist(StorageLevel.DISK_ONLY)

# Check what's cached: Spark UI > Storage tab, or per table:
spark.catalog.isCached("prod.gold.customers")
```
Cache in Delta instead for cross-job reuse:
```python
# Write a checkpoint table instead of an in-memory cache for multi-job reuse
(expensive_computation.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("prod.temp.checkpoint_table"))
```
7. Parallelism and Cluster Sizing
```python
# spark.default.parallelism controls RDD partition counts. It's a static
# config — set it at cluster/session creation, where num_executors and
# num_cores are your cluster's values:
#   --conf spark.default.parallelism=<num_executors * num_cores * 2>

# For reads: control input partition sizing at runtime
spark.conf.set("spark.sql.files.maxPartitionBytes", str(256 * 1024 * 1024))  # 256MB/partition
spark.conf.set("spark.sql.files.openCostInBytes", str(4 * 1024 * 1024))      # 4MB open cost
```
Cluster sizing rules of thumb:
| Workload Type | Node Type | Worker Count |
|---|---|---|
| ETL / batch joins | Memory-optimized | 8–32 |
| ML training | GPU or compute-optimized | 4–16 |
| Streaming (low-latency) | General purpose | 2–8 |
| SQL analytics | Serverless SQL Warehouse | Auto-scales |
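These are starting points, not laws. For batch ETL, a rough back-of-envelope estimate falls out of the partition-size rule of thumb; the function and its defaults below are illustrative assumptions, not Databricks guidance:

```python
import math

def rough_worker_count(input_gb: float,
                       partition_mb: int = 256,
                       cores_per_worker: int = 8,
                       waves_per_core: int = 2) -> int:
    """Estimate workers: enough cores to finish all partitions
    in ~`waves_per_core` waves of tasks."""
    partitions = max(1, math.ceil(input_gb * 1024 / partition_mb))
    cores_needed = math.ceil(partitions / waves_per_core)
    return max(2, math.ceil(cores_needed / cores_per_worker))

print(rough_worker_count(100))  # 100GB input -> 25 workers of 8 cores
```

Measure actual task durations after the first run and adjust; skew, spill, and I/O bottlenecks all invalidate a pure size-based estimate.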
8. Common Anti-Patterns
```python
from pyspark.sql.functions import col

# ❌ Collect large DataFrames to the driver
all_rows = df.collect()  # OOM risk for large DFs

# ✅ Process in Spark, collect only aggregations
summary = df.groupBy("category").agg({"amount": "sum"}).collect()

# ❌ Loops that trigger a Spark action per iteration
for user_id in user_ids:
    df.filter(col("user_id") == user_id).show()  # one job per iteration

# ✅ Filter once, process all
df.filter(col("user_id").isin(user_ids)).show()

# ❌ pandas on large data
pandas_df = spark_df.toPandas()  # pulls everything to the driver
result = pandas_df.groupby("category").sum()

# ✅ Use Spark SQL or the pandas API on Spark
ps_df = spark_df.pandas_api()
result = ps_df.groupby("category").sum()
```
9. Photon Engine
Databricks Photon is a native vectorized query engine that replaces the JVM-based Spark execution engine for SQL and DataFrame operations. Enable it for a 2–8x speedup on eligible workloads — no code changes required:
```bash
# Enable Photon when creating a cluster (UI or CLI)
databricks clusters create --json '{
  "cluster_name": "photon-etl",
  "spark_version": "14.3.x-photon-scala2.12",
  "node_type_id": "Standard_D8ds_v5",
  "num_workers": 8
}'
```
Photon accelerates: scans, filters, joins, aggregations, window functions, sorting. It doesn't accelerate: Python UDFs, RDD operations, streaming.
10. Tuning Checklist
| Issue Observed | Fix |
|---|---|
| Long shuffle stages | Increase shuffle partitions, enable AQE |
| Memory spill | Increase executor memory, reduce partition size |
| Skewed tasks | Enable AQE skew join, or use salting |
| Slow joins | Broadcast small tables |
| Slow scans | Add file compaction, Z-order, bloom filters |
| Low CPU utilization | Increase partition count / parallelism so all cores have work |
| OOM on driver | Avoid collect() on large DFs, increase driver memory |
Conclusion
Spark performance tuning is equal parts science and intuition. Start by profiling with the Spark UI, identify your bottleneck (shuffle vs memory vs skew), apply targeted fixes, and measure again. Don't change multiple things at once — you won't know what worked.
The highest-ROI changes in order: enable AQE, tune shuffle partitions, broadcast small tables, eliminate Python UDFs, and right-size your cluster.
Try Harbinger Explorer free for 7 days — track Spark job metrics, compare run durations across clusters, and get automated performance recommendations for your Databricks workloads. harbingerexplorer.com