Spark Performance Tuning: A Practical Guide for Data Engineers

13 min read · Tags: spark, performance tuning, databricks, pyspark, optimization, data engineering

Spark is powerful, but untouched default configurations will kill your query performance at scale. Whether you're running a 10-node cluster or a 200-node behemoth, the same performance principles apply. This guide covers the practical tuning techniques that make the difference between a 2-hour job and a 10-minute job.


The Mental Model: Why Spark Jobs Are Slow

Before tuning, you need to understand the three main sources of Spark slowness:

  1. Shuffle — Moving data across the network between stages (caused by joins, groupBys, repartitions)
  2. Memory pressure — Spilling to disk when executors can't hold data in RAM
  3. Skew — A few partitions being 100x larger than others, making the job wait on the stragglers

Every tuning decision addresses one or more of these. Let's go through each.


1. Understanding Your Job with Spark UI

Before tuning anything, profile first. Open the Spark UI (accessible from the Databricks cluster page) and look for:

  • Stage duration: Which stages take the longest?
  • Shuffle read/write: High shuffle = potential for optimization
  • Spill: Memory spill to disk is a major slowdown indicator
  • Skew: Task duration variance within a stage (some tasks 100x slower = skew)
# Enable event logging for detailed profiling
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.eventLog.dir", "dbfs:/spark-events")

# Check partition size distribution
from pyspark.sql.functions import spark_partition_id

df = spark.table("prod.gold.events")
df.groupBy(spark_partition_id()).count().orderBy("count", ascending=False).show(20)
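To turn those partition counts into a quick go/no-go signal, here is a small helper (hypothetical, not part of any Spark API — the threshold is a judgment call):

```python
def skew_ratio(partition_counts):
    """Ratio of the largest partition to the median — a rough skew signal.

    A ratio above ~10 usually means a few straggler tasks will dominate the stage.
    """
    counts = sorted(partition_counts)
    median = counts[len(counts) // 2]
    return max(counts) / median if median else float("inf")

# One hot partition among ten: a ratio of 100 screams skew
print(skew_ratio([100] * 9 + [10_000]))  # → 100.0
```

Feed it the `row_count` column collected from the query above and compare stages before and after a fix.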

2. Memory Configuration

Executor memory is split into regions. Getting this wrong causes spill.

# Executor and driver memory are fixed at JVM startup — set these in the
# cluster's Spark config (or via SparkSession.builder), not with
# spark.conf.set at runtime, where Spark rejects them:
#
#   spark.executor.memory 16g
#   spark.executor.memoryOverhead 4g    # Off-heap for native memory
#   spark.driver.memory 8g              # for collect(), toPandas(), large broadcasts
#   spark.driver.memoryOverhead 2g

# Memory fraction split (also startup-time settings): execution vs storage
#   spark.memory.fraction 0.8           # 80% of usable heap for Spark's unified pool
#   spark.memory.storageFraction 0.3    # 30% of that protected for caching

Detecting memory spill:

# Check for spill in the SQL metrics (Spark UI > SQL tab)
# Or programmatically via the metrics system
# High "Spill (Memory)" and "Spill (Disk)" values = increase executor memory
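The fraction settings above carve each executor heap into regions. A back-of-envelope sketch of Spark's unified memory model (the 300MB reserved figure is Spark's internal constant; exact numbers vary by runtime):

```python
def unified_memory_regions(heap_mb, memory_fraction=0.8, storage_fraction=0.3,
                           reserved_mb=300):
    """Approximate Spark's unified memory split for one executor heap."""
    usable = heap_mb - reserved_mb           # heap minus reserved system memory
    unified = usable * memory_fraction       # spark.memory.fraction
    storage = unified * storage_fraction     # cache-protected region
    execution = unified - storage            # shuffles, joins, sorts, aggregations
    return {"unified_mb": round(unified), "storage_mb": round(storage),
            "execution_mb": round(execution)}

# A 16g executor with the settings shown above
print(unified_memory_regions(16 * 1024))
```

Note that `storageFraction` is only a floor: execution can borrow unused cache space and vice versa, which is why spill appears only once both regions are exhausted.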

3. Shuffle Optimization

Shuffle is the most expensive operation in Spark. Reducing shuffle data size is always the highest-ROI optimization.

Tune shuffle partitions

The default is 200 shuffle partitions — almost always wrong for your workload:

# Rule of thumb: target 128MB–256MB per partition after shuffle
# Check your shuffle output size in Spark UI → Stages → Shuffle Write
# Then: num_partitions = total_shuffle_bytes / 200_000_000

# For small datasets (< 10GB after shuffle):
spark.conf.set("spark.sql.shuffle.partitions", "50")

# For large datasets (> 100GB after shuffle):
spark.conf.set("spark.sql.shuffle.partitions", "1000")

# Or let AQE handle it automatically (recommended on Databricks):
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
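The rule of thumb from the comments above, as a runnable helper (the 200MB target is roughly the midpoint of the 128–256MB range; tune to taste):

```python
import math

def recommended_shuffle_partitions(total_shuffle_bytes, target_bytes=200_000_000):
    """Partition count that keeps post-shuffle partitions near the target size."""
    return max(1, math.ceil(total_shuffle_bytes / target_bytes))

# 10GB of shuffle output → 54 partitions of ~200MB each
print(recommended_shuffle_partitions(10 * 1024**3))  # → 54
```

Read `total_shuffle_bytes` off the Shuffle Write column in the Spark UI for the stage in question, then set `spark.sql.shuffle.partitions` to the result.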

Broadcast joins — eliminate shuffle for small tables

The most impactful single optimization for join-heavy jobs:

from pyspark.sql.functions import broadcast

# Manual broadcast hint
result = large_orders_df.join(
    broadcast(small_country_codes_df),
    on="country_code",
    how="left"
)

# Or configure the threshold (default: 10MB — often too conservative)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))  # 100MB

# Disable broadcast entirely when it causes OOM on driver
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

Reduce shuffle data size

# Filter and project BEFORE joining — push down predicates
result = (
    spark.table("prod.gold.events")
    .filter("event_date >= '2024-01-01'")  # Filter early!
    .select("user_id", "event_type", "amount")  # Only needed columns
    .join(users_df, "user_id")
)

# Avoid Python UDFs where possible — use Spark SQL built-ins
# BAD: Python UDF (serialization overhead)
from pyspark.sql.functions import udf
@udf("string")
def slow_udf(x): return x.upper()

# GOOD: Native Spark function
from pyspark.sql.functions import upper
df.withColumn("name_upper", upper("name"))

4. Adaptive Query Execution (AQE)

AQE is Spark 3.x's automatic runtime optimizer. Enable it — it handles many tuning decisions automatically:

# Enable AQE (default ON in Databricks Runtime 8+)
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Automatic partition coalescing (merges small partitions after shuffle)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")  # deprecated since Spark 3.2; prefer minPartitionSize
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "256mb")

# Automatic skew join handling
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256mb")

# Local shuffle readers avoid unneeded shuffle exchanges when AQE converts
# sort-merge joins to broadcast joins at runtime (the conversion itself is automatic)
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

5. Handling Data Skew

Skew happens when one key has vastly more data than others (e.g., a customer_id that accounts for 40% of your orders table). One partition processes millions of rows while the other 999 finish in seconds.

# Detect skew
from pyspark.sql.functions import spark_partition_id, count

df = spark.table("prod.gold.orders")
(df.groupBy(spark_partition_id().alias("partition_id"))
   .agg(count("*").alias("row_count"))
   .orderBy("row_count", ascending=False)
   .show(20))

# Fix 1: Salting technique for skewed joins
from pyspark.sql.functions import col, concat, lit, rand, explode, array

SALT_FACTOR = 50

# Skewed side: add random salt
skewed_df = (
    spark.table("prod.gold.large_orders")
    .withColumn("salt", (rand() * SALT_FACTOR).cast("int"))
    .withColumn("customer_id_salted", concat(col("customer_id"), lit("_"), col("salt")))
)

# Small side: replicate across all salt values
small_df = spark.table("prod.gold.customers")
small_df_salted = (
    small_df
    .withColumn("salt_array", array([lit(i) for i in range(SALT_FACTOR)]))
    .withColumn("salt", explode(col("salt_array")))
    .withColumn("customer_id_salted", concat(col("customer_id"), lit("_"), col("salt")))
    .drop("salt_array", "salt")
)

# Drop the small side's duplicate customer_id to avoid an ambiguous column
result = skewed_df.join(small_df_salted.drop("customer_id"), "customer_id_salted")

AQE automatic skew handling (simpler when it works):

# AQE splits skewed partitions automatically
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Works for sort-merge joins where one side is skewed
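Why salting works, in miniature: rows of the hot key that would all land on one reducer get scattered across SALT_FACTOR distinct join keys. A plain-Python sketch (a deterministic modulo stands in for `rand()` here; the key name is illustrative):

```python
from collections import Counter

SALT_FACTOR = 50
hot_customer_rows = 1_000_000  # one customer_id holding a huge share of the table

# Each row of the hot key picks a salt; the salted key decides its partition
salted_keys = Counter(
    f"cust_42_{row % SALT_FACTOR}" for row in range(hot_customer_rows)
)

# The single 1M-row partition becomes 50 partitions of 20k rows each
print(len(salted_keys), max(salted_keys.values()))  # → 50 20000
```

The cost is replicating the small side SALT_FACTOR times, which is why salting only pays off when the small side really is small.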

6. Caching Strategy

Cache DataFrames that are reused multiple times within the same job. Cache the wrong ones and you waste memory that execution needs.

# Cache only what's reused
customer_lookup = spark.table("prod.gold.customers").cache()
customer_lookup.count()  # Trigger the cache materialization

# Multiple downstream uses — cache pays off
orders_with_customers = orders_df.join(customer_lookup, "customer_id")
returns_with_customers = returns_df.join(customer_lookup, "customer_id")

# Explicit unpersist when done
customer_lookup.unpersist()

# Use DISK_ONLY for very large DataFrames you can't afford in memory
from pyspark import StorageLevel
huge_df.persist(StorageLevel.DISK_ONLY)

# Check what's cached: the Storage tab in the Spark UI lists materialized
# DataFrames; for tables, ask the catalog directly
spark.catalog.isCached("prod.gold.customers")

Cache in Delta instead for cross-job reuse:

# Write a checkpoint table instead of in-memory cache for multi-job reuse
expensive_computation.write.format("delta").mode("overwrite").saveAsTable("prod.temp.checkpoint_table")

7. Parallelism and Cluster Sizing

# Default parallelism controls RDD partition counts. It is fixed at context
# startup — set it in the cluster's Spark config (values are illustrative):
#   spark.default.parallelism = num_executors * num_cores * 2

# For reading: control parallelism via maxPartitionBytes
spark.conf.set("spark.sql.files.maxPartitionBytes", str(256 * 1024 * 1024))  # 256MB/partition
spark.conf.set("spark.sql.files.openCostInBytes", str(4 * 1024 * 1024))     # 4MB open cost

Cluster sizing rules of thumb:

Workload Type            | Node Type                 | Worker Count
ETL / batch joins        | Memory-optimized          | 8–32
ML training              | GPU or compute-optimized  | 4–16
Streaming (low-latency)  | General purpose           | 2–8
SQL analytics            | Serverless SQL Warehouse  | Auto-scales
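The maxPartitionBytes setting above largely determines how many tasks a scan produces, which in turn should roughly match your cluster's core count times a small factor. A simplified estimate (Spark's real formula also weighs file open cost and default parallelism):

```python
import math

def estimated_scan_tasks(total_input_bytes, max_partition_bytes=256 * 1024 * 1024):
    """Rough task count for a file scan — ignores open cost and small-file packing."""
    return max(1, math.ceil(total_input_bytes / max_partition_bytes))

# A 100GB table read at 256MB per partition → 400 scan tasks
print(estimated_scan_tasks(100 * 1024**3))  # → 400
```

If the estimate is far below your total core count, lower `spark.sql.files.maxPartitionBytes` to get more parallelism out of the read.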

8. Common Anti-Patterns

# ❌ Collect large DataFrames to driver
all_rows = df.collect()  # OOM risk for large DFs

# ✅ Process in Spark, collect only aggregations
summary = df.groupBy("category").agg({"amount": "sum"}).collect()

# ❌ Nested loops with Spark actions
for id in user_ids:
    user_df = df.filter(f"user_id = {id}")  # Creates a new job per iteration
    user_df.show()

# ✅ Filter once, process all
df.filter(col("user_id").isin(user_ids)).show()

# ❌ Using pandas on large data
pandas_df = spark_df.toPandas()  # Pulls everything to driver
result = pandas_df.groupby("category").sum()

# ✅ Use Spark SQL or the pandas API on Spark
import pyspark.pandas as ps
ps_df = spark_df.pandas_api()  # to_pandas_on_spark() is the older, deprecated name
result = ps_df.groupby("category").sum()

9. Photon Engine

Databricks Photon is a native, vectorized query engine that replaces the JVM-based Spark execution engine for SQL and DataFrame operations. Enabling it requires no code changes and typically yields a 2–8x speedup on eligible workloads:

# Enable Photon when creating a cluster (UI or CLI)
databricks clusters create --json '{
  "cluster_name": "photon-etl",
  "spark_version": "14.3.x-photon-scala2.12",
  "node_type_id": "Standard_D8ds_v5",
  "num_workers": 8
}'

Photon accelerates: scans, filters, joins, aggregations, window functions, sorting. It doesn't accelerate: Python UDFs, RDD operations, streaming.


10. Tuning Checklist

Issue Observed       | Fix
Long shuffle stages  | Increase shuffle partitions, enable AQE
Memory spill         | Increase executor memory, reduce partition size
Skewed tasks         | Enable AQE skew join, or use salting
Slow joins           | Broadcast small tables
Slow scans           | Add file compaction, Z-ordering, bloom filter indexes
Low CPU utilization  | Increase parallelism; coalesce away tiny partitions
OOM on driver        | Avoid collect() on large DFs, increase driver memory

Conclusion

Spark performance tuning is equal parts science and intuition. Start by profiling with the Spark UI, identify your bottleneck (shuffle vs memory vs skew), apply targeted fixes, and measure again. Don't change multiple things at once — you won't know what worked.

The highest-ROI changes in order: enable AQE, tune shuffle partitions, broadcast small tables, eliminate Python UDFs, and right-size your cluster.


Try Harbinger Explorer free for 7 days — track Spark job metrics, compare run durations across clusters, and get automated performance recommendations for your Databricks workloads. harbingerexplorer.com

