There are many ways to extend Apache Spark, and one of the easiest is with functions that manipulate one or more columns in a DataFrame. When considering the different Spark function types, it is important not to ignore the full set of options available to developers.
Beyond the two types of functions–simple Spark user-defined functions (UDFs) and functions that operate on Column–described in the previous link, there are two more types of UDFs: user-defined aggregate functions (UDAFs) and user-defined table-generating functions (UDTFs). sum() is an example of an aggregate function and explode() is an example of a table-generating function. The former processes many rows to produce a single value. The latter uses value(s) from a single row to “generate” many rows. Spark supports UDAFs directly and UDTFs indirectly, by converting them to Generator expressions.
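As a plain Scala analogy (not Spark code), an aggregate function has the shape of a reduction over many values, while a table-generating function has the shape of a flatMap:

```scala
// Plain Scala analogy, not Spark APIs: the "shape" of the two function types.
val nums = Seq(1, 2, 3)

// Aggregate (like sum()): many rows in, one value out
val total = nums.sum  // 6

// Table-generating (like explode()): one row in, many rows out
val words = Seq("a b", "c").flatMap(_.split(" "))  // "a", "b", "c"
```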
Beyond all types of UDFs, Spark’s most exciting functions are its native functions, which implement the logic of most of Spark’s Column and SparkSQL functions. Internally, native functions are nodes in the Expression trees that determine column values. Very loosely speaking, an Expression is the internal Spark representation for a Column, just as a LogicalPlan is the internal representation of a data transformation (Dataset/DataFrame).
Native functions, while a bit more involved to create, have three fundamental advantages: better user experience, flexibility and performance.
The better user experience and flexibility come from native functions’ lifecycle having two distinct phases:
- Analysis, which happens on the driver, while the transformation DAG is created (before an action is run).
- Execution, which happens on executors/workers, while an action is running.
The analysis phase allows Spark native functions to dynamically validate the type of their inputs to produce better error messages and, if necessary, change the type of their result. For example, the return type of sort_array() depends on the input type. If you pass in an array of strings, you’ll get an array of strings. If you pass in an array of ints, you’ll get an array of ints.
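To make the two-phase idea concrete, here is a toy model in plain Scala. This is emphatically not Spark’s actual Expression API (the names ToyType, ToyValue, and ToySortArray are all invented for illustration); it only shows how a node can resolve its result type from its input’s type before any row is processed, the way sort_array() does:

```scala
// Toy model of a two-phase expression lifecycle; NOT Spark's Expression API.
sealed trait ToyType
case object IntArrayType extends ToyType
case object StringArrayType extends ToyType

case class ToyValue(dataType: ToyType, value: Seq[Any])

case class ToySortArray(child: ToyValue) {
  // "Analysis" phase: the result type is derived from the input type,
  // before any data is touched.
  def dataType: ToyType = child.dataType

  // "Execution" phase: runs per value, dispatching on the resolved type.
  def eval(): ToyValue = child.dataType match {
    case IntArrayType =>
      ToyValue(IntArrayType, child.value.map(_.asInstanceOf[Int]).sorted)
    case StringArrayType =>
      ToyValue(StringArrayType, child.value.map(_.asInstanceOf[String]).sorted)
  }
}

val sortedInts = ToySortArray(ToyValue(IntArrayType, Seq(3, 1, 2))).eval()
// sortedInts.dataType is IntArrayType; sortedInts.value is 1, 2, 3
```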
A user-defined function, which internally maps to a strongly-typed Scala/JVM function, cannot do this. We can parameterize an implementation by the type of its input, e.g.,
def mySortArray[A: Ordering](arr: Array[A]): Array[A]
but we cannot register a type-parameterized UDF in Spark; instead we need hacks such as
spark.udf.register("my_sort_array_int", mySortArray[Int] _)
spark.udf.register("my_sort_array_long", mySortArray[Long] _)
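In plain Scala, by contrast, the parameterized function works for any element type with an Ordering. Here is a runnable version, with a body added for illustration:

```scala
// Type-parameterized sort in plain Scala. Spark UDF registration must still
// pin A to a concrete type, one registration per type, as shown above.
def mySortArray[A: Ordering](arr: Array[A]): Array[A] = arr.sorted

val ints = mySortArray(Array(3, 1, 2))        // 1, 2, 3
val strs = mySortArray(Array("b", "c", "a"))  // "a", "b", "c"
```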
Think of native functions like macros in a traditional programming language. The power of macros also comes from having a lifecycle with two execution phases: compile-time and runtime.
Performance comes from the fact that Spark native functions operate on the internal Spark representation of rows, which, in many cases, avoids serialization/deserialization to “normal” Scala/Java/Python/R datatypes. For example, internally Spark represents strings as UTF8String. Further, you can choose to implement the runtime behavior of a native function either as a simple method or by code-generating Java that participates in whole-stage code generation (reinforcing the macro analogy).
Working with Spark’s internal (a.k.a. unsafe) datatypes does require careful coding, but Spark’s codebase includes many dozens of examples of native functions: essentially, the entire SparkSQL function library. I encourage you to experiment with native Spark function development. As an example, take a look at array_contains().
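To show the moving parts, here is a minimal sketch of a native function against Spark’s Catalyst internals. These are internal, version-dependent APIs (this sketch targets the Spark 2.x Expression classes), and Utf8ByteLength is a hypothetical function invented for this example, not part of Spark:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical native function: byte length of a string column's UTF-8 encoding.
// A sketch against Spark 2.x Catalyst internals; details vary by Spark version.
case class Utf8ByteLength(child: Expression)
  extends UnaryExpression with ExpectsInputTypes {

  // Analysis phase: declare the accepted input type and the result type.
  override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
  override def dataType: DataType = IntegerType

  // Execution phase, interpreted path: operates directly on Spark's internal
  // UTF8String, with no conversion to java.lang.String.
  override def nullSafeEval(input: Any): Any =
    input.asInstanceOf[UTF8String].numBytes()

  // Execution phase, codegen path: emits Java source that participates in
  // whole-stage code generation.
  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, c => s"($c).numBytes()")
}

// Expose the expression as a regular Column function.
def utf8ByteLength(col: Column): Column = new Column(Utf8ByteLength(col.expr))
```

Note how the two phases from earlier show up directly: inputTypes/dataType drive analysis-time validation and result typing, while nullSafeEval and doGenCode are two interchangeable implementations of the execution phase.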
For user experience, flexibility and performance reasons, at Swoop we have created a number of native Spark functions. We plan on open-sourcing many of them, as well as other tools we have created for improving Spark productivity and performance, via the spark-alchemy library.