I'm at the Spark Summit in Dublin and have spent today in a session with Team Elastacloud on Spark optimisation. It has been a pretty good session. Much of it I knew already (partitioning, query optimisation and parallelism), but it was all good stuff. The one area I hadn't previously had the chance to look at was optimising the Catalyst engine behind Spark SQL.
I'll try to explain some of what I learned and my reasoning. This is an example I worked through today, with a few variations. The key idea is that a Catalyst optimisation rule always works on a LogicalPlan: each rule is one step in a pipeline that takes in a plan and gives you back a (possibly rewritten) plan. The rule in this example finds Divide expressions and rewrites them using a case statement.
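To make the plan-in, plan-out idea concrete without needing a cluster, here is a minimal sketch in plain Scala. It is a toy model, not Catalyst: the Expr, Lit, Col, Div and Mul types and the transformUp helper are hypothetical stand-ins made up for illustration. Each rule is simply a function from one tree to another, so a pipeline of rules is just a fold over a list.

```scala
// Toy model only: Expr, Lit, Col, Div, Mul and transformUp are hypothetical, not Catalyst's API.
object ToyPipeline {
  sealed trait Expr
  case class Lit(value: Double) extends Expr
  case class Col(name: String) extends Expr
  case class Div(left: Expr, right: Expr) extends Expr
  case class Mul(left: Expr, right: Expr) extends Expr

  // A rule takes a plan and gives back a plan.
  type Rule = Expr => Expr

  // Rewrite the children first, then apply `pf` to this node wherever it is defined.
  def transformUp(e: Expr)(pf: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = e match {
      case Div(l, r) => Div(transformUp(l)(pf), transformUp(r)(pf))
      case Mul(l, r) => Mul(transformUp(l)(pf), transformUp(r)(pf))
      case leaf      => leaf
    }
    pf.applyOrElse(withNewChildren, (x: Expr) => x)
  }

  // Two example rules: drop "* 1", and turn "/ -1" into "* -1".
  val removeTimesOne: Rule = e => transformUp(e) { case Mul(x, Lit(1.0)) => x }
  val divByMinusOne: Rule  = e => transformUp(e) { case Div(x, Lit(-1.0)) => Mul(x, Lit(-1.0)) }

  // The pipeline: each rule's output plan becomes the next rule's input.
  def optimise(plan: Expr, rules: Seq[Rule]): Expr =
    rules.foldLeft(plan)((p, rule) => rule(p))
}
```

Catalyst's own TreeNode offers both transformDown and transformUp; the sketch uses a bottom-up rewrite only because it is the simplest to write by hand.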
import org.apache.spark.sql.catalyst.expressions.{Divide, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.{DoubleType, IntegerType}
Once we've done our imports, we build the rule itself by extending Rule[LogicalPlan]; this is what we will add to the optimiser pipeline. The rule holds no state, so we make it a singleton object. It looks for a division by a literal -1 and, when it finds one, prints a message to the console and replaces the division with a null. (I'm not sure what the use case for this would be, but it's illustrative rather than practical!)
object DivideByMinusOneRule extends Rule[LogicalPlan] {
  // True only for a literal -1 (Int, or Double after type coercion); matching on the
  // literal avoids the ClassCastException asInstanceOf throws for non-literal divisors
  private def isMinusOne(e: Expression): Boolean = e match {
    case Literal(v: Int, IntegerType)   => v == -1
    case Literal(v: Double, DoubleType) => v == -1.0
    case _                              => false
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    // Match on the type rather than Divide(left, right), whose constructor gained extra fields in later Spark 3.x releases
    case d: Divide if isMinusOne(d.right) =>
      println("division by -1 occurred!")
      // A null of the division's own result type, so the output schema is unchanged
      Literal(null, d.dataType)
  }
}
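Why match on the literal rather than cast it inside the guard? A guard such as `right.asInstanceOf[Literal].value.asInstanceOf[Double] == -1` throws a ClassCastException as soon as the divisor is anything other than a literal (another column, say), or when the literal holds a boxed Int rather than a Double. The toy model below contrasts the two styles of guard; Expr, Lit, Col and Div are hypothetical stand-ins, not Spark classes.

```scala
import scala.util.Try

// Toy model only: Expr, Lit, Col and Div are hypothetical stand-ins for Catalyst classes.
object GuardDemo {
  sealed trait Expr
  case class Lit(value: Any) extends Expr
  case class Col(name: String) extends Expr
  case class Div(left: Expr, right: Expr) extends Expr

  // Cast in the guard: evaluating the guard throws if `right` is not a Lit,
  // or if the Lit holds a boxed Int rather than a Double.
  val castInGuard: PartialFunction[Expr, String] = {
    case Div(_, right) if right.asInstanceOf[Lit].value.asInstanceOf[Double] == -1 => "matched"
  }

  // Match in the pattern: anything that is not a -1 literal simply fails to match.
  val matchInPattern: PartialFunction[Expr, String] = {
    case Div(_, Lit(v: Double)) if v == -1.0 => "matched"
    case Div(_, Lit(v: Int)) if v == -1      => "matched"
  }

  // True if merely asking the partial function whether it applies throws.
  def guardThrows(pf: PartialFunction[Expr, String], e: Expr): Boolean =
    Try(pf.isDefinedAt(e)).isFailure
}
```

Note that a partial function literal's isDefinedAt evaluates the guard too, so with the cast-in-guard style even a "does this rule apply?" check can fail.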
Next, we add the rule to Catalyst's optimisations through the session's experimental extraOptimizations hook:
spark.experimental.extraOptimizations = Seq(DivideByMinusOneRule)
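As far as I understand the optimiser, rules registered through extraOptimizations run as an extra batch after Spark's built-in batches, so a custom rule sees a plan that has already been through steps such as constant folding. For `$"number" / -1` the analyser wraps integer operands of a division in casts to double, and constant folding then turns the cast -1 into a Double literal -1.0 before our rule ever sees it. The toy below models that ordering; Expr, Lit, Col, CastToDouble, Div and ToyOptimizer are hypothetical, not Spark's Optimizer classes.

```scala
// Toy model of built-in rules running before user-supplied ones.
// Expr, Lit, Col, CastToDouble, Div and ToyOptimizer are hypothetical, not Spark classes.
object ToyOptimizer {
  sealed trait Expr
  case class Lit(value: Any) extends Expr
  case class Col(name: String) extends Expr
  case class CastToDouble(child: Expr) extends Expr
  case class Div(left: Expr, right: Expr) extends Expr

  type Rule = Expr => Expr

  // Simplified constant folding: a cast of an Int literal becomes a Double literal.
  def constantFolding(e: Expr): Expr = e match {
    case CastToDouble(Lit(v: Int)) => Lit(v.toDouble)
    case CastToDouble(c)           => CastToDouble(constantFolding(c))
    case Div(l, r)                 => Div(constantFolding(l), constantFolding(r))
    case other                     => other
  }

  // A user rule in the spirit of DivideByMinusOneRule that only recognises a Double -1.0.
  def divideByMinusOne(e: Expr): Expr = e match {
    case Div(_, Lit(v: Double)) if v == -1.0 => Lit(null)
    case Div(l, r)                           => Div(divideByMinusOne(l), divideByMinusOne(r))
    case CastToDouble(c)                     => CastToDouble(divideByMinusOne(c))
    case other                               => other
  }

  val builtInRules: Seq[Rule] = Seq[Rule](e => constantFolding(e))
  val userRules: Seq[Rule]    = Seq[Rule](e => divideByMinusOne(e))

  // User rules are appended after the built-in ones (mirroring, as I understand it,
  // where extraOptimizations run).
  def optimise(plan: Expr, extraRules: Seq[Rule]): Expr =
    (builtInRules ++ extraRules).foldLeft(plan)((p, rule) => rule(p))
}
```

Calling divideByMinusOne directly on the unfolded plan leaves it unchanged, because the divisor is still a cast rather than a literal; running it after folding replaces the whole division.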
With the rule registered, we select the number column from our DataFrame df and divide every value by -1.
import spark.implicits._ // provides the $"column" syntax
val dfOut = df.select($"number" / -1)
dfOut.explain(true)
dfOut.show(5)
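The explain output should confirm that the rule has fired: in the optimised plan the division has been replaced by a null literal. (explain(true) prints the parsed, analysed and optimised logical plans as well as the physical plan; plain explain() shows only the physical plan.) When we show the results, every value in the column is null and our message appears on the console. Note that the message is printed on the driver when the plan is optimised, not once per row.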
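To illustrate the difference between rewriting the plan once and evaluating it for every row, here is a toy model with a counter on the rule. Expr, Lit, Col, Div and PlanOnceDemo are hypothetical types, not Spark classes, and Option[Double] stands in for a nullable double column.

```scala
// Toy model: the rule rewrites the plan once; only the rewritten plan is evaluated per row.
// Expr, Lit, Col, Div and PlanOnceDemo are hypothetical, not Spark classes.
object PlanOnceDemo {
  sealed trait Expr
  case class Lit(value: Option[Double]) extends Expr // None stands in for null
  case class Col(name: String) extends Expr
  case class Div(left: Expr, right: Expr) extends Expr

  var ruleFired = 0 // how many times the rewrite has happened

  def divideByMinusOneRule(e: Expr): Expr = e match {
    case Div(_, Lit(Some(-1.0))) =>
      ruleFired += 1
      Lit(None) // the "null literal"
    case other => other
  }

  // Row-by-row evaluation of an already-optimised expression.
  def eval(e: Expr, row: Map[String, Double]): Option[Double] = e match {
    case Lit(v)    => v
    case Col(name) => row.get(name)
    case Div(l, r) => for (a <- eval(l, row); b <- eval(r, row)) yield a / b
  }

  def run(rows: Seq[Map[String, Double]]): Seq[Option[Double]] = {
    val optimised = divideByMinusOneRule(Div(Col("number"), Lit(Some(-1.0)))) // planning: once
    rows.map(row => eval(optimised, row))                                      // execution: per row
  }
}
```

Running it over three rows yields three None values while ruleFired is only 1. In real Spark each action that re-optimises a plan (explain and show, for instance) may trigger the rule again, but never once per value.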