The data within an RDD is split into several partitions.
Properties of partitions:
– Partitions never span multiple machines, i.e., tuples in the same partition are guaranteed to be on the same machine.
– Each machine in the cluster contains one or more partitions.
– The number of partitions to use is configurable. By default, it equals the total number of cores on all executor nodes.
Two kinds of partitioning available in Spark:
– Hash partitioning
– Range partitioning
Customizing a partitioning is only possible on Pair RDDs.
Hash Partitioning
Given a Pair RDD that should be grouped:
val purchasesPerCust = purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD; "purchasesRdd" is an assumed name for the source RDD
groupByKey first computes per tuple (k, v) its partition p:
p = k.hashCode() % numPartitions
Then, all tuples in the same partition p are sent to the machine hosting p.
Intuition: hash partitioning attempts to spread data evenly across
partitions based on the key.
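The partition computation can be sketched in plain Scala. One detail the formula glosses over: hashCode() may be negative, and % on the JVM keeps the sign of the dividend, so the remainder has to be shifted back into range to obtain a valid partition index (Spark's HashPartitioner takes care of this internally). The names HashPartitionSketch and partitionOf are illustrative and not part of the Spark API.

```scala
object HashPartitionSketch {
  // Map a key to a partition index in [0, numPartitions).
  // The raw remainder is negative when hashCode() is negative,
  // so it is shifted back into the valid range.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    val raw = key.hashCode() % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }
}
```

For instance, HashPartitionSketch.partitionOf(401, 4) gives 1, and a negative key such as -7 also lands in partition 1 instead of producing the invalid index -3.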
Range Partitioning
Pair RDDs may contain keys that have an ordering defined.
Examples: Int, Char, String, ...
For such RDDs, range partitioning may be more efficient.
Using a range partitioner, keys are partitioned according to:
1. an ordering for keys
2. a set of sorted ranges of keys
Property: tuples with keys in the same range appear on the same machine.
Hash Partitioning: Example
Consider a Pair RDD, with keys [8, 96, 240, 400, 401, 800], and the desired number of partitions of 4.
Furthermore, suppose that hashCode() is the identity (n.hashCode() == n).
In this case, hash partitioning distributes the keys as follows among the partitions:
– partition 0: [8, 96, 240, 400, 800]
– partition 1: [401]
– partition 2: []
– partition 3: []
The result is a very unbalanced distribution which hurts performance.
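This distribution can be reproduced without a cluster. The sketch below groups the example keys by key.hashCode() % numPartitions, relying on the fact that the hash code of a Scala Int is the value itself. HashExample and distribute are hypothetical names chosen for this illustration.

```scala
object HashExample {
  // Assign each key to partition (hashCode % numPartitions) and return
  // the keys per partition, including empty partitions.
  // Assumes non-negative keys, as in the example.
  def distribute(keys: Seq[Int], numPartitions: Int): Vector[Seq[Int]] = {
    val byPartition = keys.groupBy(k => k.hashCode() % numPartitions)
    Vector.tabulate(numPartitions)(p => byPartition.getOrElse(p, Seq.empty))
  }

  def main(args: Array[String]): Unit = {
    val parts = distribute(Seq(8, 96, 240, 400, 401, 800), 4)
    parts.zipWithIndex.foreach { case (ks, p) =>
      println(s"partition $p: ${ks.mkString("[", ", ", "]")}")
    }
  }
}
```

Every key except 401 is a multiple of 4, which is why five of the six keys collapse into partition 0.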
Range Partitioning: Example
Using range partitioning the distribution can be improved significantly:
Assumptions: (a) keys are non-negative, (b) 800 is the largest key in the RDD.
Set of ranges: [1, 200], [201, 400], [401, 600], [601, 800]
In this case, range partitioning distributes the keys as follows among the partitions:
– partition 0: [8, 96]
– partition 1: [240, 400]
– partition 2: [401]
– partition 3: [800]
The resulting partitioning is much more balanced.
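The range lookup in this example can be sketched in plain Scala as well. The code hard-codes the four ranges from the example through their inclusive upper bounds; a real RangePartitioner derives its bounds by sampling the RDD. RangeExample, upperBounds, partitionOf and distribute are hypothetical names, and sending keys above the last bound to the last partition is a choice made for this sketch.

```scala
object RangeExample {
  // Inclusive upper bounds of the sorted ranges
  // [1, 200], [201, 400], [401, 600], [601, 800].
  val upperBounds: Vector[Int] = Vector(200, 400, 600, 800)

  // A key belongs to the first range whose upper bound is >= the key.
  // Keys above the last bound fall into the last partition.
  def partitionOf(key: Int): Int = {
    val idx = upperBounds.indexWhere(key <= _)
    if (idx >= 0) idx else upperBounds.length - 1
  }

  // Return the keys per partition, including empty partitions.
  def distribute(keys: Seq[Int]): Vector[Seq[Int]] = {
    val byPartition = keys.groupBy(partitionOf)
    Vector.tabulate(upperBounds.length)(p => byPartition.getOrElse(p, Seq.empty))
  }
}
```

Because the lookup depends on the ordering of keys and not on their remainders, nearby keys stay together and the six example keys spread over all four partitions.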
Partitioning Data: partitionBy
Invoking partitionBy creates an RDD with a specified partitioner.
val pairs = purchasesRdd.map(p => (p.customerId, p.price)) // "purchasesRdd" is an assumed name for the source RDD
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()
Creating a RangePartitioner requires:
1. Specifying the desired number of partitions.
2. Providing a Pair RDD with ordered keys. This RDD is sampled to create a suitable set of sorted ranges.
Important: the result of partitionBy should be persisted. Otherwise, the partitioning is repeatedly applied (involving shuffling!) each time the partitioned RDD is used.
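A minimal end-to-end sketch of this pattern, assuming Spark is on the classpath and running in local mode. The Purchase case class, the sample data and the PartitionByDemo object are hypothetical stand-ins, since the original purchase data is not shown.

```scala
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object PartitionByDemo {
  // Hypothetical record type standing in for the purchase data.
  case class Purchase(customerId: Int, price: Double)

  // Builds a small Pair RDD, range-partitions it, persists the result,
  // and returns (whether a partitioner is recorded, keys per partition).
  def run(): (Boolean, Array[Array[Int]]) = {
    val conf = new SparkConf().setAppName("partitionBy-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val purchases = sc.parallelize(Seq(
        Purchase(8, 10.0), Purchase(96, 5.0), Purchase(240, 7.5),
        Purchase(400, 1.0), Purchase(401, 2.0), Purchase(800, 3.0)))
      val pairs = purchases.map(p => (p.customerId, p.price))

      // The partitioner samples `pairs` to choose its range bounds.
      val tunedPartitioner = new RangePartitioner(4, pairs)

      // persist() keeps the shuffled layout, so later actions reuse it
      // instead of repartitioning again.
      val partitioned = pairs.partitionBy(tunedPartitioner).persist()

      val keysPerPartition = partitioned.keys.glom().collect()
      (partitioned.partitioner.isDefined, keysPerPartition)
    } finally {
      sc.stop()
    }
  }
}
```

Since the range bounds come from sampling, the exact placement of keys is not guaranteed to match the hand-picked ranges of the earlier example, and the number of partitions actually created can be smaller than the number requested.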