Generate Sequential and Unique IDs in a Spark Dataframe

When the data lives in a single table or DataFrame (on one machine), adding sequential/unique IDs is pretty straightforward. But what happens when you have distributed data, split into partitions that might reside on different machines, as in Apache Spark?
Coming from traditional relational databases, one may be used to working with (usually auto-incremented) IDs for identification and ordering, and to using them as references in data constraints. Depending on the need, we might be in a position where we can benefit from a unique, auto-increment-like ID in a Spark DataFrame. Let's discuss the options and the catches behind using them in detail.

monotonically_increasing_id

Since Spark 1.6 there is a function called monotonically_increasing_id().
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. It adds a new column with a unique 64-bit monotonic index for each row. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the Spark DataFrame has fewer than 1 billion partitions, and each partition has fewer than 8 billion records.

As an example, consider a Spark DataFrame with two partitions, each with 3 records. This expression would return the following IDs: 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.

import org.apache.spark.sql.functions.monotonically_increasing_id
val dfWithUniqueId = df.withColumn("unique_id", monotonically_increasing_id())

Remember that the generated values can be very large numbers (rows in the second partition already start at 8589934592, a 10-digit value) even if the DataFrame has only a few records, and the IDs are unique but not sequential.
So use it in cases where you only need unique IDs and have no constraint on their length or magnitude.

This is equivalent to the MONOTONICALLY_INCREASING_ID function in SQL.
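The same ID can also be generated from Spark SQL (a minimal sketch, assuming a SparkSession named spark and that df has been registered as a temporary view named people):

df.createOrReplaceTempView("people")
val dfWithUniqueIdSql = spark.sql(
  "SELECT *, monotonically_increasing_id() AS unique_id FROM people"
)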

zipWithIndex – RDD

Another option is to fall back to an RDD (Resilient Distributed Dataset), as in df.rdd.zipWithIndex(). An RDD is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

Zips the RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index.

This is similar to Scala’s zipWithIndex, but it uses Long instead of Int as the index type. This method needs to trigger a Spark job when the RDD contains more than one partition.

Using zipWithIndex to add the indexes to the RDD and then toDF to get back to a DataFrame produces a structure like the one below:
+--------+---+
| _1     | _2|
+--------+---+
| [1, 2] | 0 |
|[15, 21]| 1 |
+--------+---+
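A structure like the one above can be reproduced with a small sketch such as this (assuming a SparkSession named spark and import spark.implicits._; _1 and _2 are the default column names assigned by toDF):

import spark.implicits._

val rdd = spark.sparkContext.parallelize(Seq((1, 2), (15, 21)), 2)
val indexed = rdd.zipWithIndex.toDF() // _1 holds the original tuple, _2 the index
indexed.show()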

Note that some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition. The index assigned to each element is therefore not guaranteed, and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee the same index assignments, you should sort the RDD with sortByKey() or save it to a file.

The indexes start from 0 and the ordering is done by partition. Below is a Scala example that uses zipWithIndex to build a DataFrame where you can set the starting offset (which defaults to 1) and the index column name (which defaults to "id").

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row


// Prepends an index column (starting at `offset`) to an existing DataFrame
// by zipping the underlying RDD with element indices.
def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "id"): DataFrame = {
  df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(element =>
      Row.fromSeq(Seq(element._2 + offset) ++ element._1.toSeq)
    ),
    StructType(
      Array(StructField(colName, LongType, false)) ++ df.schema.fields
    )
  )
}
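A quick usage sketch (assuming a DataFrame named df is already in scope; the column name row_id is arbitrary):

val indexedDf = dfZipWithIndex(df, offset = 1, colName = "row_id")
indexedDf.show()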

Keep in mind that falling back to an RDD and then back to a DataFrame can be quite expensive.

row_number()

Window expressions were added in Spark 1.5. Instead of having to convert the DataFrame to an RDD, you can now use the row_number function from org.apache.spark.sql.functions over a Window. Note that I found the dfZipWithIndex approach above to perform significantly faster than the algorithm below.

It returns a sequential number starting at 1 within a window partition.

In order to use row_number() this way, we need to move our data into one partition. The Window in both cases (sortable and non-sortable data) basically consists of all the rows we currently have, so that the row_number() function can go over them and increment the row number. This can cause performance and memory issues; we can easily go OOM, depending on how much data and how much memory we have. Below is a small code snippet:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

df.withColumn("row_num", row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1))))

Note the use of lit(1) for both the partitioning and the ordering: this forces everything into the same partition, and it seems to preserve the original ordering of the DataFrame.

In other dialects such as Hive, ORDER BY is not mandatory when using a window function, but in Spark it was required before version 2.4.5; otherwise the query below fails with the following error:

select row_number()over() from test1

Error: org.apache.spark.sql.AnalysisException: Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table; (state=,code=0)

ORDER BY was made optional for the row_number window function in Spark 2.4.5. Please refer to the link below:
https://issues.apache.org/jira/browse/SPARK-31512
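On versions where ORDER BY is still required, the query can be fixed by supplying any ordering column, for example (a sketch, assuming test1 has a column named seq):

spark.sql("SELECT *, row_number() OVER (ORDER BY seq) AS row_num FROM test1").show()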

I hope you have enjoyed this post and that it helped you understand how to generate sequential and unique IDs in a Spark DataFrame. Please like and share, and feel free to comment if you have any suggestions or feedback.


Spark Partitions with Coalesce and Repartition (hash, range, round robin)

repartition

The repartition method can be used to either increase or decrease the number of partitions in a DataFrame.
repartition is a full shuffle operation: all the data is taken out of the existing partitions and distributed evenly across the newly formed partitions.

Spark provides different flavors of the repartition method:

1. Repartition using Column Names

It returns a new Dataset partitioned by the given partitioning columns, using spark.sql.shuffle.partitions as the number of partitions (200 by default).
The resulting Dataset is hash partitioned.
This is the same operation as DISTRIBUTE BY in SQL (HiveQL); a SQL sketch follows the partition listing below.
Let’s use the following data to examine how a DataFrame can be repartitioned by a particular column.

+-----+-------+
| age | color |
+-----+-------+
|  10 | blue  |
|  13 | red   |
|  15 | blue  |
|  99 | red   |
|  67 | blue  |
+-----+-------+

We’ll start by creating the DataFrame:

import spark.implicits._ // needed for toDF when not in the spark-shell

val people = List(
  (10, "blue"),
  (13, "red"),
  (15, "blue"),
  (99, "red"),
  (67, "blue")
)
val peopleDf = people.toDF("age", "color")

Let’s repartition the DataFrame by the color column:

import org.apache.spark.sql.functions.col
val colorDf = peopleDf.repartition(col("color"))

When partitioning by a column, Spark will create a minimum of 200 partitions by default. This example will have two partitions with data and 198 empty partitions.

Partition 00091
13,red
99,red
Partition 00168
10,blue
15,blue
67,blue
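To see for yourself which partition each row landed in, you can tag rows with the spark_partition_id function (a quick sketch; the actual partition numbers will differ from run to run):

import org.apache.spark.sql.functions.spark_partition_id

colorDf
  .withColumn("partition_id", spark_partition_id())
  .show()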

The colorDf contains different partitions for each color and is optimized for extracts by color. Partitioning by a column is similar to indexing a column in a relational database.
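As mentioned above, repartitioning by a column is the same operation as DISTRIBUTE BY in SQL. A minimal sketch, assuming a SparkSession named spark and that peopleDf is registered as a temporary view named people:

peopleDf.createOrReplaceTempView("people")
val colorDfSql = spark.sql("SELECT * FROM people DISTRIBUTE BY color")
colorDfSql.rdd.partitions.size // => 200 by default, like repartition(col("color"))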

2. Repartition using Column Names and Number of Partitions

It returns a new Dataset partitioned by the given partitioning columns into the requested number of partitions.
Here, the resulting Dataset is also hash partitioned.
Here is an example.

val colorDf = peopleDf.repartition(4, col("color"))

We can verify here that the repartition has created a new DataFrame with four partitions:

colorDf.rdd.partitions.size // => 4

3. Repartition using Number of Partitions

It returns a new Dataset that has exactly the given number of partitions.
The resulting Dataset is round-robin partitioned.
Let’s create a colorDf from the peopleDf with four partitions.

val colorDf = peopleDf.repartition(4)
colorDf.rdd.partitions.size // => 4

coalesce

The coalesce method reduces the number of partitions in a DataFrame.
Let’s first create a DataFrame of numbers to illustrate how data is partitioned:

val x = (1 to 10).toList
val numbersDf = x.toDF("number")

On my machine, the numbersDf is split into four partitions:

numbersDf.rdd.partitions.size // => 4

Each partition becomes a separate CSV file when you write the DataFrame to disk.

numbersDf.write.csv("/Users/example/sparkOutput/numbers")

Here is how the data is separated on the different partitions.

Partition A: 1, 2
Partition B: 3, 4, 5
Partition C: 6, 7
Partition D: 8, 9, 10

Now, consolidate the data into two partitions using coalesce:

val numbersDf2 = numbersDf.coalesce(2)
numbersDf2.rdd.partitions.size // => 2

numbersDf2 will be written out to disk as two CSV files:

numbersDf2.write.csv("/Users/example/sparkOutput/numbers2")

The partitions in numbersDf2 have the following data:

Partition A: 1, 2, 3, 4, 5
Partition C: 6, 7, 8, 9, 10

The coalesce algorithm moved the data from Partition B to Partition A and moved the data from Partition D to Partition C. The data in Partition A and Partition C does not move with the coalesce algorithm. This algorithm is fast in certain situations because it minimizes data movement.
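You can confirm how the rows are laid out across partitions with glom, which returns the contents of each partition as an array (a small sketch; the exact grouping depends on your environment):

numbersDf2.rdd
  .glom()
  .collect()
  .zipWithIndex
  .foreach { case (rows, i) =>
    println(s"Partition $i: " + rows.map(_.getInt(0)).mkString(", "))
  }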

Increasing partitions in coalesce

You can try to increase the number of partitions with coalesce, but it won’t work!

val numbersDf3 = numbersDf.coalesce(6)
numbersDf3.rdd.partitions.size // => 4

numbersDf3 keeps four partitions even though we attempted to create six partitions with coalesce(6).
The coalesce algorithm changes the number of partitions by moving data from some partitions into existing ones. This algorithm obviously cannot increase the number of partitions.

Repartition by range

Since version 2.3.0, Spark provides two flavors of the repartitionByRange method, shown below:

//Repartition by range using Column Names
def repartitionByRange(partitionExprs: Column*): Dataset[T]
//Repartition by range using Column Names and Number of Partition
def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]

The resulting Dataset is range partitioned.

Let’s take one example to understand the concept. Suppose, we have the below Dataset answerSet.

+-----+---------+
| seq | answers |
+-----+---------+
| 1   | answer1 |
| 2   | answer2 |
| 3   | answer3 |
| 4   | answer4 |
| 5   | answer5 |
+-----+---------+

Now, let's split the data into two range partitions using repartitionByRange:

val answerSet2 = answerSet.repartitionByRange(2, col("seq"))
answerSet2.rdd.partitions.size // => 2

answerSet2 will be written out to disk as two CSV files:

answerSet2.write.csv("/Users/example/sparkOutput/answerSet2")

The partitions in answerSet2 have the following data:

//Partition A
+-----+---------+
| seq | answers |
+-----+---------+
| 1   | answer1 |
| 2   | answer2 |
| 3   | answer3 |
+-----+---------+
//Partition B
+-----+---------+
| seq | answers |
+-----+---------+
| 4   | answer4 |
| 5   | answer5 |
+-----+---------+

At least one partition-by column must be specified. When no explicit sort order is specified, "ascending nulls first" is assumed. Note that the rows are not sorted within each partition of the resulting Dataset.
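Since the partition expressions are ordinary columns, you can also pass an explicit sort order for the range boundaries, for example (a sketch building on the answerSet example above):

import org.apache.spark.sql.functions.col

val answerSetDesc = answerSet.repartitionByRange(2, col("seq").desc)
// the range boundaries are now computed on seq in descending order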

I hope you have enjoyed this post and that it helped you understand repartitioning in Spark. Please like and share, and feel free to comment if you have any suggestions or feedback.


Scala String Interpolation

Introduction

String interpolation refers to the substitution of defined variables or expressions in a given string with their respective values. String interpolation allows users to embed variable references directly in processed string literals. Here's an example:

val pi = 3.14  
println(s"value of pi = $pi") // value of pi = 3.14

And here's an example which does not use any string interpolator.

val pi = 3.14  
println("value of pi = "+pi) // value of pi = 3.14

Starting in Scala 2.10.0, Scala offers a new mechanism to create strings from your data: String Interpolation.

Types of String Interpolator

Scala provides three string interpolation methods out of the box: s, f and raw.

1. s Interpolator

Prepending s to any string literal allows the usage of variables directly in the string. Within the string, we can access variables, object fields, function calls, etc. You've already seen an example here:

val pi = 3.14  
println(s"value of pi = $pi") // value of pi = 3.14

String interpolators can also take arbitrary expressions. For example:

println(s"1 + 1 = ${1 + 1}") // 1 + 1 = 2

Any arbitrary expression can be embedded in ${}.
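Object fields and function calls can be embedded in the same way (a small sketch; the Person class and the area function here are just illustrative):

case class Person(name: String, age: Int)
val p = Person("Asha", 30)
def area(r: Double): Double = math.Pi * r * r

println(s"${p.name} is ${p.age} years old")          // Asha is 30 years old
println(s"A circle of radius 2 has area ${area(2)}") // A circle of radius 2 has area 12.566370614359172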

For some special characters, it is necessary to escape them when embedded within a string. To represent an actual dollar sign you can double it $$, like here:

println(s"New offers starting at $$15.00")

which will print the string New offers starting at $15.00.

Note: The s that’s placed before each string literal is actually a method.

2. f Interpolator

Prepending f to any string literal allows the creation of simple formatted strings, similar to printf in other languages. When using the f interpolator, all variable references should be followed by a printf-style format string, like %d. Let’s look at an example:

val height = 1.9d
val name = "James"
println(f"$name%s is $height%2.2f meters tall")  // James is 1.90 meters tall

The f interpolator is typesafe. If you try to pass a format string that only works for integers but pass a double, the compiler will issue an error. For example:

val height: Double = 1.9d

scala> f"$height%4d"
<console>:9: error: type mismatch;
 found   : Double
 required: Int
           f"$height%4d"
              ^

The f interpolator makes use of the string format utilities available from Java. The formats allowed after the % character are outlined in the Formatter javadoc. If there is no % character after a variable definition a formatter of %s (String) is assumed.
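For example, %d formats integers, and if you leave the format out entirely, %s is assumed (a small sketch):

val name = "James"
val tickets = 3
println(f"$name bought $tickets%d tickets") // James bought 3 tickets
println(f"$name bought $tickets tickets")   // same output: %s is assumed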

3. raw Interpolator

The raw interpolator is similar to the s interpolator except that it performs no escaping of literals within the string. Here’s an example processed string:

scala> s"a\nb"
res0: String =
a
b

Here the s string interpolator replaced the characters \n with a return character. The raw interpolator will not do that.

scala> raw"a\nb"
res1: String = a\nb

The raw interpolator is useful when you want to avoid having expressions like \n turn into a return character.
In addition to the three default string interpolators, users can define their own.

Create your own interpolator

If you're still asking yourself what this s before the string literal is, the answer is that a processed string literal is a code transformation: the compiler rewrites it into a call to the method s on an instance of StringContext. In other words, an expression like

s"x is $x"

is rewritten by the compiler to

StringContext("x is ", "").s(x)

Let's create our own string interpolator, which will work like the s interpolator but add some debug info to the resulting string:

import java.util.Date
import java.text.SimpleDateFormat

object Interpolation {
  implicit class LogInterpolator(val sc: StringContext) extends AnyVal {
    def log(args: Any*): String = {
      val timeFormat = new SimpleDateFormat("HH:mm:ss")
      s"[DEBUG ${timeFormat.format(new Date)}] ${sc.s(args:_*)}"
    }
  }

  val logString = "one plus one is"
  def demo = log"$logString ${1+1}"
}
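Calling demo then produces a string like the following (the timestamp will of course differ):

println(Interpolation.demo) // [DEBUG 14:05:37] one plus one is 2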

In the code above, implicit classes and extending AnyVal (so-called value classes) are also features introduced in Scala 2.10. Since any interpolator is in fact a method of the StringContext class, we can easily reuse existing ones in our own (in the example we use the s method to form the resulting string, so we don't have to implement that part in our new interpolator). The string interpolation

log"$logString ${1+1}"

will be rewritten by the compiler to

new LogInterpolator(new StringContext("", " ", "")).log(logString, 2)

which is a nice combination of new Scala 2.10 features itself.

This technique is useful for writing more readable and safer code, and it allows you to extend and combine existing functionality.


Multiline Strings in Scala

You want to create multiline strings within your Scala source code, like you can with the heredoc syntax of other languages, and avoid having to escape quotes and other symbols.

A heredoc is a way to define a multiline string while maintaining the original indentation and formatting. It is typically used to embed snippets of code, like SQL or HTML.

A great feature of Scala strings is that you can create multiline strings by including the string inside three double-quotes:

val multiline = """First line starts
                   Second line
                   Third line ends."""

Although this works, the second and third lines in this example will end up with whitespace at the beginning of their lines. If you print the string, it looks like this:

First line starts
                   Second line
                   Third line ends

You can solve this problem in several different ways. First, you can left-justify every line after the first line of your string:

val multiline = """First line starts
Second line
Third line ends"""

A cleaner approach is to add the stripMargin method to the end of your multiline string and begin all lines after the first line with the pipe symbol |:

val multiline = """First line starts
                   |Second line
                   |Third line ends""".stripMargin

If you don’t like using the | symbol, you can use any character you like with the stripMargin method:

val multiline = """First line starts
                   #Second line
                   #Third line ends""".stripMargin(#)

All of these approaches yield the same result, a multiline string with each line of the string left justified:

First line starts
Second line
Third line ends

This results in a true multiline string, with a hidden \n character at the end of each line except the last. To convert this multiline string into one continuous line, you can add a replaceAll method after the stripMargin call, replacing all newline characters with blank spaces:

val multiline = """First line starts
                   |Second line
                   |Third line ends""".stripMargin.replaceAll("\n", " ")

This yields:

First line starts Second line Third line ends
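Putting this together, multiline strings with stripMargin are a convenient way to embed snippets such as SQL, as mentioned at the beginning (a sketch; the table and column names are made up):

val query = """SELECT name, price
              |FROM products
              |WHERE price > 10
              |ORDER BY price DESC""".stripMargin
println(query)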

Using triple quotes to escape characters

If you have used another programming language like Java or .NET in the past, you will be familiar with escaping quotes in a string using a backslash:

val exampleJson: String = "{\"name\":\"xyz\",\"level\":\"high\",\"price\":2.50}" 
println(s"exampleJson = $exampleJson")

You will see the below output if you run your Scala application.

exampleJson = {"name":"xyz","level":"high","price":2.50}

This is great, but if you have longer text to escape, it becomes very tedious to escape each and every quote within your JSON string.
Fortunately Scala has a much better solution! To help you easily escape characters and symbols inside strings, you just need to wrap the text within triple quotes:

val exampleJson: String = """{"name":"xyz","level":"high","price":2.50}"""
println(s"exampleJson = $exampleJson")

You will see the same output if you run the above code snippet.

exampleJson = {"name":"xyz","level":"high","price":2.50}