A Cartesian Product join (a.k.a. Shuffle-and-Replication Nested Loop join) works much like a Broadcast Nested Loop join, except that the dataset is not broadcast.
Despite the name, Shuffle-and-Replication does not involve a "true" shuffle, where records with the same key are sent to the same partition. Instead, entire partitions of the dataset are replicated to the partitions of the other side, so that each task can perform a full cross (nested-loop) join.
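Conceptually, once a task holds a full copy of the other side's rows, the per-partition work is just a nested loop. A minimal plain-Scala sketch (no Spark involved; the function name and signature are illustrative only):

```scala
// Illustrative sketch of the per-task nested-loop comparison that a
// cartesian product join performs once both sides are co-located.
def nestedLoopJoin(left: Seq[Int], right: Seq[Int])
                  (pred: (Int, Int) => Boolean): Seq[(Int, Int)] =
  for {
    l <- left    // every row of the left partition...
    r <- right   // ...paired with every row of the replicated right side
    if pred(l, r)
  } yield (l, r)
```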
We set spark.sql.autoBroadcastJoinThreshold to -1 to disable broadcast joins.
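For example (this assumes an active SparkSession named spark, as you get in spark-shell):

```scala
// Disable broadcast joins so Spark cannot pick a broadcast-based strategy
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```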
scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res1: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res2: String = -1

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 20, 40, 50)

scala> val df2 = data2.toDF("id2")
df2: org.apache.spark.sql.DataFrame = [id2: int]
Note that here we are performing a non-equi join: the join condition is an inequality rather than a key match.
scala> val dfJoined = df1.join(df2, $"id1" >= $"id2")
dfJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
When we inspect the plan that will be executed, we can see that CartesianProduct is used.
scala> dfJoined.queryExecution.executedPlan
res3: org.apache.spark.sql.execution.SparkPlan =
CartesianProduct (id1#3 >= id2#8)
:- LocalTableScan [id1#3]
+- LocalTableScan [id2#8]

scala> dfJoined.show
+---+---+
|id1|id2|
+---+---+
| 20| 20|
| 20| 20|
| 30| 30|
| 30| 20|
| 40| 30|
| 40| 20|
| 40| 40|
| 40| 30|
| 40| 20|
| 40| 40|
| 20| 20|
| 20| 20|
| 20| 20|
| 20| 20|
| 50| 30|
| 50| 20|
| 50| 40|
| 50| 50|
+---+---+
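As a sanity check, the 18 rows above are exactly the pairs from the two input sequences that satisfy id1 >= id2. A plain-Scala cross product (no Spark needed) confirms the count:

```scala
val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
val data2 = Seq(30, 20, 40, 50)

// Cross every id1 with every id2 and keep the pairs satisfying the join predicate
val matched = for { a <- data1; b <- data2; if a >= b } yield (a, b)

println(matched.size) // 18, matching the rows printed by dfJoined.show
```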
This join executes in a single stage. Although it is called Shuffle-and-Replication, no "true" shuffle of records by key takes place; the partitions themselves are replicated so that every pair of partitions can be joined with a nested loop.