Introduction to Spark SQL

With Spark's core RDD API, we can do almost anything with a dataset. Developers define, step by step, how to retrieve the data by applying functional transformations on RDDs, and they are also the ones who have to optimize that code by hand. With Spark SQL (the replacement for Shark), we can go further and work with data using relational operations: we specify what data we want to retrieve from a partitioned collection of tuples with a schema (table, columns, data types). Furthermore, Spark SQL performs the optimizations for us.

So, what does Spark SQL actually do? It transforms SQL queries into optimal RDD transformations and actions.
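To make the "how" versus "what" contrast concrete, here is an illustrative sketch in plain Scala (no Spark required; the Person class and the tiny Query representation are hypothetical, invented for this example). The imperative version spells out every step; the declarative version describes the request as data, which an engine is then free to rewrite before executing:

```scala
// Hypothetical sketch: imperative (RDD-style) vs declarative (SQL-style).
case class Person(id: Int, name: String)

object DeclarativeDemo {
  val people = Seq(Person(1, "Ann"), Person(2, "Bob"), Person(3, "Cal"))

  // RDD style: the developer fixes every step and its order.
  def imperative: Seq[String] =
    people.filter(_.id == 1).map(_.name)

  // Spark SQL style: the request is just a description ("filter on id,
  // project the name"), so an optimizer may reorder or rewrite it.
  case class Query(filterId: Int, projection: Person => String)
  def run(q: Query): Seq[String] =
    people.filter(_.id == q.filterId).map(q.projection)

  def declarative: Seq[String] = run(Query(1, _.name))
}
```

Both produce the same result; the difference is who gets to choose the execution strategy.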

Figure 1. Spark SQL in the Stack

The Spark SQL module contains the following sub-modules:

  • Catalyst: query parser & query optimizer, LogicalPlan representation
  • SparkSQL-core: PhysicalPlan (SparkPlan) representation, DataFrame API, Data Source API, in-memory representation, UDFs, code generation, …
  • Spark-hive: Hive support on Spark

Figure 2. Spark SQL Processing Flow

Figure 3. Spark SQL Processing Flow in detail

For developers, the processing flow of Spark SQL is clearly specified in the org.apache.spark.sql.SQLContext#QueryExecution class:

// SQLContext.scala
// org.apache.spark.sql.SQLContext#QueryExecution
class QueryExecution(val logical: LogicalPlan) {
  lazy val analyzed: LogicalPlan = analyzer.execute(logical)
  lazy val withCachedData: LogicalPlan = cacheManager.useCachedData(analyzed)
  lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
  lazy val sparkPlan: SparkPlan = planner.plan(optimizedPlan).next()
  lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
}
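Those lazy vals mean each stage runs only when first demanded, is cached afterwards, and forces all earlier stages. A minimal sketch of the same pattern in plain Scala (the stage names mirror QueryExecution; the bodies are placeholders, not Spark's real logic):

```scala
// Illustrative only: each stage computes on demand, exactly once,
// and forcing a late stage forces every earlier one in order.
class MiniQueryExecution(logical: String) {
  val stagesRun = scala.collection.mutable.ListBuffer[String]()

  private def stage(name: String, in: String): String = {
    stagesRun += name
    s"$name($in)"
  }

  lazy val analyzed: String      = stage("analyzed", logical)
  lazy val optimizedPlan: String = stage("optimized", analyzed)
  lazy val sparkPlan: String     = stage("planned", optimizedPlan)
  lazy val toRdd: String         = stage("executed", sparkPlan)
}
```

Nothing runs at construction time; asking for toRdd triggers analysis, optimization, planning, and execution in order, and asking again reuses the cached results.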


Catalyst provides an execution planning framework

  • SQL parser & analyzer
  • Logical operators & general expressions
  • Logical optimizer
  • Transform operator tree

Corresponding to Figure 2, Catalyst runs through the following steps: parse the query => analyze the unresolved Logical Plan using the Catalog => optimize the Logical Plan.
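Catalyst rules are written as partial functions applied over the operator tree with transform. Here is a toy version of that idea in plain Scala (the Expr tree and the constant-folding rule are simplified stand-ins for Catalyst's TreeNode and rule classes, not the real API):

```scala
// Toy expression tree with a bottom-up transform, in the spirit of
// Catalyst's TreeNode.transform.
sealed trait Expr {
  def transform(rule: PartialFunction[Expr, Expr]): Expr = {
    // Rewrite children first, then try the rule at this node.
    val withNewChildren = this match {
      case Add(l, r) => Add(l.transform(rule), r.transform(rule))
      case other     => other
    }
    rule.applyOrElse(withNewChildren, identity[Expr])
  }
}
case class Lit(v: Int) extends Expr
case class Attr(name: String) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// A ConstantFolding-style rule: 1 + 2 => 3.
val constantFolding: PartialFunction[Expr, Expr] = {
  case Add(Lit(a), Lit(b)) => Lit(a + b)
}
```

For example, `Add(Add(Lit(1), Lit(2)), Attr("x")).transform(constantFolding)` folds the constant subtree while leaving the attribute reference untouched.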

Query optimization with simple rule-based transforms:

A simple query example:

SELECT name
FROM (SELECT id, name
      FROM People) p
WHERE p.id = 1

Usual Logical plan to Physical Plan

Optimization in Catalyst

'Filter Push-Down' rule example

Filter Push Down implementation

Some optimization rules in Spark SQL:

  • NullPropagation:
      1 + null => null
      count(null) => 0
  • ConstantFolding:
      1 + 2 => 3
  • BooleanSimplification:
      false AND $right => false
      true AND $right => $right
      true OR $right => true
      false OR $right => $right
      if(true, $then, $else) => $then
  • SimplifyFilters removes trivial filters:
      Filter(true, child) => child
      Filter(false, child) => empty
  • CombineFilters merges two adjacent filters:
      Filter($fc, Filter($nc, child)) => Filter(AND($fc, $nc), child)
  • PushPredicateThroughProject pushes Filter operators through a Project operator:
      Filter('i == 1, Project(i, j, child)) => Project(i, j, Filter('i == 1, child))
  • PushPredicateThroughJoin pushes Filter operators through a Join operator:
      Filter('left.i == 1, Join(left, right)) => Join(Filter('i == 1, left), right)
  • ColumnPruning eliminates the reading of unused columns:
      Join(left, right, LeftSemi, "".attr == "".attr) => Join(left, Project('id, right), LeftSemi)
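To see how such rules compose, here is a hedged toy sketch in plain Scala (not Catalyst's real classes) of CombineFilters and a PushPredicateThroughProject-style rewrite applied to a miniature logical plan until it stops changing:

```scala
// Toy logical plan; conditions are plain strings for simplicity.
sealed trait Plan
case class Scan(table: String) extends Plan
case class Filter(cond: String, child: Plan) extends Plan
case class Project(cols: Seq[String], child: Plan) extends Plan

// Apply a rule top-down, recursing into children, until fixed point.
def fixedPoint(rule: PartialFunction[Plan, Plan])(plan: Plan): Plan = {
  def once(p: Plan): Plan =
    rule.applyOrElse(p, identity[Plan]) match {
      case Filter(c, child)   => Filter(c, once(child))
      case Project(cs, child) => Project(cs, once(child))
      case leaf               => leaf
    }
  val next = once(plan)
  if (next == plan) plan else fixedPoint(rule)(next)
}

val rules: PartialFunction[Plan, Plan] = {
  // CombineFilters: two stacked filters become one AND-ed filter.
  case Filter(fc, Filter(nc, child)) => Filter(s"($fc) AND ($nc)", child)
  // Push a filter below a projection. (The real Catalyst rule also
  // checks the predicate only references columns the Project keeps.)
  case Filter(c, Project(cols, child)) => Project(cols, Filter(c, child))
}
```

Running the rules on `Filter("id = 1", Project(Seq("id", "name"), Filter("age > 18", Scan("People"))))` pushes the outer filter below the projection and merges it with the inner one, so the combined predicate sits directly on the scan.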

Increasing performance by generating byte code at run-time

  • We know that we are dealing with schema data (rows and columns); therefore, we have to evaluate a huge number of expressions, e.g. summing columns for each row (tableA[row_i, column_i] + tableA[row_i, column_j]), subtracting rows, …
  • The problem is Java's abstract implementation of such expressions via generics: it reduces the amount of code but disregards performance.
  • A simple call on two integers, a + b, costs: boxing, garbage collection, and a virtual function call (dispatched through the base class).
  • By generating byte code at run-time, we no longer have to worry about boxing costs or virtual function calls, and garbage-collection pressure is also reduced.
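The boxing cost is easy to see without Spark. In a generic sum, every element goes through a type-class instance and is boxed; a specialized primitive loop avoids all of that. Run-time code generation lets Spark SQL emit the specialized form for each expression. A minimal comparison in plain Scala (a sketch of the idea, not Spark's actual generated code):

```scala
// Generic version: T is erased, so each `plus` call boxes its
// arguments and dispatches virtually through the Numeric instance.
def sumGeneric[T](xs: Seq[T])(implicit num: Numeric[T]): T =
  xs.foldLeft(num.zero)(num.plus)

// "Generated"-style version: monomorphic on Int, no boxing, the
// addition compiles down to a primitive add inside the loop.
def sumSpecialized(xs: Array[Int]): Int = {
  var acc = 0
  var i = 0
  while (i < xs.length) { acc += xs(i); i += 1 }
  acc
}
```

Both compute the same result; the specialized variant is, in spirit, what run-time code generation produces for an expression like a + b over Int columns.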


