2019-02-23
This is the log of a bizarre NullPointerException (NPE) from a Spark application that extends scala.App, and of a minor Spark bug that had been hidden for years.
Last week, I was developing the following Spark (2.3.1) application, which pushes data from Hive to a database.
import org.apache.spark.sql.{Row, SparkSession}

object Test extends App {
  val foo = args(0)
  val bar = args(1)

  val spark = SparkSession.builder
    .getOrCreate()

  spark.sql("select * from test")
    .foreachPartition { rowIter: Iterator[Row] =>
      val client = new DbClient
      client.setFoo(foo) // throws NullPointerException
      client.setBar(bar)
      client.init
      client.put(rowIter.map { row =>
        row.getLong(0) -> row.getLong(1)
      }.toMap)
    }
}
The local variables foo and bar are used to configure the DbClient on remote executors. I expected them to be shipped through the closure, but somehow they weren't.
My colleague Vincent pointed me to ClosureCleaner, which logs (via logDebug) all fields, methods and classes in closures. I enabled the driver's debug log, and the absence of foo and bar confirmed that they were not captured by the closure.
Then I looked into what happens in foreachPartition. The bytecode reminded me that foo and bar were static members of Test$ rather than local variables.
9: getstatic #26 // Field Test$.MODULE$:LTest$;
12: invokevirtual #30 // Method Test$.foo:()Ljava/lang/String;
15: invokevirtual #34 // Method DbClient.setFoo:(Ljava/lang/String;)V
It suddenly struck me that the initialization of the class (Test$) that extends scala.App is actually delayed until its main() method runs. That's why foo and bar were uninitialized, as confirmed by scala.App's documentation:
It should be noted that this trait is implemented using the DelayedInit functionality, which means that fields of the object will not have been initialized before the main method has been executed.
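To see the DelayedInit behavior in isolation, here is a small standalone sketch (not from the original application) showing that a field of an object extending scala.App stays null until its main method is invoked:

object Delayed extends App {
  // With scala.App, the object body becomes a delayedInit block that only
  // runs when main() is called, so this assignment is deferred.
  val greeting = "hello"
}

object Check {
  def main(args: Array[String]): Unit = {
    // Delayed.main has not been called, so its body has not run yet.
    // (The compiler even warns about selecting a possibly uninitialized value here.)
    println(Delayed.greeting) // prints "null"
    Delayed.main(Array.empty)
    println(Delayed.greeting) // prints "hello"
  }
}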
Meanwhile, I googled "Spark closure problems", which led me to an ancient Spark JIRA, Closure problems when running Scala app that "extends App". It contained a fix that prints a warning when an application extends App.
if (classOf[scala.App].isAssignableFrom(mainClass)) {
  printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
Why have I never been warned? After some digging, I realized that the above "if clause" would never be true. The Scala compiler generates two Java classes, Test and Test$, from object Test, and the mainClass (Test) I passed in is not the one (Test$) that extends scala.App. This is recorded in SPARK-26977.
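A check that would also catch this case (just a sketch of the idea, not necessarily the actual fix that went into SPARK-26977) could additionally look up the companion module class, i.e. Test$, since that is the class that really extends scala.App:

// Sketch: also inspect the companion module class (mainClass name + "$").
val moduleClass: Class[_] =
  try {
    Class.forName(mainClass.getName + "$", false, mainClass.getClassLoader)
  } catch {
    case _: ClassNotFoundException => null
  }
if (classOf[scala.App].isAssignableFrom(mainClass) ||
    (moduleClass != null && classOf[scala.App].isAssignableFrom(moduleClass))) {
  printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}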
The solution is to not extend scala.App (never do this for a Spark application) and to put everything in Test's own main method. foo and bar then become initialized local variables and are shipped to the executors through the closure.
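For reference, this is a sketch of the rewritten application (using the same DbClient as above):

import org.apache.spark.sql.{Row, SparkSession}

object Test {
  def main(args: Array[String]): Unit = {
    // Genuine local variables now, initialized before the closure is created.
    val foo = args(0)
    val bar = args(1)

    val spark = SparkSession.builder.getOrCreate()

    spark.sql("select * from test")
      .foreachPartition { rowIter: Iterator[Row] =>
        val client = new DbClient
        client.setFoo(foo) // no NPE anymore
        client.setBar(bar)
        client.init
        client.put(rowIter.map(row => row.getLong(0) -> row.getLong(1)).toMap)
      }
  }
}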
How to decompile the anonymous function passed to foreachPartition?
javap -c Test\$\$anonfun\$1
How to enable the driver's debug log?
Prepare a log4j.properties file with
log4j.rootLogger=DEBUG
and submit a Spark application with
--driver-java-options "-Dlog4j.configuration=file:/path/to/log4j.properties"
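If the debug output does not show up, the root logger usually also needs an appender. A minimal log4j.properties, assuming the console appender settings from Spark's bundled log4j.properties.template, could look like:

log4j.rootLogger=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n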