
How does Shuffle Hash Join work in Spark?


Shuffle Hash Join, as the name indicates, works by shuffling both datasets so that the same keys from both sides end up in the same partition or task. Once the data is shuffled, the smaller of the two datasets is hashed into buckets and a hash join is performed within each partition.

Shuffle Hash Join is different from Broadcast Hash Join because the entire dataset is not broadcasted. Instead, both datasets are shuffled, and then the smaller side is hashed into buckets and hash joined with the bigger side within each partition.


Shuffle Hash Join is divided into 2 phases.

Shuffle phase – both datasets are shuffled

Hash Join phase – smaller side data is hashed and bucketed and hash joined with the bigger side in all the partitions.

Sorting is not needed inside the partitions with Shuffle Hash Join.
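To make the two phases concrete, here is a toy sketch using plain Scala collections (not Spark code): groupBy stands in for the shuffle, a Map stands in for the bucketed build side, and the data and partition count are made up for illustration.

val big   = Seq(10, 20, 20, 30, 40)   // bigger side
val small = Seq(20, 30, 50)           // smaller side
val numPartitions = 4

// Shuffle phase: route both sides by key so equal keys co-locate
def part(k: Int): Int = Math.floorMod(k.hashCode, numPartitions)
val bigParts   = big.groupBy(part)
val smallParts = small.groupBy(part)

// Hash Join phase: per partition, hash the smaller side into buckets
// and probe the buckets with the bigger side
val joined = bigParts.toSeq.flatMap { case (p, bigRows) =>
  val buckets = smallParts.getOrElse(p, Seq.empty).groupBy(identity)
  bigRows.flatMap(k => buckets.getOrElse(k, Seq.empty).map(m => (k, m)))
}
// joined contains (20,20) twice and (30,30) once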

Example

spark.sql.join.preferSortMergeJoin should be set to false and spark.sql.autoBroadcastJoinThreshold should be set to a low value so that Spark can choose Shuffle Hash Join over Sort Merge Join (and doesn't simply broadcast the smaller side).

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)
scala> spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")


scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res2: String = false

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res3: String = 2

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 20, 40, 50)

scala> val df2 = data2.toDF("id2")
df2: org.apache.spark.sql.DataFrame = [id2: int]

scala> val dfJoined = df1.join(df2, $"id1" === $"id2")
dfJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

 

When we look at the plan that will be executed, we can see that ShuffledHashJoin is used. BuildRight indicates that the right side (df2, the smaller dataset) is the one that gets hashed.

scala> dfJoined.queryExecution.executedPlan

res4: org.apache.spark.sql.execution.SparkPlan =
ShuffledHashJoin [id1#3], [id2#8], Inner, BuildRight
:- Exchange hashpartitioning(id1#3, 200)
:  +- LocalTableScan [id1#3]
+- Exchange hashpartitioning(id2#8, 200)
   +- LocalTableScan [id2#8]

scala> dfJoined.show

+---+---+
|id1|id2|
+---+---+
| 20| 20|
| 20| 20|
| 40| 40|
| 40| 40|
| 20| 20|
| 20| 20|
| 20| 20|
| 20| 20|
| 50| 50|
| 30| 30|
+---+---+

 

Stages involved in Shuffle Hash Join

As we can see below, a shuffle is needed with Shuffle Hash Join. The first dataset is read in Stage 0 and the second dataset is read in Stage 1. Stage 2 represents the shuffle.

[Figure: Shuffle Hash Join Spark stages]

Internal workings of Shuffle Hash Join

There are 2 phases in a Shuffle Hash Join – Shuffle phase and Hash Join phase.  

Shuffle phase

Data from both datasets is read and shuffled. After the shuffle operation, records with the same keys from both datasets end up in the same partition. Note that the entire dataset is not broadcasted with this join, which means the data in each partition will be a manageable size after the shuffle.
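This is also why the plan above shows Exchange hashpartitioning(id1#3, 200) on both inputs: both sides are hash partitioned on the join key into the same number of partitions, taken from the shuffle partition setting.

spark.conf.get("spark.sql.shuffle.partitions")   // "200" by default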

[Figure: Shuffle Hash Join stages]

Hash Join phase

  1. After the shuffle, Spark picks one side based on statistics and hashes that side by key into buckets
  2. In the example below, we have 2 partitions and side 2 is picked for hashing and assigned to buckets. There is one bucket in partition 1, with key 20. Partition 2 has 2 buckets: 20 and 40 are assigned to bucket 1 and 50 is assigned to bucket 2.
  3. Keys from the big dataset are matched ONLY against the respective buckets. For example, in partition 1, when the hash value of 101 points to anything other than bucket 1, no match is attempted. In partition 2, 401 is only matched against keys in bucket 1 and not against bucket 2, because 401 hashes to bucket 1.
  4. Hash join is performed across all partitions after the shuffle.

Sorting is not needed for Shuffle Hash Joins.
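Below is a minimal sketch of the hash join phase inside a single partition, assuming the build side fits in memory. hashJoinInPartition is a made-up name; Spark's actual ShuffledHashJoinExec builds an optimized hashed relation rather than a Scala Map, but the build-then-probe idea is the same.

// Build: bucket the smaller (build) side by key.
// Probe: each streamed key is looked up only in its own bucket; a key
// that finds no bucket is skipped without attempting any match.
def hashJoinInPartition(buildSide: Seq[Int], streamSide: Seq[Int]): Seq[(Int, Int)] = {
  val buckets: Map[Int, Seq[Int]] = buildSide.groupBy(identity)
  streamSide.flatMap(k => buckets.getOrElse(k, Seq.empty).map(m => (k, m)))
}

hashJoinInPartition(Seq(20, 40, 50), Seq(10, 20, 20, 40))
// returns (20,20), (20,20), (40,40); key 10 finds no bucket, so no match is attempted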

[Figure: Shuffle Hash Join Stage 3]

 

When does Shuffle Hash Join work?

  • Faster than Sort Merge Join since sorting is not involved.
  • Works only for equi joins
  • Works for all join types
  • Works well when a dataset cannot be broadcasted but one side of the partitioned data after shuffling will be small enough for a hash join. On Spark 3.0+, you can also request it explicitly with a join hint, as shown below.
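If you are on Spark 3.0 or later, you can request this join explicitly with the shuffle_hash join hint instead of tuning the thresholds. A sketch, reusing df1 and df2 from the example above:

// Spark 3.0+ only: ask the optimizer for a shuffle hash join directly
val dfHinted = df1.join(df2.hint("shuffle_hash"), $"id1" === $"id2")

// The executed plan should again show ShuffledHashJoin
dfHinted.queryExecution.executedPlan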

 

When does Shuffle Hash Join not work?

  • Does not work for non-equi joins
  • Does not work with data that is heavily skewed. Let’s say we are joining a sales dataset on the product key. It is possible that the dataset has a disproportionate number of records for a certain product key. The shuffle will send all the records for this product key to a single partition, and hashing all of them inside a single partition will result in an Out of Memory exception. So Shuffle Hash Join works for a balanced dataset but not for a skewed one. A quick skew check is sketched below.
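A quick way to check for this kind of skew before relying on Shuffle Hash Join is to count rows per join key and look at the heaviest keys. The names below (salesDf, product_key) are hypothetical:

import org.apache.spark.sql.functions.desc

// Count rows per join key; one key dwarfing the rest signals a skewed shuffle
salesDf.groupBy("product_key")
  .count()
  .orderBy(desc("count"))
  .show(10)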