Understanding RDD Operations: Transformations and Actions

As we already know, each RDD supports two kinds of parallel operations: transformations and actions. Today, let’s look at some of them.

1. Transformations

| Transformation | Explanation |
| --- | --- |
| map(f: T => U) | Return a MappedRDD[U] by applying function f to each element. |
| flatMap(f: T => TraversableOnce[U]) | Return a new FlatMappedRDD[U] by first applying a function to all elements and then flattening the results. |
| filter(f: T => Boolean) | Return a FilteredRDD[T] containing the elements for which f returns true. |
| mapPartitions(f: Iterator[T] => Iterator[U]) | Return a new MapPartitionsRDD[U] by applying a function to each partition. |
| sample(withReplacement, fraction, seed) | Return a new PartitionwiseSampledRDD[T] which is a sampled subset. |
| union(other: RDD[T]) | Return a new UnionRDD[T] by taking the union with another RDD. |
| intersection(other: RDD[T]) | Return a new RDD[T] by taking the intersection with another RDD. |
| distinct() | Return a new RDD[T] containing the distinct elements. |
| groupByKey() | Called on an RDD of (K, V) pairs, return a new RDD[(K, Iterable[V])]. |
| reduceByKey(f: (V, V) => V) | Called on an RDD of (K, V) pairs, return a new RDD[(K, V)] by aggregating the values of each key, e.g. reduceByKey(_ + _). |
| sortByKey([ascending]) | Called on an RDD of (K, V) pairs where K implements Ordered, return a new RDD[(K, V)] sorted by K. |
| join(other: RDD[(K, W)]) | Called on an RDD of (K, V) pairs, return a new RDD[(K, (V, W))] by joining the two RDDs on their keys. |
| cogroup(other: RDD[(K, W)]) | Called on an RDD of (K, V) pairs, return a new RDD[(K, (Iterable[V], Iterable[W]))] such that for each key k in this and other, we get a tuple with the list of values for that key in this as well as other. |
| cartesian(other: RDD[U]) | Return a new RDD[(T, U)] containing the Cartesian product with the other RDD. |
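
To make the table concrete, here is a minimal sketch chaining a few of these transformations. It assumes an already-created SparkContext named sc (e.g. from spark-shell); remember that none of these calls triggers any computation yet:

```scala
// Assumes an existing SparkContext `sc`, e.g. from spark-shell.
val lines = sc.parallelize(Seq("a b a", "b c", "a"))

// flatMap + map + reduceByKey: the classic word-count pipeline.
val wordCounts = lines
  .flatMap(line => line.split(" "))   // RDD[String]
  .map(word => (word, 1))             // RDD[(String, Int)]
  .reduceByKey(_ + _)                 // RDD[(String, Int)]

// filter + distinct on a plain RDD[T].
val evens = sc.parallelize(1 to 10).filter(_ % 2 == 0).distinct()
```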

2. Actions

| Action | Explanation |
| --- | --- |
| reduce(f: (T, T) => T) | Return a T by reducing the elements using the specified commutative and associative binary operator. |
| collect() | Return an Array[T] containing all elements. |
| count() | Return the number of elements. |
| first() | Return the first element. |
| take(num) | Return an Array[T] of the first num elements. |
| takeSample(withReplacement, num, seed) | Return an Array[T] which is a sampled subset of num elements. |
| takeOrdered(num)(ordering) | Return an Array[T] with the num smallest or biggest (depending on the ordering) elements. |
| saveAsTextFile(fileName)<br>saveAsSequenceFile(fileName)<br>saveAsObjectFile(fileName) | Save the (serialized) RDD to external storage. |
| countByValue() | Return a Map[T, Long] with the count of each unique value. |
| countByKey() | Return a Map[K, Long] counting the number of elements for each key. |
| foreach(f: T => Unit) | Apply the function f to each element. |

3. Transformations & lazy evaluation give us more chances to optimize our job

The figure below gives a quick overview of the flow of a Spark job:

(Figure: flow of a Spark job [1])

Suppose we are running a simple application that counts the lines containing the letter “a”:

```scala
/* SimpleApp.scala */
import org.apache.spark.{SparkConf, SparkContext}

val logFile = "YOUR_SPARK_HOME/README.md"
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
println("Lines with a: %s".format(numAs))
```

We start by creating an RDD via the SparkContext, then transform it with the filter transformation and finally call the count action. When an action is called on an RDD, the SparkContext submits a job to the DAGScheduler, where the very first optimizations happen. The DAGScheduler receives the target RDDs, the functions to run on each partition (the pipelined transformations and the action), and a listener for the results. It will:

  • Build Stages of Task objects (code + preferred locations)
  • Submit them to the TaskScheduler as they become ready
  • Resubmit failed Stages if their outputs are lost

The TaskScheduler is responsible for launching tasks on the Executors in our cluster, re-launching failed tasks several times, and returning the results to the DAGScheduler. We can now quickly summarize:

  • We submit a jar application which contains jobs
  • A job submitted to the DAGScheduler via the SparkContext is split into Stages. The DAGScheduler schedules the running order of these Stages.
  • A Stage contains a set of Tasks to run on Executors. The TaskScheduler schedules the running of the Tasks.
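
One way to peek at what the DAGScheduler will work with is RDD.toDebugString, which prints an RDD’s lineage before any action runs. A minimal sketch, assuming the logData RDD from the snippet above in a spark-shell session (the exact output format varies across Spark versions):

```scala
// Lineage of the filtered RDD, printed before any action is called.
val withA = logData.filter(line => line.contains("a"))
println(withA.toDebugString)
// Prints something along the lines of:
// (2) MapPartitionsRDD[2] at filter at <console>:26 []
//  |  YOUR_SPARK_HOME/README.md MapPartitionsRDD[1] at textFile at <console>:24 []
//  |  YOUR_SPARK_HOME/README.md HadoopRDD[0] at textFile at <console>:24 []
```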

4. RDD dependency types and the optimization at the DAGScheduler

(Figure: narrow and wide RDD dependencies)

  • Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD. The tasks can be executed locally and no shuffle is needed (e.g. map, flatMap, filter, sample).
  • Wide dependency: multiple child partitions may depend on one partition of the parent RDD. The data has to be shuffled unless the parents are already hash-partitioned (e.g. sortByKey, reduceByKey, groupByKey, cogroup, join, cartesian).

Thanks to lazy evaluation, the scheduler is able to optimize the stages before submitting the job: it pipelines narrow operations within a stage, picks join algorithms based on partitioning (trying to minimize shuffles), and reuses previously cached data (see the sketch after the figure below).

(Figure: example of optimized stages in a job [1])
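
The shuffle boundary shows up directly in the lineage. In the sketch below (assuming the sc from earlier), the mapValues/filter steps are pipelined into a single stage, while reduceByKey introduces a ShuffledRDD and therefore a new stage; pre-partitioning with partitionBy is one way to reduce such shuffles when the same key layout is reused across several wide operations:

```scala
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// Narrow: mapValues/filter keep the parent's partitioning, no shuffle.
val narrow = pairs.mapValues(_ * 2).filter(_._2 > 2)

// Wide: reduceByKey needs all values of a key in one partition -> shuffle.
val wide = narrow.reduceByKey(_ + _)
println(wide.toDebugString)   // the lineage shows a ShuffledRDD at the stage boundary

// Reusing a partitioner: later key-based operations on `prePartitioned`
// can avoid re-shuffling because the data is already hash-partitioned.
val prePartitioned = pairs.partitionBy(new HashPartitioner(4)).cache()
```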

5. Further readings

  1. Introduction to AmpLab Spark Internals presentation, Matei Zaharia, https://www.youtube.com/watch?v=49Hr5xZyTEA
