Pradeep Mishra
When the data lives in a single table or dataframe (on one machine), adding sequential/unique ids is pretty straightforward. But what happens when the data is distributed, split into partitions that might reside on different machines, as in Apache Spark?
Coming from traditional relational databases, one may be used to working with (usually auto-incremented) ids for identification and ordering, and as references in constraints on the data. Depending on the need, we might benefit from having a similar unique, auto-incrementing id behavior in a Spark dataframe. Let's discuss the options and the catches behind using them in detail.

monotonically_increasing_id

Since Spark 1.6 there is a function called monotonically_increasing_id().
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. It generates a new column with unique 64-bit monotonic index for each row. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the Spark DataFrame has less than 1 billion partitions, and each partition has less than 8 billion records.

As an example, consider a Spark DataFrame with two partitions, each with 3 records. This expression would return the following IDs: 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.

import org.apache.spark.sql.functions.monotonically_increasing_id

val dfWithUniqueId = df.withColumn("unique_id", monotonically_increasing_id())
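
To illustrate the bit layout described above, here is a minimal sketch that splits a generated id back into its partition index and in-partition record number with bit arithmetic; the column names partition_index and record_number are just illustrative, and it reuses the dfWithUniqueId dataframe from the snippet above:

import org.apache.spark.sql.functions.{col, lit, shiftRight}

// upper 31 bits hold the partition index, lower 33 bits the record number within the partition
val dfDecoded = dfWithUniqueId
  .withColumn("partition_index", shiftRight(col("unique_id"), 33))
  .withColumn("record_number", col("unique_id").bitwiseAND(lit((1L << 33) - 1)))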

Remember that it can generate very large (10-digit or longer) numeric values even if you have only a few records in a dataframe, because the partition ID sits in the upper bits. Also, these ids are unique but not sequential.
So use it only in cases where you need unique ids and have no constraint on their length or sequence.

This is equivalent to the MONOTONICALLY_INCREASING_ID function in SQL.
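
As a minimal sketch of the SQL form, assuming a SparkSession named spark and a temporary view created from df ("my_table" is just an example view name):

// register df as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("my_table")

val dfWithUniqueIdSql =
  spark.sql("SELECT monotonically_increasing_id() AS unique_id, * FROM my_table")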

zipWithIndex – RDD

Another option is to fall back to the RDD (Resilient Distributed Dataset) API, e.g. df.rdd.zipWithIndex(). An RDD is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

Zips the RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index.

This is similar to Scala's zipWithIndex, but it uses Long instead of Int as the index type. This method needs to trigger a Spark job when the RDD contains more than one partition.

Use zipWithIndex to add the indexes in the RDD and then toDF to get back a dataframe; you will get a structure like the one below:

+--------+---+
| _1     | _2|
+--------+---+
| [1, 2] | 0 |
|[15, 21]| 1 |
+--------+---+

Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The index assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
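
If you do need a stable ordering, a minimal sketch is to sort the RDD before zipping; it assumes the rows contain a sortable column, here hypothetically called "id":

// key the rows by a sortable column, sort, then zip with indices
val indexedRdd = df.rdd
  .keyBy(row => row.getAs[Long]("id"))  // "id" is a hypothetical sort key column
  .sortByKey()
  .values
  .zipWithIndex()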

The indexes start from 0 and the ordering is done by partition. Below is a Scala example that uses zipWithIndex to build a dataframe from the RDD, where you can set the starting offset (which defaults to 1) and the index column name (which defaults to "id").

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row


def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "id"): DataFrame = {
  df.sqlContext.createDataFrame(
    // prepend the zipWithIndex index (shifted by offset) to each original row
    df.rdd.zipWithIndex.map(element =>
      Row.fromSeq(Seq(element._2 + offset) ++ element._1.toSeq)
    ),
    // prepend the new index column to the original schema
    StructType(
      Array(StructField(colName, LongType, nullable = false)) ++ df.schema.fields
    )
  )
}
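
A quick usage sketch; the dataframe and column values here are made-up examples, and it assumes a SparkSession named spark is in scope:

import spark.implicits._  // assumes a SparkSession named spark

val sample = Seq((1, 2), (15, 21)).toDF("a", "b")              // example data only
val indexed = dfZipWithIndex(sample, offset = 1, colName = "row_id")
indexed.show()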

Keep in mind that falling back to RDDs and then converting back to a dataframe can be quite expensive.

row_number()

Starting in Spark 1.5, Window expressions were added to Spark. Instead of having to convert the DataFrame to an RDD, you can now use org.apache.spark.sql.functions.row_number together with org.apache.spark.sql.expressions.Window. Note that I found the above dfZipWithIndex to be significantly faster than the algorithm below.

It returns a sequential number starting at 1 within a window partition.

In order to use row_number() this way, we need to move all our data into one partition. The Window in both cases (sortable and not sortable data) basically consists of all the rows we currently have, so that the row_number() function can go over them and increment the row number. This can cause performance and memory issues; we can easily go OOM, depending on how much data and how much memory we have. Below is a small code snippet:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

df.withColumn("row_num", row_number.over(Window.partitionBy(lit(1)).orderBy(lit(1))))

Note that by using lit(1) for both the partitioning and the ordering, everything ends up in the same partition, and this seems to preserve the original ordering of the DataFrame.

In other dialects like Hive, ORDER BY is not mandatory when using a window function, but in Spark it is mandatory before version 2.4.5; otherwise the query below will fail with the error shown:

select row_number()over() from test1

Error: org.apache.spark.sql.AnalysisException: Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table; (state=,code=0)
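
On those older versions, the fix is simply to add an explicit ORDER BY clause inside the window; a minimal sketch, assuming test1 has a sortable column hypothetically named value:

select row_number() over(order by value) from test1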

ORDER BY was made optional for the row_number window function in Spark 2.4.5. Please refer to the link below:
https://issues.apache.org/jira/browse/SPARK-31512

I hope you have enjoyed this post and that it helped you understand how to generate sequential and unique ids in a Spark dataframe. Please like and share, and feel free to comment if you have any suggestions or feedback.
