# Understand RDD Operations: Transformations and Actions

As we already know, each RDD has two sets of parallel operations: transformations and actions. Today, let’s understand some of them.

## 1. Transformations

| Transformation | Explanation |
| --- | --- |
| `map(f: T => U)` | Return a `MappedRDD[U]` by applying function `f` to each element |
| `flatMap(f: T => TraversableOnce[U])` | Return a new `FlatMappedRDD[U]` by first applying a function to all elements and then flattening the results |
| `filter(f: T => Boolean)` | Return a `FilteredRDD[T]` containing the elements for which `f` returns true |
| `mapPartitions(f: Iterator[T] => Iterator[U])` | Return a new `MapPartitionsRDD[U]` by applying a function to each partition |
| `sample(withReplacement, fraction, seed)` | Return a new `PartitionwiseSampledRDD[T]` which is a sampled subset |
| `union(other: RDD[T])` | Return a new `UnionRDD[T]` by making a union with another RDD |
| `intersection(other: RDD[T])` | Return a new `RDD[T]` by intersecting with another RDD |
| `distinct()` | Return a new `RDD[T]` containing the distinct elements |
| `groupByKey()` | When called on an `RDD[(K, V)]`, return a new `RDD[(K, Iterable[V])]` |
| `reduceByKey(f: (V, V) => V)` | When called on an `RDD[(K, V)]`, return a new `RDD[(K, V)]` by aggregating values per key, e.g. `reduceByKey(_ + _)` |
| `sortByKey([ascending])` | When called on an `RDD[(K, V)]` where `K` implements `Ordered`, return a new `RDD[(K, V)]` sorted by key |
| `join(other: RDD[(K, W)])` | When called on an `RDD[(K, V)]`, return a new `RDD[(K, (V, W))]` by joining the two by key |
| `cogroup(other: RDD[(K, W)])` | When called on an `RDD[(K, V)]`, return a new `RDD[(K, (Iterable[V], Iterable[W]))]` such that for each key in this & other we get a tuple with the lists of values for that key in both |
| `cartesian(other: RDD[U])` | Return a new `RDD[(T, U)]`, the cartesian product of the two RDDs |
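
For example, a few of these transformations chained together (a minimal sketch, assuming an existing `SparkContext` named `sc`):

```scala
val nums  = sc.parallelize(Seq(1, 2, 3, 4, 5))
val evens = nums.map(_ * 2).filter(_ % 4 == 0)   // 4, 8

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sums  = pairs.reduceByKey(_ + _)             // ("a", 4), ("b", 2)
// Nothing has been computed yet: transformations only build up the lineage
```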

## 2. Actions

| Action | Explanation |
| --- | --- |
| `reduce(f: (T, T) => T)` | Return a `T` by reducing the elements using the specified commutative and associative binary operator |
| `collect()` | Return an `Array[T]` containing all elements |
| `count()` | Return the number of elements |
| `first()` | Return the first element |
| `take(num)` | Return an `Array[T]` with the first `num` elements |
| `takeSample(withReplacement, num, seed)` | Return an `Array[T]` which is a sampled subset of `num` elements |
| `takeOrdered(num)(ordering)` | Return an `Array[T]` with the `num` smallest (or largest, depending on the ordering) elements |
| `saveAsTextFile(fileName)` / `saveAsSequenceFile(fileName)` / `saveAsObjectFile(fileName)` | Save the (serialized) RDD to storage |
| `countByValue()` | Return a `Map[T, Long]` with the count of each unique value |
| `countByKey()` | Return a `Map[K, Long]` counting the number of elements for each key |
| `foreach(f: T => Unit)` | Apply function `f` to each element |
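
Unlike transformations, actions trigger the actual computation and return results to the driver. A small sketch, again assuming an existing `SparkContext` named `sc`:

```scala
val nums = sc.parallelize(Seq(1, 2, 3, 4, 5))

val total  = nums.reduce(_ + _)    // 15
val first3 = nums.take(3)          // Array(1, 2, 3)
val counts = nums.countByValue()   // Map(1 -> 1, 2 -> 1, 3 -> 1, 4 -> 1, 5 -> 1)
nums.foreach(x => println(x))      // note: runs on the executors, not the driver
```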

## 3. Transformations & lazy evaluation bring more chances to optimize our job

The figure below gives a quick overview of the flow of a Spark job:

*(Figure: Flow of a Spark job)*

Suppose we are running a simple job that counts the lines containing the letter "a":

```scala
/* SimpleApp.scala */
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logFile = "README.md" // any text file will do
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
println("Lines with a: %s".format(numAs))
```

We start by creating an RDD via the `SparkContext`, then transform it with the `filter` transformation, and finally call the `count` action. When an action is called on an RDD, the `SparkContext` submits a job to the `DAGScheduler` - where the very first optimizations happen. The `DAGScheduler` receives the target RDDs, the functions to run on each partition (the pipelined transformations and the action), and a listener for results. It will:

• Build `Stages` of `Task` objects (code + preferred location)
• Submit them to the `TaskScheduler` as they become ready
• Resubmit failed `Stages` if their outputs are lost

The `TaskScheduler` is responsible for launching tasks on `Executors` in our cluster, re-launching failed tasks several times, and returning the results to the `DAGScheduler`. We can now quickly summarize:
• We submit an application jar which contains jobs
• Each job is submitted to the `DAGScheduler` via the `SparkContext` and split into `Stages`; the `DAGScheduler` schedules the running order of these stages.
• A `Stage` contains a set of `Tasks` to run on `Executors`. The `TaskScheduler` schedules the run of tasks.
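
We can actually see these stages from an RDD's lineage. As a minimal sketch (assuming an existing `SparkContext` named `sc` and an input path of our choosing), `toDebugString` prints the lineage, and each new indentation level marks a stage boundary at a shuffle:

```scala
val counts = sc.textFile("README.md")   // assumed input path
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)                   // wide dependency: forces a shuffle

println(counts.toDebugString)
// Typically prints a ShuffledRDD on top of a pipelined chain of
// narrow transformations, i.e. two stages in total
```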

## 4. RDD Dependency types and the optimization at DAGScheduler

• Narrow dependency: each partition of the parent `RDD` is used by at most one partition of the child `RDD`. This means the task can be executed locally and we don’t have to shuffle. (E.g. `map`, `flatMap`, `filter`, `sample`)
• Wide dependency: multiple child partitions may depend on one partition of the parent `RDD`. This means we have to shuffle data unless the parents are hash-partitioned. (E.g. `sortByKey`, `reduceByKey`, `groupByKey`, `cogroup`, `join`, `cartesian`)

Thanks to lazy evaluation, the scheduler is able to optimize the stages before submitting the job: it pipelines narrow operations within a stage, picks join algorithms based on partitioning (trying to minimize shuffles), and reuses previously cached data, as the sketch below illustrates.

*(Figure: Example of optimized stages in a job)*
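
To illustrate the partitioning point: if both sides of a `join` already share the same partitioner, the join becomes a narrow dependency and no extra shuffle is needed. A minimal sketch, again assuming an existing `SparkContext` named `sc` and made-up data:

```scala
import org.apache.spark.HashPartitioner

// Hash-partition both RDDs the same way up front
val users  = sc.parallelize(Seq((1, "alice"), (2, "bob")))
  .partitionBy(new HashPartitioner(4))
  .cache() // keep the partitioned data around for reuse
val orders = sc.parallelize(Seq((1, "book"), (2, "pen")))
  .partitionBy(new HashPartitioner(4))

// Co-partitioned inputs: join() proceeds without shuffling either side again
val joined = users.join(orders)   // RDD[(Int, (String, String))]
```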