🧠 Prep #20 - Understanding Spark Query Plans: How Catalyst Optimizer Works
How do you read Spark Query Plans?
Apache Spark is often praised for its ability to process massive datasets with simple, expressive code. But behind the df.filter().groupBy().count()
simplicity lies a powerful engine that turns your code into an optimized, distributed execution plan.
If you're writing Spark jobs and don't know how to read the query plan, you're flying blind.
Query plans are the source of truth for what Spark actually does, not just what your code says. Whether you’re debugging a slow job, figuring out why a join triggered a shuffle, or wondering why predicate pushdown didn’t happen, understanding the query plan is essential.
In today’s Prep, we’ll demystify the Spark query engine and its brain: the Catalyst Optimizer.
The Catalyst Optimizer: Spark's Brain
At the heart of Spark SQL lies Catalyst, a modular query optimization framework that transforms your code into efficient execution strategies. Catalyst takes your SQL queries or DataFrame transformations and rewrites them into optimized plans using a combination of rule-based and cost-based optimization.
Execution plans in Spark (or Databricks) let you understand how your code actually gets executed across the cluster, which makes them invaluable for optimising queries.
Catalyst translates your operations into optimized logical and physical plans, showing exactly which operations will be executed and sent to the Spark executors.
Execution Flow
Execution Plans
The execution plan is made of logical and physical plans.
Logical Plan:
The Logical Plan is broken down into three sections:
- Parsed Logical Plan (Unresolved Logical Plan): Created once the code has been checked for valid syntax. Any table or column objects that cannot yet be validated are flagged as Unresolved.
- Analyzed Logical Plan (Resolved Logical Plan): Using the Catalog, a metadata repository of all tables and DataFrames, Spark validates and resolves the Unresolved table or column objects identified in the previous plan before continuing.
- Optimized Logical Plan: Once everything is resolved, the plan is sent to the Catalyst Optimizer, which applies rules to further optimize it. Optimization rules include predicate and projection pushdown, reordering of operations, conversions, and expression simplification.
Physical Plan:
The Physical Plan describes how the optimized logical plan will actually be executed on the cluster.
- The Catalyst Optimizer generates multiple physical plans based on different execution strategies.
- Each strategy is assessed through a Cost Model, which estimates execution time and resource utilisation.
- Using the Cost Model estimates, Spark selects the most optimal strategy as the Selected Physical Plan, which is the one executed on the cluster.
Generate Execution Plans
You can inspect Spark's execution plans using either the DataFrame .explain() method or SQL's EXPLAIN keyword.
df.explain("extended")
This returns:
Parsed Logical Plan
Analyzed Logical Plan
Optimized Logical Plan
Physical Plan
When no argument is passed, .explain() defaults to showing just the physical plan.
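As a quick reference, here is a minimal sketch of the common ways to request a plan. The DataFrame name, the temp view name, and the query are placeholders; the mode strings are the standard ones accepted by Spark 3.x.
# Sketch: different ways to ask Spark for an execution plan.
# `df`, the view name, and the query below are illustrative placeholders.
df.explain()              # physical plan only (default)
df.explain("extended")    # parsed, analyzed, and optimized logical plans + physical plan
df.explain("formatted")   # physical plan as an overview followed by per-node details
df.explain("cost")        # logical plan annotated with statistics, when available

# The SQL equivalent of explain("extended"):
df.createOrReplaceTempView("customers")
spark.sql("EXPLAIN EXTENDED SELECT * FROM customers WHERE city = 'boston'").show(truncate=False)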
In the next section, we'll begin exploring narrow and wide transformations in Spark. These operations form the building blocks of distributed execution and dictate whether or not your job will incur expensive shuffles.
We'll walk through each with real code examples and analyze their physical plans in detail.
Narrow Transformations
Let’s start with the simplest class of transformations in Spark: narrow transformations.
These operations (like filter, select, and withColumn) don’t require shuffling data across the cluster. Each output partition depends only on a single input partition. As a result, they are fast, efficient, and don’t trigger a shuffle.
Here’s a real-world example:
from pyspark.sql import functions as F
df_narrow_transform = (
df_customers
.filter(F.col("city") == "boston")
.withColumn("first_name", F.split("name", " ").getItem(0))
.withColumn("last_name", F.split("name", " ").getItem(1))
.withColumn("age", F.col("age") + F.lit(5))
.select("cust_id", "first_name", "last_name", "age", "gender", "birthday")
)
df_narrow_transform.explain(True)
💡 What's Going On?
We perform the following transformations:
- Filter the dataset where city == 'boston'
- Split the name column into first_name and last_name
- Update the age by adding 5
- Select relevant columns to shape the final output
Let’s decode the physical plan.
🔬 Physical Plan Breakdown
*(1) Project [
cust_id#67,
split(name#68, , -1)[0] AS first_name,
split(name#68, , -1)[1] AS last_name,
(cast(age#69 as double) + 5.0) AS age,
gender#70,
birthday#71
]
+- *(1) Filter (isnotnull(city#73) AND (city#73 = boston))
+- *(1) ColumnarToRow
+- FileScan parquet [cust_id, name, age, gender, birthday, city]
PushedFilters: [IsNotNull(city), EqualTo(city,boston)]
Let’s break that down:
🧱 1. FileScan parquet [...]
This is the entry point into your dataset. Spark reads data directly from Parquet files.
✅ PushedFilters:
- IsNotNull(city)
- EqualTo(city, boston)
These are predicate pushdowns, meaning Spark applies these filters as it reads the data, reducing I/O and improving performance.
📉 ReadSchema:
Only the necessary columns are loaded: cust_id, name, age, gender, birthday, and city.
🧱 2. ColumnarToRow
Spark reads Parquet files in a columnar format. But many transformations (like split() and arithmetic) require row-based processing. This node converts columnar batches into rows.
🧱 3. Filter (isnotnull(city) AND city = 'boston')
Spark still includes a filter node even after pushing filters down to the Parquet reader.
This is for correctness. If the data source doesn’t support pushdown or only partially applies it, Spark re-validates the filter after reading.
🧱 4. Project [...]
This is where all transformations happen:
- split(name, " ")[0] → first_name
- split(name, " ")[1] → last_name
- age + 5 → age
These operations are evaluated inline, without triggering a shuffle.
🔁 Wide Transformations - Let the Shuffle Begin
Wide transformations like repartition, coalesce, join, and groupBy typically introduce shuffles: data movement across the cluster. Let’s decode each one.
1. 🔄 repartition()
df_transactions.repartition(24).explain(True)
Physical Plan:
Exchange RoundRobinPartitioning(24)
└─ FileScan parquet [...]
This explicitly reshuffles all records using round-robin partitioning. The Exchange operator indicates a full shuffle. It’s useful for balancing partitions before a join or aggregation.
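A related sketch (not part of the original example): repartitioning by a column instead of just a number switches the Exchange to hash partitioning, which can pre-align data on a key you are about to join or aggregate on.
# Sketch: repartition by column → Exchange hashpartitioning(cust_id, 24)
# in the plan, instead of RoundRobinPartitioning(24).
df_transactions.repartition(24, "cust_id").explain(True)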
2. 🧩 coalesce()
df_transactions.coalesce(5).explain(True)
Physical Plan:
Coalesce 5
└─ FileScan parquet [...]
Unlike repartition, coalesce() doesn’t shuffle data. It simply collapses existing partitions into fewer ones by moving data minimally. That’s why you don’t see a partitioning scheme like RoundRobinPartitioning here.
✅ Efficient for writing fewer output files.
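A typical use, sketched below with an illustrative output path: collapse partitions right before a write so you don’t end up with hundreds of tiny files.
# Sketch: compact the output into ~5 files; coalesce avoids a full shuffle here.
(
    df_transactions
    .coalesce(5)
    .write.mode("overwrite")
    .parquet("/tmp/transactions_compacted")  # illustrative output path
)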
3. 🔗 join() (with broadcast disabled)
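To reproduce this plan on a small test dataset, one approach (a sketch, assuming default session settings) is to switch off the automatic broadcast threshold so Spark cannot pick a broadcast hash join:
# Sketch: disable automatic broadcast joins so Spark falls back to a
# shuffle-based sort-merge join (-1 turns the size threshold off).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)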
df_transactions.join(df_customers, on="cust_id", how="inner")
Physical Plan:
SortMergeJoin
├─ Exchange hashpartitioning(cust_id)
└─ Exchange hashpartitioning(cust_id)
This triggers a Sort-Merge Join because:
- Broadcast is disabled
- Both sides are hash-partitioned by cust_id
- Both sides are sorted before the join
Expect significant shuffle and sort overhead, especially if the join keys are skewed. But what is hashpartitioning and how does it work? Let’s take a short detour and dive into it:
What Is hashpartitioning in Spark?
When Spark plans a join that cannot be broadcast (e.g., a large-to-large join), it needs to align data from both sides on the join key across the cluster. This is where hashpartitioning comes in.
👉 It means:
Spark uses a hash function on the join key to determine which partition a given row belongs to.
In our join:
df_transactions.join(df_customers, on="cust_id", how="inner")
The physical plan includes:
Exchange hashpartitioning(cust_id, 200)
This tells us:
- Spark is shuffling data across the cluster.
- The partitioning scheme is based on hash(cust_id) % 200.
- Both sides of the join hash-partition their rows using the same key (cust_id) and the same number of partitions (200), ensuring that matching keys land in the same partition.
⚙️ How Does It Work?
Let’s walk through it:
1. Hash Function: Spark applies a hash function to each value of the join key (cust_id).
2. Modulo Operation: The result is taken modulo the number of target partitions (e.g., 200) → hash(cust_id) % 200 (a rough sketch of this arithmetic follows the list).
3. Shuffle: Rows are redistributed across the cluster to the appropriate partitions based on their hash result.
4. Sort & Join: Each partition then:
   - Sorts the data (if using sort-merge join)
   - Joins rows locally within the partition
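To build intuition for steps 1 and 2, here is a rough sketch that mimics the bucketing arithmetic using public functions. Spark’s internal HashPartitioning uses a Murmur3 hash with a positive modulo, which pmod(hash(...), n) approximates; this is an illustration, not the exact shuffle code path.
from pyspark.sql import functions as F

# Sketch: approximate which of the 200 default shuffle partitions each cust_id
# would land in. pmod keeps the result non-negative, like Spark's partitioner.
df_buckets = df_transactions.withColumn(
    "shuffle_bucket", F.expr("pmod(hash(cust_id), 200)")
)
df_buckets.select("cust_id", "shuffle_bucket").show(5)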
🪧 Why Use It?
Hash partitioning ensures that rows with the same join key land in the same partition on both datasets. This allows Spark to execute the join locally within each partition, avoiding expensive cross-node communication during the join itself.
4. 🧮 groupBy().count()
df_transactions.groupBy("city").count().explain(True)
Physical Plan:
HashAggregate
└─ Exchange hashpartitioning(city)
└─ HashAggregate (partial)
GroupBy triggers a two-stage aggregation:
- Partial aggregation happens locally
- Shuffle by the group key (city)
- Final aggregation post-shuffle
5. 💰 groupBy().agg(sum())
df_transactions.groupBy("city").agg(F.sum("amt"))
Physical Plan:
Same structure as count(), but with sum(cast(amt as double)) as the aggregate function: partial aggregation, then shuffle, then final aggregation.
6. 🧮 groupBy().agg(countDistinct())
df_transactions.groupBy("cust_id").agg(F.countDistinct("city"))
Physical Plan:
HashAggregate(count distinct)
└─ Exchange hashpartitioning(cust_id)
└─ HashAggregate (partial)
└─ Exchange hashpartitioning(cust_id, city)
└─ HashAggregate (deduplication)
Count distinct is multi-level and expensive:
- Deduplicates (cust_id, city) pairs
- Partially counts the distinct values
- Shuffles and computes the final count
This is heavier than a regular groupBy().count() and is a good candidate for performance tuning or approximation.
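If an exact figure isn’t required, one hedged alternative is approx_count_distinct (HyperLogLog++), which aggregates in a single pass and skips the extra deduplication shuffle at the cost of a small, configurable error. The alias below is illustrative.
# Sketch: approximate distinct count with a 5% relative standard deviation.
df_transactions.groupBy("cust_id").agg(
    F.approx_count_distinct("city", rsd=0.05).alias("approx_city_count")
).explain(True)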
Interesting Observations on Spark’s Query Plans
Why Is a Filter Step Present Despite Predicate Pushdown?
If you've ever noticed a filter like *(1) Filter (city = 'boston') in your physical plan, even after Spark has pushed the same filter down to the Parquet reader, you're not alone.
This happens because of the two distinct stages in Spark's Catalyst Optimizer:
Logical Planning: Spark attempts to push predicates as close to the data source as possible. This is where optimizations like predicate pushdown and projection pruning happen.
Physical Planning: Here, Spark creates the actual instructions for execution. Even if a filter was already pushed down, Spark will still include it in the physical plan.
Why this redundancy?
✅ Correctness First: Not all data sources fully support predicate pushdown. Even if Spark requests it, there's no guarantee it succeeded. So, Spark keeps the filter in the physical plan as a fail-safe.
🔒 No Assumptions: Spark never assumes that the data source has honored the pushdown.
🔁 Same Filter, Two Stages: The predicate is listed under PushedFilters in the FileScan stage and then appears again in the physical Filter node above it.
It’s Spark’s way of saying: “Even if Parquet didn’t filter it, I’ll do it myself to ensure accuracy.”
When Predicate Pushdown Won’t Work
Here are two common scenarios where Spark cannot push down filters.
❌ 1. Complex Data Types (e.g., Maps, Arrays, Structs)
Predicate pushdown fails when working with nested or complex types.
Example Schema:
|-- Name: string (nullable = true)
|-- properties: map<string,string> (nullable = true)
Sample Data:
|Anirudh|[eye -> black, hair -> black]|
|Shradha|[eye -> black, hair -> red] |
Query:
df.filter(df.properties.getItem("eye") == "brown").show()
Physical Plan:
*(1) Filter (properties#123[eye] = brown)
+- *(1) ColumnarToRow
+- FileScan parquet ...
➡️ Spark applies the filter after loading the data into memory. Parquet can’t filter on MapType, so pushdown fails.
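One common workaround, sketched below with an illustrative output path, is to materialise the nested value as a plain top-level column when writing the data; a filter on that column can then be pushed down like any other string predicate.
# Sketch: promote properties['eye'] to a regular column so Parquet can
# push down the equality filter on subsequent reads.
(
    df.withColumn("eye_color", F.col("properties").getItem("eye"))
      .write.mode("overwrite")
      .parquet("/tmp/people_flattened")  # illustrative path
)

spark.read.parquet("/tmp/people_flattened") \
    .filter(F.col("eye_color") == "brown") \
    .explain(True)
# Expect: PushedFilters: [IsNotNull(eye_color), EqualTo(eye_color,brown)]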
❌ 2. Filters With .cast() Operations
Casting changes the datatype, which most data sources (like Parquet) cannot pre-filter on.
Query:
df.filter(F.col("age").cast("int") > 50)
Physical Plan:
*(1) Filter (isnotnull(age#69) AND (cast(age#69 as int) > 50))
+- *(1) ColumnarToRow
+- FileScan parquet ...
Here’s what happens:
- The Parquet source can’t apply the cast filter at read time.
- So Spark loads all rows, then applies the filter in memory.
📌 Note: JDBC sources can push down casts if supported by the underlying SQL engine.
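A quick way to see the difference for yourself (a sketch using the same hypothetical customers DataFrame; "C001" is an illustrative value) is to compare a plain comparison, which can be pushed down, with the same column wrapped in a cast:
# Sketch: the first filter appears under PushedFilters in the FileScan;
# the second is only applied as a Filter node after the scan.
df.filter(F.col("cust_id") == "C001").explain(True)
df.filter(F.col("age").cast("int") > 50).explain(True)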
Key Takeaway
Predicate pushdown is powerful, but not guaranteed.
To truly know what’s happening under the hood, always check the physical plan. If you see the filter repeated after the FileScan, it’s Spark being cautious — and you should be thankful for it.