The hidden cost of Spark withColumn

2018-07-11

Recently, we've been working on a machine learning pipeline with Spark, where Spark SQL and DataFrame are used for data preprocessing and MLlib for training. In one use case, the data source is a very wide Hive table of ~1000 columns. The columns are stored as String, so we need to cast them to Integer before they can be fed into model training.

This was what I wrote initially with the DataFrame Scala API (Spark 2.2.0).

import org.apache.spark.sql.types.IntegerType

// one new DataFrame per column: the inner df shadows the outer one
df.columns.foldLeft(df) { case (df, col) =>
  df.withColumn(col, df(col).cast(IntegerType))
}

Since DataFrame is immutable, I created a new DataFrame for each column cast with withColumn. I didn't think that would be a big deal, since at run time all the columns would be cast in one shot thanks to Spark's Catalyst optimizer.

At its core, Catalyst contains a general library for representing trees and applying rules to manipulate them. On top of this framework, we have built libraries specific to relational query processing (e.g., expressions, logical query plans), and several sets of rules that handle different phases of query execution: analysis, logical optimization, physical planning, and code generation to compile parts of queries to Java bytecode.
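
One way to see Catalyst doing this is to print the query plans: the optimizer collapses the chain of per-column projections into a single one. A quick check (a sketch, assuming the wide DataFrame df from above):

import org.apache.spark.sql.types.IntegerType

val casted = df.columns.foldLeft(df) { case (acc, col) =>
  acc.withColumn(col, acc(col).cast(IntegerType))
}
// prints the parsed, analyzed, optimized and physical plans; the optimized
// plan shows a single Project containing all the casts
casted.explain(true)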

To my surprise, the job was stuck in submission for minutes without producing any output. Luckily, my colleague Vincent saved the day with the following fix.

// build all the cast Columns first, then create a single DataFrame
df.select(df.columns.map { col =>
  df(col).cast(IntegerType)
}: _*)

He suspected that calling withColumn a thousand times was expensive, so he dove into its implementation and found essentially the code above in the private method withColumns, which withColumn calls. In his fast version, only one new DataFrame is created. Conveniently, casting a named column keeps its name, so the fast version produces the same schema.

I wondered why the cost difference was so large and looked further into it. After turning on debug logging, I saw a lot of === Result of Batch Resolution === messages in my slow version. It suddenly struck me that Catalyst's analysis might not be free: a thousand withColumn calls meant a thousand rounds of analysis, and the same holds for every API on DataFrame, each of which is analyzed eagerly. Transforms on Column, on the other hand, are lazy; they are only analyzed as part of the DataFrame they eventually end up in.
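
To make the distinction concrete, the fast version can be split into its two phases (a minimal sketch; the intermediate names are mine):

// Building Columns is cheap: each cast is just an unresolved expression.
val casts = df.columns.map(col => df(col).cast(IntegerType))
// Only this select constructs a new DataFrame and triggers one round of analysis.
val result = df.select(casts: _*)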

The log led me to org.apache.spark.sql.catalyst.rules.RuleExecutor, where I spotted a timeMap that tracks the time spent running each rule. Even more exciting, the statistics are exposed through RuleExecutor.dumpTimeSpent, which I could call to compare the costs of the two versions.

import org.apache.spark.sql.catalyst.rules.RuleExecutor

// slow version: the accumulated analysis time grows with every withColumn
df.columns.foldLeft(df) { case (df, col) =>
  println(RuleExecutor.dumpTimeSpent())
  df.withColumn(col, df(col).cast(IntegerType))
}
println(RuleExecutor.dumpTimeSpent())

// fast version: Column#cast triggers no analysis, so the time stays flat
df.select(df.columns.map { col =>
  println(RuleExecutor.dumpTimeSpent())
  df(col).cast(IntegerType)
}: _*)
println(RuleExecutor.dumpTimeSpent())

As expected, the reported time increased with each DataFrame#withColumn call, while it stayed flat for Column#cast. One round of analysis took about 100 ms. (If you are curious, the appendix has a table of the time spent in all 56 rules over 100 withColumn calls.)
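
A rough wall-clock comparison of plan construction alone tells the same story (a hypothetical micro-benchmark; time is a helper I made up, and the numbers will vary with table width and machine):

// Hypothetical helper that wall-clocks a block of code.
def time[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(f"$label took ${(System.nanoTime() - start) / 1e6}%.1f ms")
  result
}

// Neither call executes the job; the cost measured here is plan analysis.
time("foldLeft over withColumn") {
  df.columns.foldLeft(df)((acc, col) => acc.withColumn(col, acc(col).cast(IntegerType)))
}
time("single select") {
  df.select(df.columns.map(col => df(col).cast(IntegerType)): _*)
}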

Summary

  1. The hidden cost of withColumn is Spark Catalyst's analysis time.
  2. The time spent in Catalyst analysis is usually negligible, but it becomes an issue when a DataFrame goes through a large number of transforms. It's not unusual for a model to have 1000 features, each of which may require preprocessing.
  3. Don't create a new DataFrame for each transform on a Column. Build up all the Columns and create a single DataFrame at the end with DataFrame#select (see the sketch after this list).

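The advice in point 3 generalizes into a small helper (a sketch; castAll is a name I made up, and it assumes every column gets the same target type):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, IntegerType}

// Cast every column with a single select, so the new plan is analyzed only once.
def castAll(df: DataFrame, to: DataType): DataFrame =
  df.select(df.columns.map(col => df(col).cast(to)): _*)

val casted = castAll(df, IntegerType)
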
Appendix

Rule  Nano Time
org.apache.spark.sql.catalyst.analysis.Analyzer$FixNullability  489262230
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics  243030776
org.apache.spark.sql.catalyst.analysis.TypeCoercion$PropagateTypes  143141555
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer  97690381
org.apache.spark.sql.catalyst.analysis.ResolveCreateNamedStruct  87845664
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame  85098172
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder  83967566
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions  63928074
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences  56549170
org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions  52411767
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator  24759815
org.apache.spark.sql.catalyst.analysis.ResolveTimeZone  24078761
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolvePivot  23264984
org.apache.spark.sql.catalyst.analysis.ResolveInlineTables  22864548
org.apache.spark.sql.execution.datasources.FindDataSourceTable  22127481
org.apache.spark.sql.catalyst.analysis.DecimalPrecision  20855512
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts  19908820
org.apache.spark.sql.catalyst.analysis.TimeWindowing  17289560
org.apache.spark.sql.catalyst.analysis.TypeCoercion$DateTimeOperations  16691649
org.apache.spark.sql.catalyst.analysis.TypeCoercion$FunctionArgumentConversion  16645812
org.apache.spark.sql.catalyst.analysis.ResolveHints$ResolveBroadcastHints  16391773
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions  16094905
org.apache.spark.sql.catalyst.analysis.TypeCoercion$InConversion  15937875
org.apache.spark.sql.catalyst.analysis.TypeCoercion$PromoteStrings  15659420
org.apache.spark.sql.catalyst.analysis.TypeCoercion$IfCoercion  15131194
org.apache.spark.sql.catalyst.analysis.TypeCoercion$BooleanEquality  15120505
org.apache.spark.sql.catalyst.analysis.TypeCoercion$Division  14657587
org.apache.spark.sql.execution.datasources.PreprocessTableCreation  12421808
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubquery  12330915
org.apache.spark.sql.catalyst.analysis.UpdateOuterReferences  11919954
org.apache.spark.sql.catalyst.analysis.TypeCoercion$CaseWhenCoercion  11807169
org.apache.spark.sql.catalyst.analysis.EliminateUnions  11761260
org.apache.spark.sql.catalyst.analysis.SubstituteUnresolvedOrdinals  11683297
org.apache.spark.sql.catalyst.analysis.ResolveHints$RemoveAllHints  11363987
org.apache.spark.sql.execution.datasources.DataSourceAnalysis  11253060
org.apache.spark.sql.catalyst.analysis.Analyzer$HandleNullInputsForUDF  11075682
org.apache.spark.sql.execution.datasources.PreprocessTableInsertion  11061610
org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates  10708386
org.apache.spark.sql.catalyst.analysis.CleanupAliases  9447785
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases  4725210
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNewInstance  3634067
org.apache.spark.sql.catalyst.analysis.Analyzer$CTESubstitution  2359406
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOrdinalInOrderByAndGroupBy  2191643
org.apache.spark.sql.catalyst.analysis.ResolveTableValuedFunctions  2160003
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations  2095181
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions  2029468
org.apache.spark.sql.catalyst.analysis.TypeCoercion$WidenSetOperationTypes  1999994
org.apache.spark.sql.execution.datasources.ResolveSQLOnFile  1891759
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveMissingReferences  1864083
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin  1856631
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggAliasInGroupBy  1740242
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate  1714332
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast  1686660
org.apache.spark.sql.catalyst.analysis.Analyzer$PullOutNondeterministic  1602061
org.apache.spark.sql.catalyst.analysis.Analyzer$WindowsSubstitution  1406648
org.apache.spark.sql.catalyst.analysis.AliasViewChild  1184166