How to create a column with unique, incrementing index value in Spark?


Let’s say we have a DataFrame like below.

 +---------+-------+---------------+
 |  Project|   Name|Cost_To_Project|
 +---------+-------+---------------+
 |Ingestion|  Jerry|           1000|
 |Ingestion|   Arya|           2000|
 |Ingestion|  Emily|           3000|
 |       ML|  Riley|           9000|
 |       ML|Patrick|           1000|
 |       ML| Mickey|           8000|
 |Analytics| Donald|           1000|
 |Ingestion|   John|           1000|
 |Analytics|  Emily|           8000|
 |Analytics|   Arya|          10000|
 |       BI| Mickey|          12000|
 |       BI| Martin|           5000|
 +---------+-------+---------------+ 

We would like to add an index column with a unique incrementing value like below.

 +---------+-------+---------------+-----+
 |  Project|   Name|Cost_To_Project|index|
 +---------+-------+---------------+-----+
 |Ingestion|  Jerry|           1000|    0|
 |Ingestion|   Arya|           2000|    1|
 |Ingestion|  Emily|           3000|    2|
 |       ML|  Riley|           9000|    3|
 |       ML|Patrick|           1000|    4|
 |       ML| Mickey|           8000|    5|
 |Analytics| Donald|           1000|    6|
 |Ingestion|   John|           1000|    7|
 |Analytics|  Emily|           8000|    8|
 |Analytics|   Arya|          10000|    9|
 |       BI| Mickey|          12000|   10|
 |       BI| Martin|           5000|   11|
 +---------+-------+---------------+-----+ 

There are a few options to implement this use case in Spark. Let’s look at them one by one.

Option 1 – Using monotonically_increasing_id function

Spark comes with a function named monotonically_increasing_id, which generates a unique, increasing number for each record in the DataFrame.

 import org.apache.spark.sql.functions.monotonically_increasing_id
 import spark.implicits._

 // Sample data: project, employee name and cost to the project
 val data = Seq(
       ("Ingestion", "Jerry", 1000), ("Ingestion", "Arya", 2000), ("Ingestion", "Emily", 3000),
       ("ML", "Riley", 9000), ("ML", "Patrick", 1000), ("ML", "Mickey", 8000),
       ("Analytics", "Donald", 1000), ("Ingestion", "John", 1000), ("Analytics", "Emily", 8000),
       ("Analytics", "Arya", 10000), ("BI", "Mickey", 12000), ("BI", "Martin", 5000))
 val df = data.toDF("Project", "Name", "Cost_To_Project")
 df.show()

 // Add an index column with a unique, increasing id for every record
 val dfIndex = df.withColumn("index", monotonically_increasing_id)
 dfIndex.show
 +---------+-------+---------------+-----+
 |  Project|   Name|Cost_To_Project|index|
 +---------+-------+---------------+-----+
 |Ingestion|  Jerry|           1000|    0|
 |Ingestion|   Arya|           2000|    1|
 |Ingestion|  Emily|           3000|    2|
 |       ML|  Riley|           9000|    3|
 |       ML|Patrick|           1000|    4|
 |       ML| Mickey|           8000|    5|
 |Analytics| Donald|           1000|    6|
 |Ingestion|   John|           1000|    7|
 |Analytics|  Emily|           8000|    8|
 |Analytics|   Arya|          10000|    9|
 |       BI| Mickey|          12000|   10|
 |       BI| Martin|           5000|   11|
 +---------+-------+---------------+-----+ 

Let’s now repartition the DataFrame into 4 partitions and apply the monotonically_increasing_id function again.

You can see in the output below that the index is no longer sequential. With monotonically_increasing_id, the generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits.

 val dfRepartitionIndex = df.repartition(4).withColumn("monotonically_increasing_id", monotonically_increasing_id)
 dfRepartitionIndex.show
 +---------+-------+---------------+---------------------------+
 |  Project|   Name|Cost_To_Project|monotonically_increasing_id|
 +---------+-------+---------------+---------------------------+
 |Analytics| Donald|           1000|                          0|
 |Analytics|   Arya|          10000|                          1|
 |Ingestion|   Arya|           2000|                          2|
 |       ML| Mickey|           8000|                 8589934592|
 |Ingestion|  Jerry|           1000|                 8589934593|
 |       ML|  Riley|           9000|                 8589934594|
 |Ingestion|  Emily|           3000|                17179869184|
 |Analytics|  Emily|           8000|                17179869185|
 |       BI| Martin|           5000|                17179869186|
 |Ingestion|   John|           1000|                25769803776|
 |       BI| Mickey|          12000|                25769803777|
 |       ML|Patrick|           1000|                25769803778|
 +---------+-------+---------------+---------------------------+ 
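If you are curious about that layout, the sketch below (our own illustration, not part of the original example) splits the generated value back into the partition ID and the record number within the partition, using the bit layout described above.

 // A rough sketch: decode the partition ID (upper 31 bits) and the
 // per-partition record number (lower 33 bits) from the generated id.
 import org.apache.spark.sql.functions.{col, shiftRight}

 dfRepartitionIndex
   .withColumn("partition_id", shiftRight(col("monotonically_increasing_id"), 33))
   .withColumn("record_in_partition", col("monotonically_increasing_id").bitwiseAND((1L << 33) - 1))
   .show()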

Option 2 – Using row_number function

Here we use the row_number window function over a global ordering of the DataFrame. This gives us a sequential, unique, incrementing id. Keep in mind that a window with no partitioning moves all the data to a single partition, so this can be expensive on large DataFrames.

 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.row_number
 val windowSpec = Window.orderBy("Name")
 df.withColumn("index", row_number.over(windowSpec)).show()
 +---------+-------+---------------+-----+
 |  Project|   Name|Cost_To_Project|index|
 +---------+-------+---------------+-----+
 |Ingestion|   Arya|           2000|    1|
 |Analytics|   Arya|          10000|    2|
 |Analytics| Donald|           1000|    3|
 |Ingestion|  Emily|           3000|    4|
 |Analytics|  Emily|           8000|    5|
 |Ingestion|  Jerry|           1000|    6|
 |Ingestion|   John|           1000|    7|
 |       BI| Martin|           5000|    8|
 |       ML| Mickey|           8000|    9|
 |       BI| Mickey|          12000|   10|
 |       ML|Patrick|           1000|   11|
 |       ML|  Riley|           9000|   12|
 +---------+-------+---------------+-----+ 
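Note that ordering by Name changes the row order and starts the index at 1. If you want a zero-based sequential index that preserves the original row order, one common pattern, sketched below under that assumption, is to order the window by monotonically_increasing_id instead and subtract 1.

 // A sketch: zero-based sequential index that preserves the original row order
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

 val byOriginalOrder = Window.orderBy(monotonically_increasing_id())
 df.withColumn("index", row_number().over(byOriginalOrder) - 1).show()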

Option 3 – zipWithIndex function

We can convert the DataFrame to an RDD and apply the zipWithIndex function. This pairs every record in the RDD, as a Row, with its index.

This seems like overkill when you don’t otherwise need the RDD API, and you have to further unnest the result to get the individual columns back.

 df.rdd.zipWithIndex.collect
 res39: Array[(org.apache.spark.sql.Row, Long)] = Array(([Ingestion,Jerry,1000],0), ([Ingestion,Arya,2000],1), ([Ingestion,Emily,3000],2), ([ML,Riley,9000],3), ([ML,Patrick,1000],4), ([ML,Mickey,8000],5), ([Analytics,Donald,1000],6), ([Ingestion,John,1000],7), ([Analytics,Emily,8000],8), ([Analytics,Arya,10000],9), ([BI,Mickey,12000],10), ([BI,Martin,5000],11)) 
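If you do want a DataFrame back after zipWithIndex, you have to append the index to each Row and rebuild the DataFrame with an extended schema. A rough sketch (the index column name and the variable names here are our choices):

 // A rough sketch: append the index to each Row and rebuild the DataFrame
 // with the schema extended by a LongType "index" field.
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{LongType, StructField, StructType}

 val rowsWithIndex = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
 val schemaWithIndex = StructType(df.schema.fields :+ StructField("index", LongType, nullable = false))
 val dfZipIndex = spark.createDataFrame(rowsWithIndex, schemaWithIndex)
 dfZipIndex.show()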

Which option to use?

Use monotonically_increasing_id if you don’t need the index to be sequential.

Use row_number if you need the index to be sequential.

Use zipWithIndex if you are already working with RDDs and don’t want to convert them to DataFrames.
