
Joins in Spark SQL: Shuffle Hash, Sort Merge, Broadcast

The Spark SQL component of Apache Spark ships with the Catalyst optimizer, which optimizes jobs by re-arranging the order of transformations and by choosing join strategies suited to the datasets involved.
Spark picks these join strategies internally, but you can also force a particular one.
This topic is worth knowing, as it comes to the rescue when optimizing jobs for your use case.

Shuffle Hash Join

Shuffle hash join shuffles the data based on the join keys and then performs the join.

The shuffled hash join ensures that the data in each partition contains the same keys by partitioning the second dataset with the same default partitioner as the first, so that keys with the same hash value from both datasets land in the same partition.

It follows the classic map-reduce pattern:

  • First it maps over the two tables (dataframes)
  • Uses the join keys as the output key
  • Shuffles both dataframes by the output key, so that rows with the same keys from both tables end up on the same machine
  • In the reduce phase, joins the two datasets

Shuffle hash join works well:
1. when the dataframes are distributed evenly across the keys you join on, and
2. when the dataframes have enough distinct keys to achieve parallelism.

While this approach always works, it can be more expensive than necessary because it requires a shuffle.
It is supported only for equi-joins, though the join keys do not need to be sortable, and it supports all join types except full outer joins.

Shuffle hash join can be used only when spark.sql.join.preferSortMergeJoin is set to false.
By default, sort merge join is preferred over shuffle hash join.
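
A minimal sketch of steering Spark toward a shuffle hash join; the two config keys are real Spark settings, while the table paths and the join column are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ShuffleHashJoinExample")
  .config("spark.sql.join.preferSortMergeJoin", "false") // do not prefer sort merge join
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")  // rule out broadcast join
  .getOrCreate()

// hypothetical input tables
val orders    = spark.read.parquet("/data/orders")
val customers = spark.read.parquet("/data/customers")

// equi-join; with the settings above Spark can plan a ShuffledHashJoin
val joined = orders.join(customers, Seq("customer_id"))
joined.explain() // look for ShuffledHashJoin in the physical plan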

Sort merge join

As the name suggests, sort merge join sorts the datasets first and then merges them.

This is Spark's default join strategy; since Spark 2.3 the default value of spark.sql.join.preferSortMergeJoin has been true.
Spark performs this join when you are joining two BIG tables. Sort merge joins minimize data movement in the cluster, scale well, and generally perform better than shuffle hash joins.

It performs disk I/O much like the MapReduce paradigm, which makes this join scalable.

Three phases of sort merge join:

1. Shuffle Phase: the two big tables are repartitioned by the join keys across the partitions in the cluster.
2. Sort Phase: the data within each partition is sorted in parallel.
3. Merge Phase: the two sorted, partitioned datasets are joined by iterating over the elements and merging rows that have the same value for the join key.
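
A minimal sketch of the default behaviour, reusing the SparkSession from the sketch above; the parquet paths and account_id join key are hypothetical:

// two large tables joined on an equi-join key
val transactions = spark.read.parquet("/data/transactions")
val accounts     = spark.read.parquet("/data/accounts")

// with default settings this is planned as a sort merge join
val result = transactions.join(accounts, Seq("account_id"))
result.explain()
// The physical plan shows SortMergeJoin, preceded on both sides by an
// Exchange (the shuffle phase) and a Sort (the sort phase).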

Broadcast Join

Broadcast join is a well-known strategy for joining a small table (dimension table) with a big table (fact table) while avoiding costly data shuffling.

A table smaller than ~10 MB (the default threshold) is broadcast to all nodes in the cluster, so that it becomes a local lookup table on each node and no shuffle is needed.

It has two phases:
1. Broadcast: the smaller dataset is cached on every executor in the cluster.
2. Hash Join: a standard hash join is performed on each executor.

The property below configures the maximum size of a dataset that will be broadcast:

spark.sql.autoBroadcastJoinThreshold

The default value is 10485760 bytes (10 MB).
The maximum limit is 8 GB (as of Spark 2.4).
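
A short sketch of adjusting the threshold at runtime; the value is in bytes:

// raise the threshold to 100 MB so larger dimension tables qualify
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)

// or disable automatic broadcast joins entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)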

A broadcast join can be forced with a hint, like below:

import org.apache.spark.sql.functions.broadcast

val fact_joined = fact_table.join(broadcast(dimension_table), fact_table.col("dimension_id") === dimension_table.col("id"))
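
The same effect can be expressed with a SQL hint, assuming the two dataframes are registered as temp views named fact and dim:

fact_table.createOrReplaceTempView("fact")
dimension_table.createOrReplaceTempView("dim")

val fact_joined_sql = spark.sql("""
  SELECT /*+ BROADCAST(d) */ f.*, d.*
  FROM fact f JOIN dim d ON f.dimension_id = d.id
""")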
