When the data sits in a single table or dataframe (on one machine), adding sequential or unique ids is pretty straightforward. What happens, though, when the data is distributed and split into partitions that might reside on different machines, as in Apache Spark?
Coming from traditional relational databases, one may be used to working with (usually auto-incremented) ids for identification and ordering, and as references in constraints on the data. Depending on the need, we might be in a position where we can benefit from a unique, auto-increment-like id in a Spark dataframe. Let's discuss the options and the catches behind using them in detail.
monotonically_increasing_id
Since Spark 1.6 there is a function called monotonically_increasing_id().
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. It generates a new column with a unique 64-bit monotonic index for each row. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the Spark DataFrame has fewer than 1 billion partitions, and each partition has fewer than 8 billion records.
As an example, consider a Spark DataFrame with two partitions, each with 3 records. This expression would return the following IDs: 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
val dfWithUniqueId = df.withColumn("unique_id", monotonically_increasing_id())
Remember that the generated values can be very large (as the example above shows, ids in the second partition already have 10 digits) even if the dataframe holds only a few records. Also, these ids are unique but not sequential. So use it in cases where you only need unique ids and have no constraint on the values being small or consecutive.
This is equivalent to the MONOTONICALLY_INCREASING_ID function in SQL.
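Below is a minimal, hedged sketch (assuming a spark-shell session with a SparkSession named spark; the column name unique_id is just an example) that reproduces the two-partition behavior described above:
import org.apache.spark.sql.functions.monotonically_increasing_id

// spread 6 rows over two partitions so that the second partition's ids start at 1L << 33
val df = spark.range(6).repartition(2)
val dfWithUniqueId = df.withColumn("unique_id", monotonically_increasing_id())
dfWithUniqueId.show(false)
// the unique_id values come out like 0, 1, 2, 8589934592, 8589934593, 8589934594
// (the exact split depends on how the rows land in the two partitions)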
zipWithIndex – RDD
Another option is to fall back to an RDD (Resilient Distributed Dataset) via df.rdd.zipWithIndex(). An RDD is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
zipWithIndex zips the RDD with its element indices. The ordering is first based on the partition index and then on the ordering of items within each partition, so the first item in the first partition gets index 0, and the last item in the last partition receives the largest index.
This is similar to Scala's zipWithIndex, but it uses Long instead of Int as the index type. This method needs to trigger a Spark job when the RDD contains more than one partition.
Use zipWithIndex to add the indices to the RDD and then toDF to get back to a dataframe; you will end up with a structure like the one below (a runnable sketch follows the table):
+--------+---+
| _1 | _2|
+--------+---+
| [1, 2] | 0 |
|[15, 21]| 1 |
+--------+---+
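For reference, here is a minimal sketch of how such a structure can be produced (assuming a spark-shell session with spark.implicits._ in scope and a toy dataset of tuples; a generic DataFrame of Row objects has no implicit encoder, which is why the dfZipWithIndex helper further below rebuilds the result with an explicit schema instead):
import spark.implicits._

val ds = Seq((1, 2), (15, 21)).toDS()                 // Dataset[(Int, Int)]
val indexed = ds.rdd.zipWithIndex().toDF("_1", "_2")  // (element, index) pairs back as a dataframe
indexed.show()                                        // _1 holds the original tuple, _2 the Long index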
Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The index assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
The indices start from 0 and the ordering is done by partition. Below is a Scala example that uses zipWithIndex to build a dataframe back out of the RDD, where you can set the starting offset (which defaults to 1) and the index column name (which defaults to "id").
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row

def dfZipWithIndex(
    df: DataFrame,
    offset: Int = 1,
    colName: String = "id"): DataFrame = {
  df.sqlContext.createDataFrame(
    // prepend (index + offset) to each row's existing values
    df.rdd.zipWithIndex.map(element =>
      Row.fromSeq(Seq(element._2 + offset) ++ element._1.toSeq)
    ),
    // prepend a non-nullable Long column to the original schema
    StructType(
      Array(StructField(colName, LongType, false)) ++ df.schema.fields
    )
  )
}
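A hypothetical usage of the helper (the offset and the column name row_id are arbitrary choices for illustration):
val indexedDF = dfZipWithIndex(df, offset = 100, colName = "row_id")
indexedDF.show(false)   // the new row_id column comes first, starting at 100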
Keep in mind that falling back to RDDs and then converting back to a dataframe can be quite expensive.
row_number()
Starting in Spark 1.5, Window expressions were added to Spark. Instead of having to convert the DataFrame to an RDD, you can now use the row_number function from org.apache.spark.sql.functions together with org.apache.spark.sql.expressions.Window. Note that I found the performance of the dfZipWithIndex approach above to be significantly faster than the algorithm below.
It returns a sequential number starting at 1 within a window partition.
In order to use row_number(), we need to move our data into one partition. The Window in both cases (sortable and not sortable data) basically consists of all the rows we currently have, so that the row_number() function can go over them and increment the row number. This can cause performance and memory issues; we can easily go OOM, depending on how much data and how much memory we have. Below is a small code snippet:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

df.withColumn("row_num", row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1))))
Note the use of lit(1) for both the partitioning and the ordering: this puts everything into the same partition and seems to preserve the original ordering of the DataFrame.
In other dialects like Hive, ORDER BY is not a must when using a window function, but it is a must before Spark version 2.4.5, otherwise you will get the following error for the query below:
select row_number() over() from test1
Error: org.apache.spark.sql.AnalysisException: Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table; (state=,code=0)
ORDER BY was made optional for the row_number window function in Spark 2.4.5. Please refer to the link below:
https://issues.apache.org/jira/browse/SPARK-31512
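As a hedged illustration (test1 is the table from the query above; the ordering column some_col is hypothetical), adding an ORDER BY inside the OVER clause makes the same query work on Spark versions before 2.4.5 as well:
// works even before Spark 2.4.5, because the window is now explicitly ordered
spark.sql("select row_number() over (order by some_col) as row_num from test1").show()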
I hope you have enjoyed this post and that it helped you understand how to generate sequential and unique ids in a Spark dataframe. Please like and share, and feel free to comment if you have any suggestions or feedback.