How does Broadcast Hash Join work in Spark?

Broadcast Hash Join in Spark works by broadcasting the small dataset to all the executors; once the data is broadcasted, a standard hash join is performed on every executor. Broadcast Hash Join happens in 2 phases.

Broadcast phase – the small dataset is broadcasted to all executors

Hash Join phase – the small dataset is hashed on every executor and joined with its partition of the big dataset.

Broadcast Hash Join doesn’t involve a sort operation, which is one of the reasons it is the fastest join algorithm. We will see in detail how it works with an example.


Example

spark.sql.autoBroadcastJoinThreshold – the maximum size of a dataset that Spark will automatically broadcast. The default is 10 MB (10485760 bytes), which means only datasets below 10 MB are broadcasted automatically.
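If the dataset you want broadcasted sits just above the default limit, you can raise the threshold before running the join. A minimal sketch, with a 50 MB limit picked purely for illustration (the value is in bytes):

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)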

We have 2 DataFrames, df1 and df2, with one column each – id1 and id2 respectively. We are doing a simple join on id1 and id2.


scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res1: String = 10485760

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 20, 40, 50)

scala> val df2 = data2.toDF("id2")
df2: org.apache.spark.sql.DataFrame = [id2: int]


Note that you can also use the broadcast function to tell Spark which DataFrame you would like to broadcast. The syntax looks like this – df1.join(broadcast(df2), $"id1" === $"id2")
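In the spark-shell that looks like this (note the import that brings in the broadcast function):

scala> import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.functions.broadcast

scala> val dfHinted = df1.join(broadcast(df2), $"id1" === $"id2")
dfHinted: org.apache.spark.sql.DataFrame = [id1: int, id2: int]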

scala> val dfJoined = df1.join(df2, $"id1" === $"id2")
dfJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

When we look at the plan that will be executed, we can see that BroadcastHashJoin is used.

scala> dfJoined.queryExecution.executedPlan

res2: org.apache.spark.sql.execution.SparkPlan =
*(1) BroadcastHashJoin [id1#3], [id2#8], Inner, BuildRight
:- LocalTableScan [id1#3]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id2#8]


scala> dfJoined.show

+---+---+
|id1|id2|
+---+---+
| 20| 20|
| 20| 20|
| 30| 30|
| 40| 40|
| 40| 40|
| 20| 20|
| 20| 20|
| 20| 20|
| 20| 20|
| 50| 50|
+---+---+

Stages involved in Broadcast Hash Join

As you can see below, the entire Broadcast Hash Join is performed in a single stage, which means no shuffle is involved.

[Figure: Broadcast Hash Join Spark stages]

Internal workings of Broadcast Hash Join

There are 2 phases in a Broadcast Hash Join – the Broadcast phase and the Hash Join phase.

Broadcast Phase

Let’s say the big dataset is divided into 2 partitions, which means we will have 2 separate tasks assigned to process them.

The smaller of the two DataFrames will be broadcasted to the executors processing both tasks.

In the figures below, 20(1) means the record is from DataFrame 1 and 20(2) means it is from DataFrame 2.

[Figure: Broadcast Hash Join Spark stage 1]

Hash Join phase

  1. The small dataset which was broadcasted to the executors is hashed by key into buckets.
  2. In the example below, key 30 is assigned to bucket 1, so any time we encounter a record with key 30 it goes to bucket 1. Keys 20 and 40 are assigned to bucket 2, and key 50 is assigned to bucket 3.
  3. Once the small dataset is hashed and bucketed, keys from the big dataset are matched ONLY against their respective buckets. For example, 20(1) from the big dataset is hashed and mapped to bucket 2, so we only attempt to match it against the keys inside bucket 2 and not against any other bucket.
  4. Steps 2 and 3 are executed on every partition in the stage, for all records in the big dataset.

As we have seen, Broadcast Hash Join doesn’t involve a sort operation. The sketch after the figure below makes the build and probe steps concrete.

[Figure: Broadcast Hash Join Spark stage 1 – partitions 1 and 2]
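Here is a minimal plain-Scala sketch of what each task conceptually does: build buckets from the broadcasted small side, then probe only the matching bucket for every record in its partition of the big dataset. The % 3 "hash function" is purely illustrative, and this is a sketch of the idea rather than Spark’s actual implementation (internally Spark builds an optimized HashedRelation):

// Small side, already broadcasted to every executor
val small = Seq(30, 20, 40, 50)

// Build phase: hash each key into a bucket; the toy hash function
// here is key % 3, and the Map plays the role of the hash table
val buckets: Map[Int, Seq[Int]] = small.groupBy(_ % 3)

// Probe phase: for each record in this task's partition of the big
// dataset, jump to the one bucket its key hashes to and compare
// keys only within that bucket
val bigPartition = Seq(10, 20, 20, 30, 40, 10)
val joined = bigPartition.flatMap { key =>
  buckets.getOrElse(key % 3, Seq.empty)
    .filter(_ == key)
    .map(matched => (key, matched))
}
// joined: List((20,20), (20,20), (30,30), (40,40))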

When does Broadcast Hash Join work?

Broadcast Hash Join is the fastest join algorithm when the following criteria are met:

  • The join is an equi join.
  • The join type is anything except a full outer join.
  • One of the datasets is small enough that it can be broadcasted and hashed.
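You can confirm which algorithm the planner picked by calling explain. A quick sketch reusing df1 and df2 from earlier; with a non-equi condition Spark falls back to a different strategy (BroadcastNestedLoopJoin):

scala> df1.join(df2, $"id1" === $"id2").explain()   // plan shows BroadcastHashJoin

scala> df1.join(df2, $"id1" < $"id2").explain()     // plan shows BroadcastNestedLoopJoin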

When doesn’t Broadcast Hash Join work?

  • Broadcast Hash Join doesn’t work for non-equi joins.
  • Broadcast Hash Join doesn’t work for full outer joins.

Broadcast Hash Join also doesn’t work well if the dataset being broadcasted is big.

  1. If the size of the broadcasted dataset is big, the broadcast becomes a network-intensive operation and can slow down your job execution.
  2. If the size of the broadcasted dataset is big, you could get an OutOfMemory exception when Spark builds the hash table on the data, because the hash table is kept in memory.
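If shrinking the dataset isn’t an option, one way out is to disable automatic broadcasting altogether so that Spark falls back to a shuffle-based algorithm such as Sort Merge Join. A minimal sketch:

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)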

Interested in learning about Broadcast Nested Loop Join in Spark? – Click here.
