Today, let’s look at what really happens behind the scenes after we submit a Spark job to the cluster. I promise there are many interesting things to discover ;).
This is my first post in a series on the core components of the spark-core module, starting with the scheduling implementation. I would like to explain this first because it takes us from the beginning of a job (submission) to its end (getting the results back after all tasks have run and finished). Therefore, we will be able to:
- Get a really broad look at the whole system.
- Understand the optimization applied when building the graph of stages, which are the sets of tasks that run a job.
- Understand how TaskScheduler delivers and schedules the work on the cluster.
- Understand how different components interact with each other: DAGScheduler, TaskScheduler, CacheManager, BlockManager, ConnectionManager,…
Going into the details
Starting with an application submitted to the Master, the user’s code will be run on a driver.
Call stack (only if you are interested)
What role does the DAGScheduler play?
DAGScheduler implements stage-oriented scheduling: it computes a DAG (Directed Acyclic Graph) of stages for each submitted job (lines 10-14, with the details in lines 15-27), keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job. It then submits stages as TaskSets to the TaskScheduler (line 26), which runs them on the cluster.
The scheduling algorithm also takes the preferred locations of each task into consideration (line 20). DAGScheduler determines these locations from the current cache status and passes them to the TaskScheduler.
In case of failures due to the loss of shuffle output files, DAGScheduler can resubmit the old stages. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which retries the task.
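To make the split of responsibilities concrete, here is a minimal sketch of that decision. It is a toy model, not Spark's code: the function name `on_task_failure`, the string reason `"FetchFailed"`, the retry budget `MAX_TASK_FAILURES` and the "abort" outcome are all assumptions made for illustration.

```python
MAX_TASK_FAILURES = 4  # assumed retry budget for this sketch


def on_task_failure(reason, failures_so_far):
    """Return (component, action) describing who reacts to a failed task.

    reason: "FetchFailed" stands for lost shuffle output; anything else is
            treated as an ordinary failure inside the stage.
    failures_so_far: how many times this task had already failed.
    """
    if reason == "FetchFailed":
        # Lost shuffle files: the earlier (map) stage has to be run again.
        return ("DAGScheduler", "resubmit stage")
    if failures_so_far + 1 < MAX_TASK_FAILURES:
        # Ordinary failure: run the same task again.
        return ("TaskScheduler", "retry task")
    # Assumed behaviour once the retry budget is used up.
    return ("TaskScheduler", "abort task set")


print(on_task_failure("FetchFailed", 0))
print(on_task_failure("ExceptionFailure", 0))
print(on_task_failure("ExceptionFailure", 3))
```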
Regarding the implementation, the DAGScheduler uses an event-queue architecture to process incoming events, which is implemented by the DAGSchedulerEventProcessLoop class (line 6). It runs in a daemon thread and handles events that are subclasses of DAGSchedulerEvent (JobSubmitted, CompletionEvent,…) as displayed in the diagram.
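The event-queue idea can be sketched in a few lines. This is only a toy model in Python, not the Scala class from Spark: `EventLoop`, its methods, and the two event classes below are hypothetical stand-ins, and the handler just records which handler name would be called.

```python
import queue
import threading
from dataclasses import dataclass


@dataclass
class JobSubmitted:        # hypothetical stand-in for the real event
    job_id: int


@dataclass
class CompletionEvent:     # hypothetical stand-in for the real event
    task_id: int


class EventLoop:
    """Toy event loop: post() enqueues, one daemon thread dispatches in order."""

    _STOP = object()  # sentinel that tells the loop thread to exit

    def __init__(self, on_receive):
        self._queue = queue.Queue()
        self._on_receive = on_receive
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def post(self, event):
        self._queue.put(event)

    def stop(self):
        # The sentinel is queued behind pending events, so they run first.
        self._queue.put(self._STOP)
        self._thread.join()

    def _run(self):
        while True:
            event = self._queue.get()
            if event is self._STOP:
                return
            self._on_receive(event)


handled = []


def on_receive(event):
    # Dispatch on the event type, as a pattern match would in Scala.
    if isinstance(event, JobSubmitted):
        handled.append(f"handleJobSubmitted(job {event.job_id})")
    elif isinstance(event, CompletionEvent):
        handled.append(f"handleTaskCompletion(task {event.task_id})")


loop = EventLoop(on_receive)
loop.start()
loop.post(JobSubmitted(0))
loop.post(CompletionEvent(7))
loop.stop()
print(handled)
```

Because a single thread consumes the queue, the handlers never run concurrently, which keeps the scheduler's internal state free of locks in this sketch.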
Tell me more about the flow into the DAGScheduler
When a job is submitted to DAGScheduler by the SparkContext, a JobWaiter object is created and used to block the calling thread until the job result is available. Right after that object is created, the JobSubmitted event is fired. [lines 3, 4; see the two functions runJob and submitJob in class DAGScheduler for more information]
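The blocking behaviour of such a waiter can be sketched as follows. This is a simplified model under my own assumptions, not Spark's JobWaiter: the class `ToyJobWaiter` and its methods `task_succeeded` and `await_result` are hypothetical names, and the three "tasks" are plain threads that report a result.

```python
import threading


class ToyJobWaiter:
    """Blocks the caller until every task of a job has reported a result."""

    def __init__(self, total_tasks, result_handler):
        self._total = total_tasks
        self._finished = 0
        self._result_handler = result_handler
        self._lock = threading.Lock()
        self._done = threading.Event()
        if total_tasks == 0:
            self._done.set()  # nothing to wait for

    def task_succeeded(self, index, result):
        # Called once per finished task, possibly from different threads.
        with self._lock:
            self._result_handler(index, result)
            self._finished += 1
            if self._finished == self._total:
                self._done.set()

    def await_result(self, timeout=None):
        # True once all tasks reported, False if the timeout expired first.
        return self._done.wait(timeout)


results = {}
waiter = ToyJobWaiter(3, lambda i, r: results.__setitem__(i, r))

# Pretend three tasks finish on other threads.
workers = [
    threading.Thread(target=waiter.task_succeeded, args=(i, i * i))
    for i in range(3)
]
for w in workers:
    w.start()

finished = waiter.await_result(timeout=5)
for w in workers:
    w.join()
print(finished, results)
```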
The JobSubmitted event (line 5) results in a call to handleJobSubmitted in DAGScheduler (line 6). First, a new job (ActiveJob) is created and one stage (the last stage of the job, which produces the results) gets submitted (lines 8-9). Depending on the dependency type of the RDD (a NarrowDependency, or a wide one, i.e. a ShuffleDependency), DAGScheduler produces the stages: it generates a new map stage only for a ShuffleDependency (line 16) and pipelines the NarrowDependencies (line 17) [see function getMissingParentStages in class DAGScheduler].
A Stage is a set of independent tasks, all computing the same function, that need to run as part of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run by the scheduler is split up into stages at the boundaries where a shuffle occurs, as I mentioned before, and then DAGScheduler runs these stages in topological order. Each stage can either be a shuffle map stage (stage.isShuffleMap = true), in which case its tasks' results are input for another stage, or a result stage (stage.isShuffleMap = false), in which case its tasks directly compute the action that initiated the job (e.g. count(), save(), etc.) [see the Stage class].
Next, for each stage of the job that is created and then submitted, a TaskSet containing tasks (ResultTasks or ShuffleMapTasks) is submitted to the TaskScheduler to run on the cluster (line 26). As you can see from lines 22 and 24, the TaskSet contains one task of the same kind for each partition of an RDD.
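The way stages fall out of the dependency types can be illustrated with a small toy model. Everything here is an assumption made for the sketch: `ToyRDD`, `ToyStage`, `build_stage` and `run_order` are hypothetical names, and the lineage at the bottom is an invented example, not the one from Spark's source.

```python
from dataclasses import dataclass, field


@dataclass
class ToyRDD:
    name: str
    # Each dependency is (parent RDD, "narrow" or "shuffle").
    deps: list = field(default_factory=list)


@dataclass(eq=False)  # compare stages by identity
class ToyStage:
    final_rdd: ToyRDD
    rdds: list      # names of the RDDs pipelined into this stage
    parents: list   # stages that must finish first


def build_stage(final_rdd, cache=None):
    """Follow narrow dependencies inside one stage; cut a parent stage at each shuffle."""
    cache = {} if cache is None else cache
    if final_rdd.name in cache:
        return cache[final_rdd.name]
    stage = ToyStage(final_rdd, [], [])
    cache[final_rdd.name] = stage
    to_visit, seen = [final_rdd], set()
    while to_visit:
        rdd = to_visit.pop()
        if rdd.name in seen:
            continue
        seen.add(rdd.name)
        stage.rdds.append(rdd.name)
        for parent, kind in rdd.deps:
            if kind == "shuffle":
                parent_stage = build_stage(parent, cache)
                if parent_stage not in stage.parents:
                    stage.parents.append(parent_stage)
            else:
                to_visit.append(parent)  # narrow: same stage
    return stage


def run_order(stage, order=None):
    """Parents first, then the stage itself (a topological order)."""
    order = [] if order is None else order
    for parent in stage.parents:
        run_order(parent, order)
    if stage not in order:
        order.append(stage)
    return order


# Invented lineage: A --shuffle--> B; C -> D and E -> F (narrow);
# G reads B narrowly and F through a shuffle.
a = ToyRDD("rddA")
b = ToyRDD("rddB", [(a, "shuffle")])
c = ToyRDD("rddC")
d = ToyRDD("rddD", [(c, "narrow")])
e = ToyRDD("rddE")
f = ToyRDD("rddF", [(d, "narrow"), (e, "narrow")])
g = ToyRDD("rddG", [(b, "narrow"), (f, "shuffle")])

stages = [sorted(s.rdds) for s in run_order(build_stage(g))]
print(stages)
```

In this invented lineage the walk yields three stages, and the stage that ends in rddG comes last because both of its parent stages have to finish first.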
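The "one task per partition" rule is easy to picture with a toy model. The names `ToyTask` and `make_task_set`, and the `finished` parameter for partitions that need no recomputation, are assumptions of this sketch rather than Spark's API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyTask:
    kind: str        # "ShuffleMapTask" or "ResultTask"
    stage_id: int
    partition: int


def make_task_set(stage_id, num_partitions, is_shuffle_map, finished=frozenset()):
    """Build one task per partition that still has to be computed."""
    kind = "ShuffleMapTask" if is_shuffle_map else "ResultTask"
    return [
        ToyTask(kind, stage_id, p)
        for p in range(num_partitions)
        if p not in finished
    ]


# A shuffle map stage over an RDD with 2 partitions: 2 tasks.
map_tasks = make_task_set(stage_id=1, num_partitions=2, is_shuffle_map=True)

# A result stage over 3 partitions where partition 0 is already done.
result_tasks = make_task_set(
    stage_id=2, num_partitions=3, is_shuffle_map=False, finished={0}
)

print(len(map_tasks), [t.partition for t in result_tasks])
```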
TaskScheduler is the next part after the DAGScheduler
TaskSchedulerImpl, which is the only implementation of the TaskScheduler interface, schedules tasks for a single SparkContext: it sends the task sets submitted by DAGScheduler to the cluster, runs them, retries them if there are failures, and mitigates stragglers. The resulting events are sent back to DAGScheduler.
One SparkContext has one DAGScheduler and one TaskSchedulerImpl.
In detail, a TaskSetManager is created to wrap the submitted TaskSet (line 28). This class implements the scheduling of the tasks: it keeps track of each task, retries tasks if they fail, and handles locality-aware scheduling for a given TaskSet via delay scheduling (lines 29, 30 and more in the TaskSetManager class). The LaunchTask message is sent to Executors whenever an offer is available (lines 34-44). The implementation of the messages passed between objects (actors) is simplified thanks to the Akka framework. I intend to write a post about the features of the Akka framework so that we can read and understand the source code related to concurrency.
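Delay scheduling is easier to grasp with a small example: a task set declines offers from non-preferred hosts for a while, hoping a preferred host frees up, and only then gives up on locality. The sketch below is a toy with a single locality level; `ToyTaskSetManager`, `resource_offer` and the `locality_wait` value are assumptions for illustration, and Spark's real TaskSetManager is considerably more elaborate.

```python
class ToyTaskSetManager:
    def __init__(self, preferred_hosts, locality_wait=3.0):
        # preferred_hosts: {task_id: host the task would like to run on}
        self.pending = dict(preferred_hosts)
        self.locality_wait = locality_wait   # seconds to hold out for locality
        self.last_local_launch = 0.0

    def resource_offer(self, host, now):
        """Return the task id to launch on `host`, or None to decline the offer."""
        local = next((t for t, h in self.pending.items() if h == host), None)
        if local is not None:
            self.pending.pop(local)
            self.last_local_launch = now
            return local
        # No local task: give up locality only after waiting long enough.
        if self.pending and now - self.last_local_launch >= self.locality_wait:
            task_id = next(iter(self.pending))
            self.pending.pop(task_id)
            return task_id
        return None


manager = ToyTaskSetManager({0: "hostA", 1: "hostA"}, locality_wait=3.0)
decisions = [
    manager.resource_offer("hostB", now=1.0),  # declined, still waiting
    manager.resource_offer("hostA", now=1.5),  # local launch of task 0
    manager.resource_offer("hostB", now=2.0),  # declined, timer was reset
    manager.resource_offer("hostB", now=5.0),  # waited 3.5 s, run task 1 anyway
]
print(decisions)
```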
Tasks are finally run on worker machines
Tasks are then scheduled to the acquired Executors according to resources and locality constraints.
The Executors run the Tasks in multiple Threads (line 3).
As I’ve already mentioned, we have 2 kinds of Task: ResultTask and ShuffleMapTask. The ResultTask applies the function to the RDD partition after it has been computed (line 19). The ShuffleMapTask uses BlockManager to store its shuffle output via a ShuffleWriter (lines 23-24). The shuffle read phase is handled differently depending on the operation and on the type of RDD it produces. The compute function of each RDD is where these implementation details live (line 27).
To sum up, here is a visualized example of an optimized job produced by DAGScheduler and TaskScheduler. The Stages are split wherever a shuffle phase occurs (Stages 1, 2, 3 in the figure), and pipelined when the dependency is narrow (rddC -> rddD -> rddF), which works as expected. For each stage, the number of tasks to run equals the number of partitions (e.g. in the flow from rddC to rddF, 2 tasks will be generated because rddC has 2 partitions).
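The difference between the two task kinds can be sketched with a thread pool standing in for an Executor. This is a toy model under my own assumptions: `run_shuffle_map_task` and `run_result_task` are hypothetical functions, shuffle output is kept in plain lists instead of going through BlockManager, and keys are bucketed with `hash(key) % num_reducers`.

```python
from concurrent.futures import ThreadPoolExecutor


def run_shuffle_map_task(records, num_reducers):
    """Bucket (key, value) records by reducer, like a map-side shuffle write."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in records:
        buckets[hash(key) % num_reducers].append((key, value))
    return buckets


def run_result_task(records, func):
    """Apply the action's function to the records of one partition."""
    return func(iter(records))


NUM_REDUCERS = 2
partitions = [[(1, "a"), (2, "b")], [(3, "c"), (4, "d")]]

# The pool plays the role of an Executor running tasks in several threads.
with ThreadPoolExecutor(max_workers=2) as pool:
    # Map stage: one ShuffleMapTask per input partition.
    map_outputs = list(
        pool.map(lambda p: run_shuffle_map_task(p, NUM_REDUCERS), partitions)
    )
    # Shuffle read: reducer r collects bucket r from every map output.
    reduce_inputs = [
        [rec for out in map_outputs for rec in out[r]]
        for r in range(NUM_REDUCERS)
    ]
    # Result stage: one ResultTask per reduce partition, here a count.
    counts = list(
        pool.map(
            lambda recs: run_result_task(recs, lambda it: sum(1 for _ in it)),
            reduce_inputs,
        )
    )

print(reduce_inputs, counts)
```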
That’s all for today! If you enjoyed this post, I’d be very grateful if you’d help it spread. Thank you!