Introduction to Spark SQL

With Spark's core RDD API, we can do almost everything with datasets. Developers define, step by step, how to retrieve the data by applying functional transformations on RDDs, and they are also the ones responsible for optimizing their own code. With Spark SQL (the replacement for Shark), we can go further and work with data through relational operations: we specify what data we want to retrieve against a schema (table, columns, data types) over partitioned collections of tuples. Furthermore, Spark SQL performs the optimizations for us.

So, what does Spark SQL actually do? It transforms SQL queries into optimal RDD transformations and actions.
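A toy sketch of this idea, using plain Scala collections as a stand-in for RDDs (the `Person` class and `people` data are made up for illustration): a declarative query gets compiled down to the same filter/map transformations a developer would otherwise write by hand.

```scala
// Hypothetical data standing in for an RDD of rows:
case class Person(id: Int, name: String)
val people = Seq(Person(1, "Ann"), Person(2, "Bob"))

// The query  SELECT name FROM people WHERE id = 1  is conceptually
// planned into the transformations below -- Spark SQL derives them
// for us instead of the developer writing them against the RDD API:
val names = people.filter(_.id == 1).map(_.name)
// names == Seq("Ann")
```

The difference is who writes this code: with the RDD API it is the developer; with Spark SQL it is the planner, which is free to reorder and optimize before anything runs.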

Figure 1. Spark SQL in the Stack

The Spark SQL module contains the following sub-modules:

  • Catalyst: query parser & query optimizer, LogicalPlan representation
  • SparkSQL-core: PhysicalPlan (SparkPlan) representation, DataFrame API, Data Source API, in-memory representation, UDFs, code generation, …
  • Spark-hive: Hive support on Spark

Figure 2. Spark SQL Processing Flow

Figure 3. Spark SQL Processing Flow in detail

For developers, the processing flow of Spark SQL is laid out clearly in the org.apache.spark.sql.SQLContext#QueryExecution class:

// SQLContext.scala
// org.apache.spark.sql.SQLContext#QueryExecution
class QueryExecution(val logical: LogicalPlan) {
  lazy val analyzed: LogicalPlan = analyzer.execute(logical)
  lazy val withCachedData: LogicalPlan = cacheManager.useCachedData(analyzed)
  lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
  lazy val sparkPlan: SparkPlan = {
    SparkPlan.currentContext.set(self)
    planner.plan(optimizedPlan).next()
  }
  lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
}
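Note that every phase is a lazy val: a phase runs at most once, and only when something downstream demands it, so forcing the final plan pulls the whole chain. A minimal sketch of that design (the ToyQueryExecution class and its string tags are made up for illustration):

```scala
// Toy imitation of the lazy phase chain -- not Spark's classes:
class ToyQueryExecution(logical: String) {
  // Each phase caches its result and runs only when first demanded.
  lazy val analyzed: String      = s"analyzed($logical)"
  lazy val optimizedPlan: String = s"optimized($analyzed)"
  lazy val sparkPlan: String     = s"physical($optimizedPlan)"
}

val qe = new ToyQueryExecution("parsed")
// Asking for the physical plan forces analysis and optimization first:
// qe.sparkPlan == "physical(optimized(analyzed(parsed)))"
```

This is why, in the real class, asking only for `analyzed` never pays the cost of optimization or physical planning.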

Catalyst

Catalyst provides an execution planning framework:

  • SQL parser & analyzer
  • Logical operators & general expressions
  • Logical optimizer
  • Transformations on the operator tree

Corresponding to Figure 2, Catalyst runs through the following steps: parse the query => analyze the unresolved Logical Plan using the Catalog => optimize the Logical Plan.
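Catalyst expresses each optimization as a rule that pattern-matches on tree nodes and rewrites them. A hedged, minimal imitation of that style (the Expr/Lit/Add classes are toy types, not Spark's):

```scala
// Toy expression tree:
sealed trait Expr
case class Lit(v: Int) extends Expr
case class Add(l: Expr, r: Expr) extends Expr

// A "ConstantFolding"-style rule, applied bottom-up by pattern matching:
def fold(e: Expr): Expr = e match {
  case Add(l, r) =>
    (fold(l), fold(r)) match {
      case (Lit(a), Lit(b)) => Lit(a + b) // e.g. 1 + 2 => 3
      case (fl, fr)         => Add(fl, fr)
    }
  case other => other
}

// fold(Add(Lit(1), Add(Lit(2), Lit(3)))) == Lit(6)
```

The real Catalyst rules have the same shape: immutable trees plus partial functions, run repeatedly by a rule executor until the plan stops changing.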

Query optimization with simple rule-based transformations:

A simple query example:

SELECT name
FROM (SELECT id, name
      FROM People) p
WHERE p.id = 1

Usual Logical Plan to Physical Plan

Optimization in Catalyst

'Filter Push-Down' rule example

Filter Push-Down implementation

Some optimization rules in Spark SQL

Optimization rule             Example rules
NullPropagation               1 + null => null
                              count(null) => 0
ConstantFolding               1 + 2 => 3
BooleanSimplification         false AND $right => false
                              true AND $right => $right
                              true OR $right => true
                              false OR $right => $right
                              if(true, $then, $else) => $then
SimplifyFilters               removes trivial filters:
                              Filter(true, child) => child
                              Filter(false, child) => empty
CombineFilters                merges two adjacent filters:
                              Filter($fc, Filter($nc, child)) => Filter(AND($fc, $nc), child)
PushPredicateThroughProject   pushes Filter operators through a Project operator:
                              Filter('i == 1, Project(i, j, child)) => Project(i, j, Filter('i == 1, child))
PushPredicateThroughJoin      pushes Filter operators through a Join operator:
                              Filter("left.i".attr == 1, Join(left, right)) => Join(Filter('i == 1, left), right)
ColumnPruning                 eliminates the reading of unused columns:
                              Join(left, right, LeftSemi, "left.id".attr == "right.id".attr) => Join(left, Project('id, right), LeftSemi)
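To make the BooleanSimplification rows concrete, here is a hedged toy version (the BExpr hierarchy is made up for this sketch; Catalyst's real expressions also track nullability and data types):

```scala
// Toy boolean expressions mirroring the table rows above:
sealed trait BExpr
case object True extends BExpr
case object False extends BExpr
case class Var(name: String) extends BExpr
case class And(l: BExpr, r: BExpr) extends BExpr
case class Or(l: BExpr, r: BExpr) extends BExpr

def simplify(e: BExpr): BExpr = e match {
  case And(False, _) => False       // false AND $right => false
  case And(True, r)  => simplify(r) // true AND $right => $right
  case Or(True, _)   => True        // true OR $right => true
  case Or(False, r)  => simplify(r) // false OR $right => $right
  case other         => other
}
```

Each row of the table is one `case` arm; stacking many such small, independent rewrites is exactly what makes the rule-based approach easy to extend.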

Increasing performance by generating byte code at run-time

  • We know that we are dealing with schema data (with rows and columns), therefore, we will have to evaluate a huge number of expressions like: summing columns for each row (eg: tableA[row_i, column_i] + tableA[row_i, column_j]), subtracting rows,…
  • The problem is the abstract implementation of Java for such expressions - generics: can reduce the number of code but disregard performance:
  • A simple call on 2 integers: a + b will cost: boxing cost, garbage collection cost and virtual functions called (to base class) cost.
  • By generating byte code at run-time, we no longer have to worry about boxing cost and virtual function called. Garbage collection is also reduced.
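The contrast can be sketched as follows (toy classes, made up for illustration; real Catalyst generates and compiles Java source at run-time rather than writing a lambda by hand):

```scala
// Interpreted evaluation: every step is a virtual call returning a
// boxed Any, so each addition unboxes, adds, and re-boxes.
sealed trait Expr { def eval(row: Array[Int]): Any }
case class Col(i: Int) extends Expr {
  def eval(row: Array[Int]): Any = row(i)  // Int boxed into Any
}
case class Plus(l: Expr, r: Expr) extends Expr {
  def eval(row: Array[Int]): Any =         // unbox, add, re-box
    l.eval(row).asInstanceOf[Int] + r.eval(row).asInstanceOf[Int]
}

// What code generation aims to produce instead: a function specialized
// to this one expression -- no tree walk, no boxing, primitives end to end.
val generated: Array[Int] => Int = row => row(0) + row(1)
```

Both compute the same sum for a row, but the specialized version is what a programmer would have written by hand, which is the whole point of generating it.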

