Pradeep Mishra

repartition

The repartition method can be used to either increase or decrease the number of partitions in a DataFrame.
repartition is a full shuffle operation: all of the data is pulled out of the existing partitions and evenly redistributed into the newly formed partitions.
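
As a quick illustration, here is a minimal sketch (assuming a running SparkSession named spark; the DataFrame itself is only illustrative) of moving the partition count in both directions:

// Minimal sketch, assuming a SparkSession named `spark`; the DataFrame is illustrative only.
val df = spark.range(100).toDF("id")

df.rdd.partitions.size                  // initial count, e.g. 8 (depends on the environment)
df.repartition(16).rdd.partitions.size  // => 16, increased via a full shuffle
df.repartition(2).rdd.partitions.size   // => 2, decreased via a full shuffle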

Spark provides different flavors of the repartition method:

1. Repartition using Column Names

It returns a new Dataset partitioned by the given partitioning columns, using spark.sql.shuffle.partitions (200 by default) as the number of partitions.
The resulting Dataset is hash partitioned.
This is the same operation as DISTRIBUTE BY in SQL (Hive QL).
Let’s use the following data to examine how a DataFrame can be repartitioned by a particular column.

+-----+-------+
| age | color |
+-----+-------+
|  10 | blue  |
|  13 | red   |
|  15 | blue  |
|  99 | red   |
|  67 | blue  |
+-----+-------+

We’ll start by creating the DataFrame:

import spark.implicits._ // toDF on a local collection needs the implicits of a SparkSession (assumed to be named spark)

val people = List(
  (10, "blue"),
  (13, "red"),
  (15, "blue"),
  (99, "red"),
  (67, "blue")
)
val peopleDf = people.toDF("age", "color")

Let’s repartition the DataFrame by the color column:

import org.apache.spark.sql.functions.col

val colorDf = peopleDf.repartition(col("color"))

When partitioning by a column, Spark creates 200 partitions by default (the value of spark.sql.shuffle.partitions). This example will have two partitions with data and 198 empty partitions.

Partition 00091
13,red
99,red
Partition 00168
10,blue
15,blue
67,blue

The colorDf contains different partitions for each color and is optimized for extracts by color. Partitioning by a column is similar to indexing a column in a relational database.
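
To see this for ourselves, here is a small sketch that tags each row with the partition it landed in (using the built-in spark_partition_id function) and shows how the 200 default can be tuned through spark.sql.shuffle.partitions:

import org.apache.spark.sql.functions.{col, spark_partition_id}

// Tag every row with the id of the partition it ended up in.
colorDf.withColumn("partition_id", spark_partition_id()).show()
// Rows with the same color share a partition_id; most of the 200 ids never appear.

// The 200 default comes from spark.sql.shuffle.partitions and can be changed:
spark.conf.set("spark.sql.shuffle.partitions", "8")
peopleDf.repartition(col("color")).rdd.partitions.size // => 8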

2. Repartition using Column Names and Number of Partitions

It returns a new Dataset partitioned by the given partitioning columns into the requested number of partitions.
Here, the resulting Dataset is also hash partitioned.
Here is an example.

val colorDf = peopleDf.repartition(4, col("color"))

We can verify here that the repartition has created a new DataFrame with four partitions:

colorDf.rdd.partitions.size // => 4

3. Repartition using Number of Partitions

It returns a new Dataset that has exactly the given number of partitions.
The resulting Dataset is round-robin partitioned.
Let’s create a colorDf from the peopleDf with four partitions.

val colorDf = peopleDf.repartition(4)
colorDf.rdd.partitions.size // => 4
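
To observe the round-robin spread, one option is to collect each partition separately with RDD glom (fine only for tiny demo data like this):

colorDf.rdd.glom().collect().zipWithIndex.foreach { case (rows, i) =>
  println(s"Partition $i: ${rows.mkString(", ")}")
}
// With round-robin partitioning the five rows are spread evenly across the 4 partitions,
// e.g. one partition holding two rows and the others one row each (the exact layout may vary).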

coalesce

The coalesce method reduces the number of partitions in a DataFrame.
Let’s first create a DataFrame of numbers to illustrate how data is partitioned:

val x = (1 to 10).toList
val numbersDf = x.toDF("number")

On my machine, the numbersDf is split into four partitions:

numbersDf.rdd.partitions.size // => 4

Each partition becomes a separate CSV file when you write the DataFrame to disk.

numbersDf.write.csv("/Users/example/sparkOutput/numbers")

Here is how the data is separated on the different partitions.

Partition A: 1, 2
Partition B: 3, 4, 5
Partition C: 6, 7
Partition D: 8, 9, 10

Now, consolidate the data in two partitions using coalesce:

val numbersDf2 = numbersDf.coalesce(2)
numbersDf2.rdd.partitions.size // => 2

numbersDf2 will be written out to disk as two CSV files:

numbersDf2.write.csv("/Users/example/sparkOutput/numbers2")

The partitions in numbersDf2 have the following data:

Partition A: 1, 2, 3, 4, 5
Partition C: 6, 7, 8, 9, 10

The coalesce algorithm moved the data from Partition B to Partition A and moved the data from Partition D to Partition C. The data in Partition A and Partition C does not move with the coalesce algorithm. This algorithm is fast in certain situations because it minimizes data movement.
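
One way to confirm that coalesce avoids a shuffle is to compare the physical plans of the two operations (the exact plan text varies with the Spark version):

numbersDf.coalesce(2).explain()
// the plan contains a Coalesce node and no Exchange, i.e. no shuffle

numbersDf.repartition(2).explain()
// the plan contains an Exchange (RoundRobinPartitioning), i.e. a full shuffle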

Increasing partitions in coalesce

You can try to increase the number of partitions with coalesce, but it won’t work!

val numbersDf3 = numbersDf.coalesce(6)
numbersDf3.rdd.partitions.size // => 4

numbersDf3 keeps four partitions even though we attempted to create six partitions with coalesce(6).
The coalesce algorithm reduces the number of partitions by moving data from some partitions into existing ones, so it obviously cannot increase the number of partitions.
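
If you actually need more partitions, use repartition instead, which performs a full shuffle to create them (numbersDf4 below is just an illustrative name):

val numbersDf4 = numbersDf.repartition(6)
numbersDf4.rdd.partitions.size // => 6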

Repartition by range

Since version 2.3.0, Spark provides two flavors of the repartitionByRange method, shown below:

//Repartition by range using Column Names
def repartitionByRange(partitionExprs: Column*): Dataset[T]
//Repartition by range using Column Names and Number of Partitions
def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]

The resulting Dataset is range partitioned.

Let’s take an example to understand the concept. Suppose we have the following Dataset, answerSet:

+-----+---------+
| seq | answers |
+-----+---------+
| 1   | answer1 |
| 2   | answer2 |
| 3   | answer3 |
| 4   | answer4 |
| 5   | answer5 |
+-----+---------+

Now, consolidate the data in two partitions using repartitionByRange:

val answerSet2 = answerSet.repartitionByRange(2,col("seq"))
answerSet2.rdd.partitions.size // => 2

answerSet2 will be written out to disk as two CSV files:

answerSet2.write.csv("/Users/example/sparkOutput/answerSet2")

The partitions in answerSet2 have the following data:

//Partition A
+-----+---------+
| seq | answers |
+-----+---------+
| 1   | answer1 |
| 2   | answer2 |
| 3   | answer3 |
+-----+---------+
//Partition B
+-----+---------+
| seq | answers |
+-----+---------+
| 4   | answer4 |
| 5   | answer5 |
+-----+---------+

At least one partition-by expression must be specified. When no explicit sort order is given, “ascending nulls first” is assumed. Note that the rows are not sorted within each partition of the resulting Dataset.
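
If you need a different ordering of the ranges, you can pass sort expressions instead of bare columns. Here is a small sketch on the same answerSet, using a descending order:

val answerSetDesc = answerSet.repartitionByRange(2, col("seq").desc)
// The first partition now holds the higher seq values and the second the lower ones;
// the exact boundaries are chosen by sampling, so the split may vary.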

I hope you have enjoyed this post and that it helped you understand repartitioning in Spark. Please like and share, and feel free to comment if you have any suggestions or feedback.
