How joins work


In Apache Spark, choosing the right join strategy is the difference between a job that finishes in seconds and one that crashes with an OutOfMemoryError. Spark evaluates the size of your DataFrames and the available cluster resources to decide how data should move between the Driver and the Executors, and between the Executors themselves.

Here are the primary join types and how they operate across your partitions.


Broadcast Hash Join (BHJ)

This is the "gold standard" for performance. It’s used when one table is small enough to fit in the memory of the Driver and of every executor (the automatic threshold, spark.sql.autoBroadcastJoinThreshold, defaults to 10MB, though it is often increased).

  • The Process: The Driver collects the small table from the executors and sends (broadcasts) a full copy of it to every executor in the cluster.

  • Cluster Relation: The Driver orchestrates the broadcast; Executors perform a local hash join.

  • Partitions: The large table remains in its original partitions. No data from the large table moves across the network (no shuffle).
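
A minimal PySpark sketch of the idea (the table names and sizes here are illustrative assumptions, not from a real workload):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("bhj-demo").getOrCreate()

# Hypothetical tables: a large fact table and a small dimension table.
orders = spark.range(10_000_000).withColumnRenamed("id", "customer_id")
customers = spark.range(1_000).withColumnRenamed("id", "customer_id")

# Explicitly mark the small side for broadcast; Spark also does this automatically
# while the table stays under spark.sql.autoBroadcastJoinThreshold (10MB by default).
joined = orders.join(broadcast(customers), "customer_id")

# The physical plan should show BroadcastHashJoin plus a BroadcastExchange,
# and no shuffle (Exchange) on the large side.
joined.explain()
```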


Shuffle Hash Join

Used when the tables are too large to broadcast, but each partition of the smaller table is small enough for its hash table to fit in executor memory.

  • The Process: Spark hashes the join keys of both tables and redistributes the data. Rows with the same hash key from both tables end up on the same executor.

  • Cluster Relation: Requires a "Shuffle," which is a heavy network I/O operation where executors exchange data.

  • Partitions: New partitions are created based on the hash of the join key.
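
A sketch of how you might nudge Spark into this strategy (the SHUFFLE_HASH hint requires Spark 3.0+; table names and sizes are made up for illustration):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shj-demo").getOrCreate()

# Hypothetical medium-sized tables sharing a join key.
left = spark.range(5_000_000).withColumnRenamed("id", "key")
right = spark.range(5_000_000).withColumnRenamed("id", "key")

# Rule out a broadcast join so the planner must shuffle.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# The SHUFFLE_HASH hint (Spark 3.0+) asks the planner to build a per-partition
# hash table instead of sorting; explain() should show ShuffledHashJoin.
joined = left.join(right.hint("shuffle_hash"), "key")
joined.explain()
```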


Sort-Merge Join (SMJ)

This is the default join strategy for large-scale data in Spark (since Spark 2.3). It is robust and handles tables of any size.

  • The Process:

    • Step 1: Shuffle: Data is redistributed so that the same keys are on the same node.

    • Step 2: Sort: Within each partition, the data is sorted by the join key.

    • Step 3: Merge: Spark iterates through both sorted lists and matches them (very efficient memory-wise).

  • Cluster Relation: Heavy network usage during shuffle, followed by high CPU usage for sorting on executors.

  • Partitions: Highly dependent on spark.sql.shuffle.partitions (default is 200).
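
A sketch showing the default behaviour and the shuffle-partition knob mentioned above (sizes and the partition count are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("smj-demo").getOrCreate()

# Rule out broadcasting and tune the number of shuffle partitions (default 200).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.shuffle.partitions", "400")

left = spark.range(20_000_000).withColumnRenamed("id", "key")
right = spark.range(20_000_000).withColumnRenamed("id", "key")

# With an equality condition and no broadcastable side, the plan shows
# SortMergeJoin, with an Exchange (shuffle) and a Sort on each input.
joined = left.join(right, "key")
joined.explain()
```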


Broadcast Nested Loop Join (BNLJ)

This is the "fallback" join, usually triggered when you are performing a join without an equality condition (e.g., tableA.id > tableB.id).

  • The Process: It broadcasts one table and then performs a nested loop (checks every row against every other row).

  • Performance Note: It is very slow: essentially O(n × m). Avoid this if possible by adding an equality constraint.
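
For example, a non-equi join like the one below typically compiles to a BroadcastNestedLoopJoin (the column and table names are invented for this sketch):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("bnlj-demo").getOrCreate()

events = spark.range(1_000_000).withColumnRenamed("id", "event_ts")
cutoffs = spark.range(100).withColumnRenamed("id", "cutoff_ts")

# A ">" condition cannot be hashed or merged on a key, so Spark broadcasts
# the small side and loops: explain() shows BroadcastNestedLoopJoin.
joined = events.join(broadcast(cutoffs), events.event_ts > cutoffs.cutoff_ts)
joined.explain()
```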


Cartesian Product Join (CPJ)

This is the "heavyweight" version of a nested loop join. While a Broadcast Nested Loop Join (BNLJ) sends a small table to every executor, a Cartesian Product Join is used when neither table is small enough to broadcast.

  • The Process: Spark creates a CartesianRDD. It pairs every partition of Table A with every partition of Table B.

  • Cluster Relation: This creates a massive number of tasks. If Table A has 100 partitions and Table B has 100 partitions, Spark must manage 100 × 100 = 10,000 partition interactions.

  • Data Partitions: Each partition in the resulting DataFrame is the product of two input partitions.
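
A sketch of how a Cartesian product typically appears (crossJoin is the explicit API; a similar plan shows up for non-equi joins where neither input can be broadcast):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cartesian-demo").getOrCreate()

# Neither side is broadcastable in this scenario.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

a = spark.range(10_000_000).withColumnRenamed("id", "a_id")
b = spark.range(10_000_000).withColumnRenamed("id", "b_id")

# Every partition of `a` is paired with every partition of `b`;
# the physical plan shows CartesianProduct.
pairs = a.crossJoin(b)
pairs.explain()
```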


Summary Table

| Join Type | Best Used When... | Network Cost | Memory Risk |
| --- | --- | --- | --- |
| Broadcast Hash | One table is very small (< 10-100MB). | Low: one-way broadcast from the Driver to all Executors. | Low: risk only if the "small" table exceeds Executor RAM. |
| Shuffle Hash | Tables are medium; join keys are evenly distributed. | High: full shuffle across the network. | Medium: the hash table of a partition must fit in memory. |
| Sort-Merge | Both tables are large (default strategy). | High: full shuffle of both datasets. | Low: very robust; spills to disk when sorting if memory is tight. |
| Broadcast Nested Loop | Non-equi joins (e.g., >) + one side is small. | Low: broadcasts the small table once. | High: computationally expensive (O(n × m)). |
| Cartesian Product | Non-equi joins + neither table can be broadcast. | Extreme: every partition must talk to every other partition. | Critical: high risk of cluster-wide crashes or "hanging" jobs. |

Pro Tip: If you know your table is small but Spark isn't broadcasting it, you can force it using the hint: df1.join(broadcast(df2), "id").
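
The same hint can also be expressed in Spark SQL; a small self-contained sketch (the view names here are hypothetical):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("broadcast-hint-sql").getOrCreate()

spark.range(1_000).withColumnRenamed("id", "key").createOrReplaceTempView("dim")
spark.range(10_000_000).withColumnRenamed("id", "key").createOrReplaceTempView("fact")

# /*+ BROADCAST(dim) */ is the SQL equivalent of broadcast(df2) in the DataFrame API.
spark.sql("SELECT /*+ BROADCAST(dim) */ * FROM fact JOIN dim USING (key)").explain()
```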


"Hierarchy of Efficiency"

When you run a query, Spark tries to pick the best strategy in this order:

  1. BHJ (Broadcast Hash): Can I fit one side in memory? (Fastest)

  2. SMJ (Sort-Merge): Is this an equality join? (Most reliable for Big Data)

  3. SHJ (Shuffle Hash): Is the data distributed well enough to hash without sorting?

  4. BNLJ (Broadcast Nested Loop): No equality join, but one side is small?

  5. CPJ (Cartesian Product): No equality join and both sides are huge? (Slowest / Avoid if possible)
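
To see which of these strategies the planner actually chose, inspect the physical plan; for example (assuming Spark 3.0+ for the formatted explain mode, with illustrative table sizes):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join-plan-check").getOrCreate()

small = spark.range(1_000).withColumnRenamed("id", "key")
large = spark.range(50_000_000).withColumnRenamed("id", "key")

# "formatted" mode (Spark 3.0+) prints one operator per line, so the chosen
# strategy (BroadcastHashJoin, SortMergeJoin, ...) is easy to spot.
large.join(small, "key").explain(mode="formatted")
```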

