Distributed Execution Drills
Focus on scan, shuffle, skew, spill, and file layout.
1. A Spark SQL job is slow at a large GROUP BY user_id.
Inspect shuffle bytes, task duration distribution, spill, partition count, and top user_id frequencies.
SELECT user_id, COUNT(*) AS row_count
FROM events
GROUP BY user_id
ORDER BY row_count DESC
LIMIT 20;
Likely fixes include pre-aggregation, salting hot keys, or isolating skewed users.
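The key-frequency check can also be run on a sample of keys pulled to the driver. A minimal sketch, assuming the sample fits in memory; the function name and the sample data are hypothetical:

```python
from collections import Counter

def skew_report(keys, top_n=3):
    """Return the top_n most frequent keys and the share of rows held by the hottest key."""
    counts = Counter(keys)
    total = sum(counts.values())
    top = counts.most_common(top_n)
    hottest_share = top[0][1] / total if total else 0.0
    return top, hottest_share

# Hypothetical sample: user 'u1' dominates the events.
sample_keys = ["u1"] * 90 + ["u2"] * 6 + ["u3"] * 4
top_keys, share = skew_report(sample_keys)
```

A hottest-key share far above 1 / (number of shuffle partitions) suggests that one task will carry most of the GROUP BY work.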
2. A fact-to-small-dimension join shuffles both sides.
If the dimension is truly small after projection and filtering, prefer a broadcast join. In pure SQL, make the small side obvious by filtering/projecting it in a CTE.
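Conceptually, a broadcast join ships a lookup table built from the small side to every task and streams the large side past it. A toy single-process sketch; the column names id and dim_id, the function name, and the sample rows are assumptions for illustration:

```python
def broadcast_join(fact_rows, dim_rows, wanted_cols, keep):
    """Inner join: filter and project the small side first, then probe it per fact row."""
    lookup = {d["id"]: {c: d[c] for c in wanted_cols} for d in dim_rows if keep(d)}
    return [{**f, **lookup[f["dim_id"]]} for f in fact_rows if f["dim_id"] in lookup]

facts = [
    {"order_id": 1, "dim_id": 10},
    {"order_id": 2, "dim_id": 20},
    {"order_id": 3, "dim_id": 30},
]
dims = [
    {"id": 10, "country": "DE", "notes": "x", "active": True},
    {"id": 20, "country": "FR", "notes": "y", "active": False},
]
joined = broadcast_join(facts, dims, ["country"], keep=lambda d: d["active"])
```

Filtering and projecting before building the lookup is what keeps the broadcast payload small; the wide notes column never leaves the dimension side.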
3. A lake table has millions of tiny files.
Compact the table into fewer, larger files. Tiny files increase metadata overhead, scheduler overhead, and scan inefficiency even when the logical query is simple.
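One way to reason about a compaction pass is as a grouping problem: pack small files into groups whose total size approaches a target. A greedy sketch, assuming a 128 MB target, which is an illustrative value and not a recommendation for any particular engine:

```python
def plan_compaction(file_sizes_mb, target_mb=128):
    """Greedily group files so each rewritten output file is close to target_mb."""
    groups, current, current_size = [], [], 0
    for size in sorted(file_sizes_mb, reverse=True):
        # Close the current group once the next file would push it past the target.
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups

tiny_files = [4] * 100  # hypothetical: one hundred 4 MB files
plan = plan_compaction(tiny_files)
```

Here 100 input files collapse into 4 outputs. Table formats and engines usually ship their own compaction commands, which should be preferred over hand-rolled rewrites.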
4. A table is partitioned by user ID and backfills are date-based.
This is probably a bad partitioning choice. Date partitions usually align better with event-data ingestion, retention, and backfills. User ID may be better as clustering/sorting, not a physical partition.
5. A distributed query uses ORDER BY without a limit.
Global sort is expensive because it requires coordination and data movement. Ask whether the use case needs total ordering, top N, or ordering within partitions/groups.
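When the use case is top N, a full global sort is unnecessary: each partition can keep only its local top N and a final step merges the small candidate lists. A sketch with in-memory lists standing in for partitions (the data is made up):

```python
import heapq

def distributed_top_n(partitions, n):
    """Take the local top n from each partition, then merge the candidates."""
    local_tops = [heapq.nlargest(n, part) for part in partitions]
    return heapq.nlargest(n, (x for top in local_tops for x in top))

partitions = [[5, 1, 9], [7, 3], [8, 2, 6]]
top3 = distributed_top_n(partitions, 3)  # [9, 8, 7]
```

Only n values per partition cross the network, instead of every row.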
6. A query uses a global window over all rows.
SUM(amount) OVER (ORDER BY event_ts)
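Without PARTITION BY, the window needs one global ordering, and engines such as Spark typically move all rows into a single partition to compute it. Partition the window if semantics allow, or pre-aggregate to a smaller grain first.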
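Pre-aggregating first can be sketched as: collapse rows to one total per day, then compute the running sum over the much smaller daily series. This changes the grain of the result (one running total per day, not per row), so it only applies when that is acceptable. The names and sample rows are illustrative:

```python
from collections import defaultdict
from itertools import accumulate

def running_total_by_day(events):
    """events: iterable of (iso_date, amount). Returns [(iso_date, cumulative_amount)]."""
    daily = defaultdict(int)
    for day, amount in events:
        daily[day] += amount          # parallelizable partial aggregation
    days = sorted(daily)              # sort only the small daily series
    totals = accumulate(daily[d] for d in days)
    return list(zip(days, totals))

events = [("2026-06-02", 5), ("2026-06-01", 10), ("2026-06-01", 1), ("2026-06-03", 4)]
result = running_total_by_day(events)
```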
7. A join key has many NULL values in distributed execution.
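NULL keys never match under equality join semantics, and in an outer join the rows that carry them can still pile up in a single partition. Default placeholder keys (such as 0 or an empty string) do match and can concentrate rows on one task. Filter invalid keys, route them separately, or handle them explicitly before the join.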
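Routing NULL keys separately can be sketched as a split before the join: rows without a usable key skip the join entirely and are handled on their own path. A toy version with dicts as rows; user_id, the dim field, and the sample data are assumptions:

```python
def join_with_null_routing(left_rows, right_by_key, key="user_id"):
    """Left join that sets aside NULL-key rows instead of sending them through the join."""
    matched, unkeyed = [], []
    for row in left_rows:
        if row.get(key) is None:
            unkeyed.append(row)   # handled separately, never shuffled on the join key
        else:
            matched.append({**row, "dim": right_by_key.get(row[key])})
    return matched, unkeyed

left = [
    {"user_id": "u1", "amount": 5},
    {"user_id": None, "amount": 7},
    {"amount": 2},
]
matched, unkeyed = join_with_null_routing(left, {"u1": "gold"})
```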
8. A Dask DataFrame operation unexpectedly triggers a full shuffle.
Operations like groupby-apply on a non-index column, joins on non-aligned partitions, set_index, and global sorts can trigger shuffles. Align partitions/divisions or reduce data before the operation.
9. A query reads all columns from Parquet.
Columnar formats save column I/O only when the query projects a subset of columns; selecting every column reads every column chunk.
SELECT user_id, event_ts, event_name
FROM events
WHERE event_date = DATE '2026-06-01';
10. A partition filter compares a date column to a string.
A type mismatch can force an implicit cast, and in some engines a cast on the partition column blocks pruning. Use typed literals.
WHERE event_date = DATE '2026-06-01'
11. A broadcast join times out because the small side is not small.
Project fewer columns, apply filters before broadcast, update stats, or use a shuffle join. Also check whether the “small” side grew due to duplicate keys or an unbounded date range.
12. A shuffle join has one reducer processing far more data.
This is skew. Confirm with key-frequency analysis, then salt hot keys, isolate hot keys, or pre-aggregate before the join.
13. Explain salting a skewed join key.
Add a salt bucket to spread a hot key across multiple partitions on the large side, and replicate or assign matching salt values on the small side. It trades extra rows/work for better parallelism.
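A toy simulation of the idea, not an engine implementation: the partitioner, the bucket counts, and the data are all hypothetical, and for brevity every row is salted, where a real job would usually salt only the hot keys.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 8   # hypothetical cluster width
SALT_BUCKETS = 8     # hypothetical number of salt values

def partition_of(key, salt=0):
    """Toy partitioner: a deterministic function of (key, salt)."""
    return (zlib.crc32(key.encode()) + salt) % NUM_PARTITIONS

def partition_loads(rows, salted):
    """Count rows per partition; salted rows cycle through the salt buckets."""
    loads = Counter()
    for i, key in enumerate(rows):
        salt = i % SALT_BUCKETS if salted else 0
        loads[partition_of(key, salt)] += 1
    return loads

def replicate_small_side(small_rows):
    """One copy of each small-side row per salt, so every salted large row finds a match."""
    return [(key, salt, value) for key, value in small_rows for salt in range(SALT_BUCKETS)]

large_keys = ["hot_user"] * 1000
before = partition_loads(large_keys, salted=False)   # one partition holds all 1000 rows
after = partition_loads(large_keys, salted=True)     # 125 rows on each of 8 partitions
small = replicate_small_side([("hot_user", "premium")])
```

The small side grows by a factor of SALT_BUCKETS, which is the extra work traded for parallelism.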
14. A query has too many output files.
Control output partition count and target file size. Use coalesce/repartition carefully before write, and avoid partitioning output by high-cardinality columns.
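The arithmetic behind the advice, as a rough sketch. The 256 MiB target and the task and partition counts are illustrative assumptions:

```python
import math

def target_file_count(total_bytes, target_file_bytes=256 * 1024**2):
    """Number of output files needed to hit roughly target_file_bytes each."""
    return max(1, math.ceil(total_bytes / target_file_bytes))

def worst_case_files(write_tasks, output_partitions):
    """Upper bound when every write task emits one file into every output partition."""
    return write_tasks * output_partitions

files_needed = target_file_count(10 * 1024**3)   # hypothetical 10 GiB output -> 40
files_worst = worst_case_files(200, 365)         # hypothetical 200 tasks x 365 dates -> 73000
```

The gap between the two numbers is why repartitioning by the output partition column before the write often matters more than the raw task count.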
15. More workers make the query slower.
Coordination overhead likely dominates: shuffles, tiny files, metadata pressure, task scheduling, or source throughput limits. More parallelism helps only when the workload has enough independent work.
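A deliberately crude model makes the effect concrete: assume the parallel work divides evenly across workers while coordination cost grows linearly with worker count. Both the formula and the constants are assumptions for illustration, not measurements from any engine:

```python
def estimated_runtime(workers, parallel_work=1000.0, per_worker_overhead=2.5):
    """Toy model: divisible work shrinks with workers, coordination cost grows with them."""
    return parallel_work / workers + per_worker_overhead * workers

best = min(range(1, 201), key=estimated_runtime)   # 20 workers under these constants
at_best = estimated_runtime(best)                  # 100.0
at_80 = estimated_runtime(80)                      # 212.5
```

Past the minimum, each added worker costs more in coordination than it saves in compute.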
16. A table is partitioned by event date but queries filter by timestamp.
Add an explicit event-date predicate when safe.
WHERE event_date = DATE '2026-06-01'
AND event_ts >= TIMESTAMP '2026-06-01 10:00:00'
AND event_ts < TIMESTAMP '2026-06-01 11:00:00'
The date predicate prunes partitions; the timestamp predicate narrows rows.
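When the timestamp range is built in application code, the matching partition dates can be derived from it. A sketch, assuming event_date is the calendar date of event_ts in the same time zone (the condition that makes the extra predicate safe); the function name is hypothetical:

```python
from datetime import datetime, timedelta

def partition_dates(start_ts, end_ts):
    """Return the event dates touched by the half-open range [start_ts, end_ts)."""
    if end_ts <= start_ts:
        return []
    # The end bound is exclusive, so step back to the last instant actually included.
    last_instant = end_ts - timedelta(microseconds=1)
    days = (last_instant.date() - start_ts.date()).days
    return [start_ts.date() + timedelta(days=i) for i in range(days + 1)]

one_hour = partition_dates(datetime(2026, 6, 1, 10), datetime(2026, 6, 1, 11))
over_midnight = partition_dates(datetime(2026, 6, 1, 23), datetime(2026, 6, 2, 1))
```

A range that ends exactly at midnight does not pull in the next day's partition.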
17. A query does exact distinct counts for many dashboard slices.
Consider pre-aggregated distinct grains, approximate sketches, or metric-specific materialized tables. Exact high-cardinality distinct across many dimensions is often shuffle-heavy.
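The reason distinct counts need special handling is that they are not additive across grains. A small illustration with exact sets; mergeable sketches follow the same merge-then-count pattern with bounded memory. The data is made up:

```python
daily_users = {
    "2026-06-01": {"u1", "u2"},
    "2026-06-02": {"u2", "u3"},
}

# Summing per-day distinct counts double-counts users active on both days.
naive_sum = sum(len(users) for users in daily_users.values())   # 4

# Merging the per-day sets first gives the true two-day distinct count.
merged_distinct = len(set().union(*daily_users.values()))       # 3
```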
18. A Spark SQL job spills during a window function.
Reduce row width, filter earlier, partition the window, pre-aggregate, increase partitions for large groups, or handle skewed partition keys separately.
19. A pipeline reads yesterday plus all history every day.
Make it incremental if semantics allow. Process impacted partitions and maintain modeled aggregates instead of recomputing full history. Keep a backfill path for corrections.
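The incremental pattern can be sketched as: keep one aggregate per date partition and recompute only the dates that received new or corrected data. The dict-based storage and all names are hypothetical stand-ins for real tables:

```python
def refresh_daily_totals(daily_totals, events_by_date, impacted_dates):
    """Recompute only the impacted date partitions; leave all other aggregates untouched."""
    updated = dict(daily_totals)
    for day in impacted_dates:
        updated[day] = sum(events_by_date.get(day, []))
    return updated

daily_totals = {"2026-05-31": 10, "2026-06-01": 3}
events_by_date = {"2026-05-31": [4, 6], "2026-06-01": [3, 9]}  # a late event arrived for 06-01
refreshed = refresh_daily_totals(daily_totals, events_by_date, ["2026-06-01"])
```

The backfill path is the same function called with a wider list of impacted dates.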
20. A distributed job spends most time in the write stage.
Check output partitioning, file count, compression, sink throughput, small files, and whether the job writes data sorted or partitioned in an expensive way.
21. Adaptive execution changes a join strategy at runtime. What does that mean?
The engine used runtime statistics to revise the physical plan, such as switching join type, coalescing shuffle partitions, or handling skew. It helps with bad estimates but does not fix wrong SQL.
22. For what kinds of operations do known divisions matter in Dask?
Index-aligned operations, joins on the index, and time-based slicing can be much cheaper when divisions are known. Without them, Dask may need a shuffle or a scan of every partition.
23. A query over partitioned data is slow because each partition has tiny files.
Partition pruning may work but still read too many files. Compact files within partitions and target healthy file sizes for the execution engine.
24. A join between two huge event tables explodes output rows.
This may be a semantic many-to-many problem, not just a distributed problem. Pre-aggregate, deduplicate, restrict time windows, or define one-to-one matching rules before joining.
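Before running such a join, the output size can be estimated from per-key counts on each side: an equi-join emits (left count × right count) rows per key. A sketch over key samples; the names and data are illustrative:

```python
from collections import Counter

def estimated_join_rows(left_keys, right_keys):
    """Inner equi-join output size: sum over keys of left_count * right_count."""
    left, right = Counter(left_keys), Counter(right_keys)
    return sum(n * right[k] for k, n in left.items())

left_keys = ["a"] * 1000 + ["b"]
right_keys = ["a"] * 1000 + ["b"]
rows_out = estimated_join_rows(left_keys, right_keys)   # 1_000_001 rows from 1001 x 1001 inputs
```

One duplicated key on both sides is enough to turn about a thousand input rows into a million output rows.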
25. Staff answer: how do you debug a slow Spark SQL query?
Read the physical plan and Spark UI. Identify scan bytes, partition pruning, shuffle stages, join strategy, skew, spill, task counts, and output file behavior. Then choose a fix tied to the dominant cost.