Spark SQL Internals

This post continues my previous article, Introduction to SparkSQL, and digs into SparkSQL at a deeper level.

Figure 1. SparkSQL execution flow

Figure 1 recalls the execution flow of SparkSQL: from the user’s query down to the final RDD executed on the cluster. To understand how this flow is implemented, let’s take a closer look at the SQLContext class - the main entry point of any SparkSQL application.

Figure 2. SQLContext and its components

SQLContext relies on the services of several components: parsers, a cache manager, an analyzer, an optimizer, and more:

  • DDLParser: a parser for foreign DDL commands, e.g. CREATE TABLE
  • SparkSQLParser: parses a SQL string into a LogicalPlan
  • CacheManager (org.apache.spark.sql.execution.CacheManager): implements caching of query results and automatically reuses those cached results for later queries. Data is cached using byte buffers stored in an InMemoryRelation.
  • Analyzer: a Logical Plan analyzer that resolves UnresolvedAttributes (e.g. column names) and UnresolvedRelations (e.g. a JSON data source, a text file data source, …) into fully typed objects using the information in a schema catalog (Catalog). Analyzer extends RuleExecutor, which runs a set of batches, each containing multiple rules, to repeatedly transform the Logical Plan tree.
  • Optimizer: like the Analyzer, the Optimizer applies a set of rules to the Logical Plan tree in order to optimize it, as explained in my previous post.
  • QueryExecution: defines the whole workflow of executing relational queries in SparkSQL (Figure 1):
// SQLContext.scala
// org.apache.spark.sql.SQLContext#QueryExecution
class QueryExecution(val logical: LogicalPlan) {
  lazy val analyzed: LogicalPlan = analyzer.execute(logical)
  lazy val withCachedData: LogicalPlan = cacheManager.useCachedData(analyzed)
  lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
  lazy val sparkPlan: SparkPlan = {
    SparkPlan.currentContext.set(self)
    planner.plan(optimizedPlan).next()
  }
  lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
}
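
Each of these lazy vals can also be inspected on a DataFrame through its queryExecution field. The snippet below is a minimal sketch of my own (df stands for any DataFrame; it is not code from the Spark sources):

val qe = df.queryExecution          // an org.apache.spark.sql.SQLContext#QueryExecution
println(qe.analyzed)                // the resolved LogicalPlan
println(qe.optimizedPlan)           // the LogicalPlan after the optimizer rules
println(qe.executedPlan)            // the physical SparkPlan ready to run
val rdd = qe.toRdd                  // the RDD[InternalRow] that will run on the cluster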

Let’s pause here and go through a concrete example. Suppose we query a simple JSON file, people.json, where each row contains three columns: (name, age, department)

{"name": "Tiab", "age": 39, "department":1}
{"name": "Echsam", "age": 24, "department":2}
{"name": "Pebure", "age": 30, "department":1}
{"name": "Vorosan", "age": 28, "department":1}
{"name": "Tydar", "age": 32, "department":1}
{"name": "Inahoncha", "age": 32, "department":2}
{"name": "Tydar", "age": 29, "department":3}
{"name": "Vorchat", "age": 23, "department":2}
{"name": "Ackp", "age": 34, "department":2}
{"name": "Pebure", "age": 31, "department":2}

We write a simple application to query the data:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
object Test {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application")
    conf.setMaster("local[1]")

    val sc = new SparkContext(conf)
    val sqlc = new SQLContext(sc)
    val logFile = "people.json"
    val inputDF = sqlc.read.json(logFile)

    val query = inputDF.select(inputDF("age"), inputDF("department"))
      .filter(inputDF("age") < 31)
      .filter(inputDF("age") < 29)
      .groupBy("department")
      .count()

    val res = query.collect()
  }
}
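
As a side note, the same query can also be expressed as a SQL string, which SparkSQLParser turns into the same kind of LogicalPlan. A hedged sketch (the temporary table name "people" is my own choice, not part of the program above):

// inside main, after inputDF has been created
inputDF.registerTempTable("people")
val sqlQuery = sqlc.sql(
  "SELECT department, COUNT(*) AS count FROM people " +
  "WHERE age < 31 AND age < 29 GROUP BY department")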

We can obtain the details of the execution flow by printing the object query.queryExecution.
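
A minimal sketch of how to do so (query.explain(true) prints much of the same information):

println(query.queryExecution)   // dumps the parsed, analyzed, optimized and physical plans
query.explain(true)             // prints an extended explain to the console

Running it on our example gives: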

== Parsed Logical Plan ==
Aggregate [department#1L], [department#1L,COUNT(1) AS count#3L]
Filter (age#0L < CAST(29, LongType))
Filter (age#0L < CAST(31, LongType))
Project [age#0L,department#1L]
Relation[age#0L,department#1L,name#2] org.apache.spark.sql.json.JSONRelation@9202d634

== Analyzed Logical Plan ==
department: bigint, count: bigint
Aggregate [department#1L], [department#1L,COUNT(1) AS count#3L]
Filter (age#0L < CAST(29, LongType))
Filter (age#0L < CAST(31, LongType))
Project [age#0L,department#1L]
Relation[age#0L,department#1L,name#2] org.apache.spark.sql.json.JSONRelation@9202d634

== Optimized Logical Plan ==
Aggregate [department#1L], [department#1L,COUNT(1) AS count#3L]
Project [department#1L]
Filter ((age#0L < 31) && (age#0L < 29))
Relation[age#0L,department#1L,name#2] org.apache.spark.sql.json.JSONRelation@9202d634

== Physical Plan ==
Aggregate false, [department#1L], [department#1L,Coalesce(SUM(PartialCount#5L),0) AS count#3L]
Exchange (HashPartitioning 200)
Aggregate true, [department#1L], [department#1L,COUNT(1) AS PartialCount#5L]
Project [department#1L]
Filter ((age#0L < 31) && (age#0L < 29))
PhysicalRDD [department#1L,age#0L], MapPartitionsRDD[6] at toString at test.scala:22

Parsed Logical Plan to Optimized Logical Plan

The Logical Plan is obtained by passing the parsed plan through the analyzer and the optimizer, which apply rules to the Logical Plan tree. For example, the rule CombineFilters merges two consecutive Filters into a single Filter whose condition is AND(old_expression1, old_expression2); the rule ConstantFolding folds the expressions CAST(29, LongType) and CAST(31, LongType) into Literal(29, long) and Literal(31, long); and the rules PushPredicateThroughProject, ColumnPruning, and ProjectCollapsing reorder the plan and prune the columns projected by the Project node.
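
To make this concrete, here is a simplified sketch of what a CombineFilters-style rule looks like (the real rule in Catalyst is essentially this pattern match, plus a few extra checks):

import org.apache.spark.sql.catalyst.expressions.And
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Two adjacent Filter nodes are replaced by a single Filter whose
// condition is the AND of both conditions.
object CombineFiltersSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(outerCondition, Filter(innerCondition, grandChild)) =>
      Filter(And(innerCondition, outerCondition), grandChild)
  }
}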

Figure 3. Relationship between Logical Plan, Physical Plan and TreeNode library

From Figure 3, we can see that a Logical Plan or a Physical Plan is represented as a tree of nodes. Each node may have zero, one, or two children (Leaf, Unary, and Binary nodes respectively). All nodes are instances of subclasses of the TreeNode class, and they are all immutable.

Figure 3 also clearly shows the relationship between the TreeNode library, the Logical Plan, and the Physical Plan. Because SparkPlan, LogicalPlan, and even Expression inherit from TreeNode, they all benefit from the rich functionality offered by the TreeNode library: the tree can easily be transformed and manipulated by applying simple rules to achieve different results.
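
As an illustration, the toy snippet below (my own example, not a rule from Spark) uses TreeNode’s transform to fold a constant sub-expression in a hand-built expression tree:

import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, Literal}
import org.apache.spark.sql.types.{IntegerType, LongType}

// A hand-built expression tree: (1 + 2) + age
val age  = AttributeReference("age", LongType)()
val expr = Add(Add(Literal(1), Literal(2)), age)

// transform applies the partial function to every matching node and
// returns a new (immutable) tree: here (1 + 2) is folded into 3,
// so the result is Add(Literal(3), age).
val folded = expr transform {
  case Add(Literal(a: Int, IntegerType), Literal(b: Int, IntegerType)) =>
    Literal(a + b)
}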

Figure 4. Tree library’s role

How rules are declared and applied is clearly explained on the Databricks developer blog, so in the next post I will give examples of how to declare new rules to optimize two or more queries together - one of the problems of my summer internship project.

See you in my next post - the Multi-Query Optimization in SparkSQL :)
