Spark, Dask, and Distributed Execution
Distributed SQL is still SQL, but the dominant cost shifts from local CPU to network, shuffle, memory pressure, file layout, and skew.
Execution Shape
flowchart TD
  SQL[SQL / DataFrame expression] --> Logical[Logical plan]
  Logical --> Optimized[Optimized plan]
  Optimized --> Physical[Physical plan]
  Physical --> Stages[Stages]
  Stages --> Tasks[Tasks over partitions]
  Tasks --> Shuffle[Shuffle when data must be repartitioned]
  Shuffle --> Result[Result]
Spark SQL, Trino-like engines, Dask DataFrame, and other distributed systems differ in details, but the interview principles are similar.
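As a rough mental model of the shuffle step in the diagram, the sketch below routes rows from map tasks to reduce partitions by key hash. It is a toy, not any engine's implementation: `partition_for`, `shuffle`, and the sample rows are hypothetical, and `zlib.crc32` stands in for whatever hash a real engine uses.

```python
import zlib
from collections import defaultdict

def partition_for(key: str, num_partitions: int) -> int:
    """Deterministic hash partitioner (crc32 is an illustrative stand-in)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def shuffle(map_outputs, num_partitions):
    """Send every (key, value) row from each map task to its reduce partition."""
    reduce_inputs = defaultdict(list)
    for task_rows in map_outputs:
        for key, value in task_rows:
            reduce_inputs[partition_for(key, num_partitions)].append((key, value))
    return dict(reduce_inputs)

# Two map tasks holding rows for overlapping keys.
map_outputs = [
    [("u1", 1), ("u2", 1), ("u1", 1)],
    [("u2", 1), ("u3", 1), ("u1", 1)],
]
reduce_inputs = shuffle(map_outputs, num_partitions=4)
```

The property that matters is that all rows for one key end up in one partition, which is exactly why a single hot key can overload a single task.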
The Big Costs
| Cost | Cause | Mitigation |
|---|---|---|
| scan I/O | reading too many files/columns/partitions | partition pruning, column pruning, compact files |
| shuffle | join/group/window repartitions data | pre-aggregate, broadcast small side, align partitioning |
| skew | hot keys create straggler tasks | salting, hot-key isolation, adaptive skew handling |
| spill | task memory insufficient | reduce row width, repartition, pre-aggregate, tune memory |
| tiny files | scheduler and metadata overhead | compaction, target file size |
| small tasks | too many partitions | coalesce/repartition sensibly |
| giant tasks | too few partitions | increase partition count, split skewed inputs |
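The last two rows are two sides of one sizing decision. A back-of-the-envelope sketch, assuming a target of roughly 128 MiB per partition (an assumed figure for illustration, not a rule from this page; `suggested_partitions` is a hypothetical helper):

```python
import math

MIB = 1024 ** 2
GIB = 1024 ** 3

def suggested_partitions(input_bytes: int, target_bytes: int = 128 * MIB) -> int:
    """Partition count that keeps each task near the target size."""
    return max(1, math.ceil(input_bytes / target_bytes))

partitions_for_10_gib = suggested_partitions(10 * GIB)
```

Far more partitions than this tends toward the "small tasks" row; far fewer tends toward "giant tasks" and spill.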
SQL Pattern: Filter and Project Before Shuffle
WITH base AS (
SELECT user_id, event_date, event_name
FROM events
WHERE event_date >= DATE '2026-06-01'
AND event_date < DATE '2026-07-01'
AND event_name IN ('view', 'purchase')
)
SELECT user_id, COUNT(*) AS events
FROM base
GROUP BY user_id;
The GROUP BY user_id may shuffle. Reduce rows and columns before it.
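To see why this helps, here is a minimal sketch of what happens per map task, assuming the engine filters, projects, and partially aggregates before the shuffle. The names `map_side`, `reduce_side`, and the sample rows are hypothetical.

```python
from collections import Counter
from datetime import date

START, END = date(2026, 6, 1), date(2026, 7, 1)
WANTED = {"view", "purchase"}

def map_side(rows):
    """Filter, project down to the grouping key, then count per key locally."""
    kept = (
        user_id
        for user_id, event_date, event_name, _payload in rows
        if START <= event_date < END and event_name in WANTED
    )
    return Counter(kept)  # one (user_id, partial_count) record per key

def reduce_side(partials):
    """Merge the partial counts that arrive after the shuffle."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total

task_a = [
    ("u1", date(2026, 6, 3), "view", "wide payload"),
    ("u1", date(2026, 6, 4), "view", "wide payload"),
    ("u1", date(2026, 5, 30), "view", "wide payload"),    # outside date range
    ("u2", date(2026, 6, 9), "scroll", "wide payload"),   # unwanted event
]
task_b = [
    ("u1", date(2026, 6, 20), "purchase", "wide payload"),
    ("u2", date(2026, 6, 21), "view", "wide payload"),
    ("u2", date(2026, 7, 1), "view", "wide payload"),     # outside date range
]

partials = [map_side(task_a), map_side(task_b)]
shuffled_records = sum(len(p) for p in partials)
events_per_user = reduce_side(partials)
```

Seven wide input rows become three narrow records crossing the network, and the final counts are unchanged.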
Broadcast Join
If products is small:
SELECT
p.category,
COUNT(*) AS item_rows
FROM order_items oi
JOIN products p
ON p.product_id = oi.product_id
GROUP BY p.category;
A distributed engine may broadcast products to every worker, which avoids shuffling the large order_items table. If the engine underestimates the size of the small side, the broadcast can fail or cause memory pressure.
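A minimal sketch of the build-and-probe idea behind a broadcast hash join, assuming product_id is unique in products. The function name and the sample data are hypothetical; a real engine would ship the lookup table to each worker rather than share one Python dict.

```python
from collections import Counter

def broadcast_hash_join_count(order_item_partitions, products):
    # Build phase: the small table becomes an in-memory lookup on every worker.
    category_by_product = {p["product_id"]: p["category"] for p in products}
    counts = Counter()
    # Probe phase: each worker streams its own partition of the large table,
    # so the large side never moves across the network.
    for partition in order_item_partitions:
        for item in partition:
            category = category_by_product.get(item["product_id"])
            if category is not None:  # inner join drops unmatched rows
                counts[category] += 1
    return counts

products = [
    {"product_id": 1, "category": "books"},
    {"product_id": 2, "category": "toys"},
]
order_item_partitions = [
    [{"product_id": 1}, {"product_id": 2}],
    [{"product_id": 1}, {"product_id": 9}],  # 9 has no matching product
]
item_rows = broadcast_hash_join_count(order_item_partitions, products)
```

The memory cost is one copy of the lookup per worker, which is why an underestimated small side hurts.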
Shuffle Join
When both sides are large, data is repartitioned by join key.
SELECT *
FROM events e
JOIN predictions p
ON p.entity_id = e.user_id;
Potential issues:
- both sides scan huge ranges
- join key has skew
- row width is large
- no pre-aggregation
- join creates many-to-many explosion
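The many-to-many risk can be checked before running the join: for an inner equi-join, each key contributes (left rows) x (right rows) output rows. A small sketch with hypothetical key lists:

```python
from collections import Counter

def inner_join_row_count(left_keys, right_keys):
    """Rows an inner equi-join would emit, computed from per-key counts only."""
    left, right = Counter(left_keys), Counter(right_keys)
    return sum(n * right[key] for key, n in left.items() if key in right)

left_keys = ["a", "a", "a", "b"]
right_keys = ["a", "a", "a", "a", "c"]
output_rows = inner_join_row_count(left_keys, right_keys)
```

Here 4 rows joined to 5 rows yields 12, all from key "a". The same arithmetic on real key counts flags an explosion cheaply.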
Hot key example: anonymous user, default tenant, missing country, NULL, or a huge customer.
Detection:
SELECT user_id, COUNT(*) AS row_count
FROM events
GROUP BY user_id
ORDER BY row_count DESC
FETCH FIRST 20 ROWS ONLY;
FETCH FIRST is the standard SQL spelling; some engines accept only LIMIT 20 instead. Either way, the intent is to surface the top keys.
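Once the top keys are known, a useful follow-up number is the share of rows held by the hottest key, since the task that receives it processes at least that share. A sketch over a hypothetical key sample:

```python
from collections import Counter

def top_key_share(keys):
    """Return the most frequent key and the fraction of rows it accounts for."""
    counts = Counter(keys)
    top_key, top_count = counts.most_common(1)[0]
    return top_key, top_count / len(keys)

sample_keys = ["anonymous"] * 90 + ["u1"] * 6 + ["u2"] * 4
hot_key, share = top_key_share(sample_keys)
```

With 90% of rows on one key, adding reduce tasks barely shortens the stage, because one task still does 90% of the work.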
Mitigations:
- filter impossible/default keys
- split hot keys into a separate query path
- salt the large side and replicate the small side
- pre-aggregate by key before join
- choose a different partition key
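The salting item is the least obvious of these, so here is a minimal sketch. It assumes a fan-out of 4 and uses a round-robin salt to stay deterministic; a random salt is the more common choice. All function names and data are hypothetical.

```python
from collections import Counter

NUM_SALTS = 4  # assumed fan-out for illustration

def salt_large_side(rows, num_salts=NUM_SALTS):
    """Extend each key with a salt so a hot key spreads over num_salts sub-keys."""
    return [((key, i % num_salts), value) for i, (key, value) in enumerate(rows)]

def replicate_small_side(rows, num_salts=NUM_SALTS):
    """Copy each small-side row once per salt so every salted key finds its match."""
    return [((key, salt), value) for key, value in rows for salt in range(num_salts)]

def hash_join(left, right):
    """Plain in-memory inner join on the (key, salt) pair."""
    lookup = {}
    for key, value in right:
        lookup.setdefault(key, []).append(value)
    return [(key[0], lv, rv) for key, lv in left for rv in lookup.get(key, [])]

large = [("hot", n) for n in range(8)] + [("cold", 100)]
small = [("hot", "H"), ("cold", "C")]

salted = salt_large_side(large)
joined = hash_join(salted, replicate_small_side(small))
rows_per_salted_key = Counter(key for key, _ in salted)
```

The join result is the same as without salting, but the 8 "hot" rows now sit under four distinct keys of 2 rows each. The price is a small side that is NUM_SALTS times larger.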
Partitioning
Partitioning is not indexing. It is coarse pruning and task layout.
Good partition columns:
- frequently filtered
- moderate cardinality
- stable
- distributed evenly enough
- aligned with lifecycle/backfill
Bad partition columns:
- user ID with millions of tiny partitions
- boolean flags
- high-cardinality timestamps at second granularity
- columns rarely used in filters
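"Coarse pruning" can be made concrete with a sketch that assumes a Hive-style directory layout (`event_date=YYYY-MM-DD` path segments). The layout, the paths, and `prune_partitions` are assumptions for illustration only.

```python
from datetime import date

def prune_partitions(paths, start, end):
    """Keep only files whose partition directory falls in [start, end)."""
    kept = []
    for path in paths:
        # Assumed layout: ".../event_date=YYYY-MM-DD/<file>"
        segment = next(s for s in path.split("/") if s.startswith("event_date="))
        partition_date = date.fromisoformat(segment.split("=", 1)[1])
        if start <= partition_date < end:
            kept.append(path)
    return kept

paths = [
    "events/event_date=2026-05-31/part-0.parquet",
    "events/event_date=2026-06-01/part-0.parquet",
    "events/event_date=2026-06-30/part-0.parquet",
    "events/event_date=2026-07-01/part-0.parquet",
]
june_files = prune_partitions(paths, date(2026, 6, 1), date(2026, 7, 1))
```

The decision is made from paths alone, before any file is opened. That is also why a filter on a non-partition column gets no benefit from this mechanism.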
File Formats
Columnar formats help analytics by reading only needed columns and storing min/max metadata. Text/CSV is expensive for repeated large scans. For lakehouse-style systems, file size and clustering often matter as much as query syntax.
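A sketch of why clustering matters for min/max metadata: the same filter skips files only when each file covers a narrow value range. `FileStats` and the numbers are hypothetical, standing in for per-file column statistics.

```python
from dataclasses import dataclass

@dataclass
class FileStats:
    path: str
    min_user_id: int
    max_user_id: int

def files_to_read(files, user_id):
    """A file can be skipped when user_id lies outside its [min, max] range."""
    return [f.path for f in files if f.min_user_id <= user_id <= f.max_user_id]

# Clustered by user_id: each file covers a disjoint range.
clustered = [
    FileStats("c0", 0, 99),
    FileStats("c1", 100, 199),
    FileStats("c2", 200, 299),
]
# Unclustered: every file spans the whole range, so the stats exclude nothing.
unclustered = [
    FileStats("u0", 0, 299),
    FileStats("u1", 0, 299),
    FileStats("u2", 0, 299),
]

clustered_reads = files_to_read(clustered, 150)
unclustered_reads = files_to_read(unclustered, 150)
```

Same data volume, same query, one file read versus three.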
Spark SQL-ish Examples
Use SQL to express the transformation:
CREATE TEMP VIEW monthly_revenue AS
SELECT
customer_id,
order_month,
SUM(total_amount) AS revenue
FROM orders
WHERE status = 'paid'
GROUP BY customer_id, order_month;
The staff-level part is explaining the physical impact:
- WHERE status = 'paid' may not prune files unless status is clustered or metadata helps.
- GROUP BY customer_id, order_month shuffles by those keys.
- If month is partitioned, filtering a month prunes partitions.
- If customer distribution is skewed, one reduce task may dominate.
Dask SQL-ish Thinking
Dask DataFrame often feels SQL-like:
# Lazy Dask expression; nothing runs until .compute() is called.
event_counts = (
    events[events.event_date >= cutoff]
    .groupby("user_id")
    .event_id
    .count()
)
SQL translation:
SELECT user_id, COUNT(event_id) AS event_count
FROM events
WHERE event_date >= DATE '2026-06-01'
GROUP BY user_id;
The same concerns apply: partitioning, shuffles, known divisions, skew, memory per worker, and avoiding operations that require global coordination unless needed.
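"Known divisions" is the least SQL-like item in that list. The idea, in a simplified model rather than Dask's actual code: if the sorted index boundaries of each partition are known, a lookup can go to one partition instead of scanning all of them. `partition_of` and the boundary values below are hypothetical.

```python
import bisect

def partition_of(divisions, value):
    """Locate the partition holding an index value.

    divisions has npartitions + 1 entries: the lower bound of each partition
    plus the upper bound of the last one (which is inclusive).
    """
    if not divisions[0] <= value <= divisions[-1]:
        raise ValueError(f"{value!r} is outside the known divisions")
    npartitions = len(divisions) - 1
    i = bisect.bisect_right(divisions, value) - 1
    return min(i, npartitions - 1)  # clamp so the final boundary maps to the last partition

divisions = (0, 100, 200, 300)  # three partitions
target = partition_of(divisions, 150)
```

Without known divisions, the same lookup or an index-aligned join has to consider every partition, which is where extra shuffles come from.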
Interview Drill
1. A Spark SQL query is slow at the final GROUP BY user_id. What do you inspect?
Check input size after filters, number of partitions, shuffle read/write bytes, task duration skew, top user_id frequencies, spill metrics, row width, and whether partial aggregation happens before shuffle. Then consider pre-aggregation, filtering bad keys, salting hot keys, or changing the aggregation grain.
2. A join of a 5 TB fact table to a 20 MB dimension is slow. What plan do you want?
Usually a broadcast hash join with the dimension sent to every worker, assuming it is still around 20 MB after projection and filtering and fits comfortably in executor memory. Also project only the needed dimension columns and filter fact partitions early.
3. Why can partitioning by user_id be terrible?
It can create huge numbers of tiny partitions or uneven partitions if user activity is skewed. It also rarely aligns with date-based backfills and retention policies. Date partitioning plus clustering by user is often more practical for event data.
4. A query gets slower after adding more workers. Why?
Possibilities: shuffle overhead dominates, too many small tasks, metadata pressure from tiny files, skew leaves one task on the critical path, broadcast overhead increases, or the source system cannot feed workers fast enough. More workers help parallelizable work, not coordination-heavy work.
5. How do you explain adaptive execution generically?
The engine uses runtime statistics to revise the physical plan, such as changing join strategy, coalescing shuffle partitions, or handling skew. It helps when compile-time estimates are wrong, but it does not fix bad semantics or pathological data layout by itself.
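Coalescing shuffle partitions is the easiest of these to sketch. Assuming the engine knows each shuffle partition's size at runtime, a greedy pass can merge adjacent small partitions up to a target size. This is an illustrative model, not any engine's algorithm; `coalesce` and the sizes are hypothetical.

```python
def coalesce(partition_sizes, target_bytes):
    """Group adjacent partition indexes so each group stays near target_bytes."""
    groups, current, current_bytes = [], [], 0
    for index, size in enumerate(partition_sizes):
        # Close the current group if adding this partition would exceed the target.
        if current and current_bytes + size > target_bytes:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(index)
        current_bytes += size
    if current:
        groups.append(current)
    return groups

observed_sizes = [10, 20, 30, 200, 5, 5]  # sizes measured after the map stage
groups = coalesce(observed_sizes, target_bytes=64)
```

Six shuffle partitions become three tasks. The oversized partition at index 3 is left alone, which is the point of the answer above: coalescing fixes too many small tasks, while skew needs separate handling.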