Generate Sequential and Unique IDs in a Spark Dataframe

When the data lives in a single table or DataFrame (on one machine), adding sequential/unique IDs is pretty straightforward. But what happens when the data is distributed, split into partitions that might reside on different machines, as in Apache Spark?
Coming from traditional relational databases, one may be used to working with IDs (usually auto-incremented) for identification, ordering, and as references in constraints. Depending on the need, we might benefit from having similar unique, auto-incrementing IDs in a Spark DataFrame. Let’s discuss the options and the catches behind using them in detail.

monotonically_increasing_id

Since Spark 1.6 there is a function called monotonically_increasing_id().
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. It generates a new column with a unique 64-bit monotonically increasing index for each row. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the Spark DataFrame has fewer than 1 billion partitions, and each partition has fewer than 8 billion records.

As an example, consider a Spark DataFrame with two partitions, each with 3 records. This expression would return the following IDs: 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
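Based on the bit layout described above, here is a rough sketch of how such an ID is composed from the partition ID and the record’s position within its partition (the values mirror the example):

// Sketch: how monotonically_increasing_id composes its value
// (partition ID in the upper 31 bits, record number in the lower 33 bits)
val partitionId = 1L     // second partition
val recordNumber = 0L    // first record within that partition
val generatedId = (partitionId << 33) + recordNumber  // 8589934592L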

import org.apache.spark.sql.functions.monotonically_increasing_id

val dfWithUniqueId = df.withColumn("unique_id", monotonically_increasing_id())

Remember that for any partition other than the first, the generated values are large (10+ digit) numbers, even if the DataFrame has only a few records. Also, these IDs are unique but not sequential.
So use it in cases where you only need unique IDs and have no constraint on their length or continuity.

This is equivalent to the MONOTONICALLY_INCREASING_ID function in SQL.
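For instance, the same column can be added through a SQL expression; a minimal sketch, assuming df is the same DataFrame as above:

// Same idea expressed as a SQL expression
val dfWithUniqueIdSql = df.selectExpr("*", "monotonically_increasing_id() AS unique_id")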

zipWithIndex – RDD

Another option is to fall back to the RDD (Resilient Distributed Dataset) API, e.g. df.rdd.zipWithIndex(). An RDD is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

Zips the RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index.

This is similar to Scala’s zipWithIndex, but it uses Long instead of Int as the index type. This method needs to trigger a Spark job when the RDD contains more than one partition.

Use zipWithIndex to add the indexes to the RDD and then toDF to get back a DataFrame; you will get a structure like the one below:
+--------+---+
| _1     | _2|
+--------+---+
| [1, 2] | 0 |
|[15, 21]| 1 |
+--------+---+

Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The index assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
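For instance, if your rows have a natural key, one way to get deterministic indexes is to sort the RDD before zipping; a sketch, assuming the first column of df is an Int you want to order by:

// Sort by a key first so zipWithIndex assigns stable indexes.
// Assumes the first column of df is an Int defining the desired order.
val indexedRdd = df.rdd
  .sortBy(row => row.getInt(0))
  .zipWithIndex()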

The indexes start from 0 and the ordering is done by partition. Below is a Scala example using zipWithIndex to get a DataFrame back from the RDD, where you can set the starting offset (which defaults to 1) and the index column name (which defaults to “id”).

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row


def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "id"): DataFrame = {
  df.sqlContext.createDataFrame(
    // prepend the zipWithIndex index (plus offset) to each row
    df.rdd.zipWithIndex.map(element =>
      Row.fromSeq(Seq(element._2 + offset) ++ element._1.toSeq)
    ),
    // prepend the new index column to the original schema
    StructType(
      Array(StructField(colName, LongType, false)) ++ df.schema.fields
    )
  )
}
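A quick usage sketch, where df is any existing DataFrame:

// Add an "id" column starting at 1 (the defaults)
val indexedDf = dfZipWithIndex(df)

// Or start at 0 with a custom column name
val indexedFromZero = dfZipWithIndex(df, offset = 0, colName = "row_id")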

Keep in mind that falling back to RDDs and then converting back to a DataFrame can be quite expensive.

row_number()

Starting in Spark 1.5, window expressions were added to Spark. Instead of having to convert the DataFrame to an RDD, you can now use the row_number window function (org.apache.spark.sql.functions.row_number together with org.apache.spark.sql.expressions.Window). Note that I found the above dfZipWithIndex to be significantly faster than the algorithm below.

It returns a sequential number starting at 1 within a window partition.

In order to use row_number() this way, we need to move our data into one partition. The window in both cases (sortable and non-sortable data) basically consists of all the rows we currently have, so that the row_number() function can go over them and increment the row number. This can cause performance and memory issues; we can easily go OOM, depending on how much data and how much memory we have. Below is a small code snippet:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

df.withColumn("row_num", row_number.over(Window.partitionBy(lit(1)).orderBy(lit(1))))

Note that using lit(1) for both the partitioning and the ordering forces everything into the same partition, and seems to preserve the original ordering of the DataFrame.
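If your data has a natural ordering column, it is usually clearer to order the window by it explicitly instead of relying on lit(1); a sketch, assuming a hypothetical event_time column (all rows still end up in a single partition, so the same memory caveat applies):

// Order the window by an actual column (event_time is hypothetical)
val numbered = df.withColumn("row_num", row_number.over(Window.orderBy(col("event_time"))))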

In other dialects such as Hive, ORDER BY is not mandatory when using a window function, but in Spark versions before 2.4.5 it is; otherwise you will get the following error for the query below:

select row_number()over() from test1

Error: org.apache.spark.sql.AnalysisException: Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table; (state=,code=0)

ORDER BY was made optional for the row_number window function in Spark 2.4.5. Please refer to the link below:
https://issues.apache.org/jira/browse/SPARK-31512

I hope you have enjoyed this post and that it helped you understand how to generate sequential and unique IDs in a Spark DataFrame. Please like and share, and feel free to comment if you have any suggestions or feedback.


Spark Partitions with Coalesce and Repartition (hash, range, round robin)

repartition

The repartition method can be used to either increase or decrease the number of partitions in a DataFrame.
repartition is a full shuffle operation: all data is taken out of the existing partitions and equally distributed into the newly formed partitions.

Spark provides different flavors of the repartition method:

1. Repartition using Column Names

It returns a new Dataset partitioned by the given partitioning columns, using spark.sql.shuffle.partitions as the number of partitions, which is 200 by default.
The resulting Dataset is hash partitioned.
This is the same operation as DISTRIBUTE BY in SQL (Hive QL).
Let’s use the following data to examine how a DataFrame can be repartitioned by a particular column.

+-----+-------+
| age | color |
+-----+-------+
|  10 | blue  |
|  13 | red   |
|  15 | blue  |
|  99 | red   |
|  67 | blue  |
+-----+-------+

We’ll start by creating the DataFrame:

import spark.implicits._   // needed for toDF outside of spark-shell

val people = List(
  (10, "blue"),
  (13, "red"),
  (15, "blue"),
  (99, "red"),
  (67, "blue")
)
val peopleDf = people.toDF("age", "color")

Let’s repartition the DataFrame by the color column:

import org.apache.spark.sql.functions.col

val colorDf = peopleDf.repartition(col("color"))

When partitioning by a column, Spark will create 200 partitions by default (the value of spark.sql.shuffle.partitions). This example will have two partitions with data and 198 empty partitions.

Partition 00091
13,red
99,red
Partition 00168
10,blue
15,blue
67,blue

The colorDf contains different partitions for each color and is optimized for extracts by color. Partitioning by a column is similar to indexing a column in a relational database.
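You can verify which rows landed in which partition with the built-in spark_partition_id function; a quick sketch:

import org.apache.spark.sql.functions.spark_partition_id

// Show the partition each row was hashed into
colorDf.withColumn("partition_id", spark_partition_id()).show()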

2. Repartition using Column Names and Number of Partitions

It returns a new Dataset partitioned by the given partitioning columns into the requested number of partitions.
Here, too, the resulting Dataset is hash partitioned.
Here is an example.

val colorDf = peopleDf.repartition(4, col("color"))

We can verify here that the repartition has created a new DataFrame with four partitions:

colorDf.rdd.partitions.size // => 4

3. Repartition using Number of Partitions

It returns a new Dataset that has exactly the given number of partitions.
The resulting Dataset is round-robin partitioned.
Let’s create a colorDf from the peopleDf with four partitions.

val colorDf = peopleDf.repartition(4)
colorDf.rdd.partitions.size // => 4

coalesce

The coalesce method reduces the number of partitions in a DataFrame.
Let’s first create a DataFrame of numbers to illustrate how data is partitioned:

val x = (1 to 10).toList
val numbersDf = x.toDF("number")

On my machine, the numbersDf is split into four partitions:

numbersDf.rdd.partitions.size // => 4

Each partition becomes a separate CSV file when you write the DataFrame to disk.

numbersDf.write.csv("/Users/example/sparkOutput/numbers")

Here is how the data is separated on the different partitions.

Partition A: 1, 2
Partition B: 3, 4, 5
Partition C: 6, 7
Partition D: 8, 9, 10

Now, consolidate the data in two partitions using coalesce:

val numbersDf2 = numbersDf.coalesce(2)
numbersDf2.rdd.partitions.size // => 2

numbersDf2 will be written out to disk as two CSV files:

numbersDf2.write.csv("/Users/example/sparkOutput/numbers2")

The partitions in numbersDf2 have the following data:

Partition A: 1, 2, 3, 4, 5
Partition C: 6, 7, 8, 9, 10

The coalesce algorithm moved the data from Partition B to Partition A and moved the data from Partition D to Partition C. The data in Partition A and Partition C does not move with the coalesce algorithm. This algorithm is fast in certain situations because it minimizes data movement.
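One way to see this layout for yourself is to collect the contents of each partition with glom; a sketch:

// Inspect the contents of each partition after coalesce
numbersDf2.rdd
  .glom()        // one array of rows per partition
  .collect()
  .zipWithIndex
  .foreach { case (rows, i) => println(s"Partition $i: ${rows.mkString(", ")}") }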

Increasing partitions in coalesce

You can try to increase the number of partitions with coalesce, but it won’t work!

val numbersDf3 = numbersDf.coalesce(6)
numbersDf3.rdd.partitions.size // => 4

numbersDf3 keeps four partitions even though we attempted to create six partitions with coalesce(6).
The coalesce algorithm changes the number of partitions by moving data from some partitions into existing ones. This algorithm obviously cannot increase the number of partitions.
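If you actually need more partitions than you currently have, fall back to repartition, which does a full shuffle:

// repartition can increase the partition count (at the cost of a full shuffle)
val numbersDf6 = numbersDf.repartition(6)
numbersDf6.rdd.partitions.size // => 6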

Repartition by range

Since version 2.3.0, Spark provides two flavors of the repartitionByRange method, shown below:

//Repartition by range using Column Names
def repartitionByRange(partitionExprs: Column*): Dataset[T]
//Repartition by range using Column Names and Number of Partition
def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]

The resulting Dataset is range partitioned.

Let’s take one example to understand the concept. Suppose, we have the below Dataset answerSet.

+-----+---------+
| seq | answers |
+-----+---------+
| 1   | answer1 |
| 2   | answer2 |
| 3   | answer3 |
| 4   | answer4 |
| 5   | answer5 |
+-----+---------+

Now, consolidate the data in two partitions using repartitionByRange:

val answerSet2 = answerSet.repartitionByRange(2, col("seq"))
answerSet2.rdd.partitions.size // => 2

answerSet2 will be written out to disk as two CSV files:

answerSet2.write.csv("/Users/example/sparkOutput/answerSet2")

The partitions in answerSet2 have the following data:

//Partition A
+-----+---------+
| seq | answers |
+-----+---------+
| 1   | answer1 |
| 2   | answer2 |
| 3   | answer3 |
+-----+---------+
//Partition B
+-----+---------+
| seq | answers |
+-----+---------+
| 4   | answer4 |
| 5   | answer5 |
+-----+---------+

Note that at least one partition-by column must be specified. When no explicit sort order is specified, “ascending nulls first” is assumed. Also note that the rows are not sorted within each partition of the resulting Dataset.
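If you want something other than the default “ascending nulls first” ordering for the range boundaries, you can pass a sort-ordered column expression; a sketch using a descending order on seq:

// Range-partition using a descending sort order on the seq column
val answerSetDesc = answerSet.repartitionByRange(2, col("seq").desc)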

I hope you have enjoyed this post and that it helped you understand repartitioning in Spark. Please like and share, and feel free to comment if you have any suggestions or feedback.
