Bring your (py)spark code to the next level with User Defined Functions (UDFs) and the native alternative.
Everyone in software engineering should know the benefits of using functions: they let you bundle specific logic in a reusable way, minimizing code duplication and increasing developer efficiency. User Defined Functions (UDFs) and their native alternative bring all of these benefits to Spark, elevating your workloads.
UDFs enable you to add custom logic blocks, in the form of function calls, inside your Spark code. But there are some important caveats you should be aware of before you jump in. Let me guide you through the topic with PySpark examples and a detailed look at which type you should prefer for the most efficient solution.
Types of UDFs
Before we jump into code examples, let’s discuss the different kinds of UDFs.
- Native spark code: This is technically not a UDF, but you can wrap your native PySpark code in regular Python functions. It is the most efficient option, because it fully leverages the optimizations of the Spark execution planner while still giving you the benefits of functions. The only limitation is that you can't register such functions and reuse them within Spark SQL.
- JVM native code: Since Spark runs on the JVM, code written in JVM languages (e.g. Scala) performs substantially better than, for example, native Python code. The reason is the extra data serialization required as soon as logic is executed outside of the JVM.
- Pandas UDFs: If you decide to work with Python, Pandas UDFs are your best bet. They enable vectorized processing of your data, which is significantly more efficient than the row-by-row execution of standard Python UDFs.
- Python UDFs: These process your data row by row. You should therefore only consider this type for small to medium sized workloads in your Spark job; in large data jobs it is a guaranteed bottleneck. Both Python-based UDF types benefit from type hints in the function definition; for Pandas UDFs in Spark 3+, the hints are even used to determine the UDF variant.
Implementation of UDFs
Now that we know the different types of UDFs, let's focus on bringing them to life in code. For the first general example, I will use pure Python code, because it is the easiest to understand. Since this blog post revolves mainly around PySpark, I will skip the JVM native code UDFs, but you can have a look at a Scala example here.
All basic examples will create a new column based on the first 3 letters of another column from a demo dataset. Let's take a look at how to define a function which can be used within PySpark and Spark SQL:
Python based UDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# The decorator registers the function as a UDF and declares its return type
@udf(StringType())
def get_first_characters(first_name: str) -> str:
    return first_name[:3]

# Demo dataset with 10 million people records
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
df_with_python = df.withColumn("first_letters", get_first_characters(col("firstName")))
Here you can see that we define a normal Python function and use the @udf decorator to register it as a UDF. This makes it usable within your Spark transformations. There is also another way to register a UDF, as shown here:
def get_first_characters(first_name: str) -> str:
    return first_name[:3]

get_first_characters_udf = udf(lambda x: get_first_characters(x), StringType())
This uses a lambda function stored in a variable to explicitly wrap your function as a UDF. I personally prefer the decorator approach, so I will stick to it in the following examples.
There is also the possibility to register your UDF to make it usable with SQL. This is how it can be done:
spark.udf.register("get_first_characters_udf", get_first_characters)
This enables you to directly use it within spark SQL:
spark.sql(
"""
SELECT
get_first_characters_udf(firstName)
FROM
delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`
"""
)
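By the way, spark.udf.register also accepts an explicit return type as a third argument, which comes in handy if you register the plain (undecorated) Python function. Here is a minimal sketch; the StringType argument could even be omitted, because strings are the default return type:
from pyspark.sql.types import StringType

# Register the plain Python function and declare its return type explicitly
spark.udf.register("get_first_characters_udf", get_first_characters, StringType())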
Running display on the resulting DataFrame reveals that everything is working as expected:
df_with_python.display()
The interesting part lies somewhere else. If we take a look at the execution plan of our query (via df_with_python.explain()), we notice something we should already expect: the plan tells us that the Spark engine (in my case Photon, because I'm using Databricks) is not able to optimize every step of our process:
This means the Spark engine cannot optimize the query the way it would with native functions. This can become the bottleneck of your process, so let's look at a better performing solution.
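If you want to spot this yourself, here is a minimal sketch of what to look for. The exact plan output depends on your Spark version and engine; on plain (non-Photon) Spark, a regular Python UDF typically shows up as a BatchEvalPython node, which marks the point where data leaves the JVM:
# Print the physical plan and look for the Python UDF step (e.g. a BatchEvalPython node)
df_with_python.explain()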
Pandas UDFs
As mentioned before, if you really need custom logic that can't be built with Spark native functions, at least try to use Pandas UDFs. Their vectorized execution is substantially faster than the row-by-row approach of vanilla Python UDFs. So let's see how they are implemented:
from pyspark.sql.functions import col, pandas_udf
import pandas as pd

# The type hints (pd.Series -> pd.Series) mark this as a scalar Pandas UDF
@pandas_udf('string')
def get_first_characters_pandas(input_series: pd.Series) -> pd.Series:
    return input_series.str[0:3]

df_with_pandas = df.withColumn("first_letters", get_first_characters_pandas(col("firstName")))
Here you can see what our earlier example looks like with Pandas UDFs. If we look at the execution plan, we can see that the ArrowEvalPython step is new. That's where the vectorized transformation happens:
The performance improvement was already quite substantial for this demo: the pure Python approach took ~2 seconds, while the Pandas UDF completed the full task in ~1.5 seconds.
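If you want to reproduce such a comparison yourself, here is a minimal, hypothetical timing sketch. It reuses the two DataFrames defined above and forces full execution via the noop write format, so the transformation is actually evaluated; the exact numbers will of course differ on your cluster:
import time

def time_action(df):
    # Force full evaluation of the DataFrame without writing the result anywhere
    start = time.time()
    df.write.format("noop").mode("overwrite").save()
    return time.time() - start

print(f"Python UDF: {time_action(df_with_python):.2f}s")
print(f"Pandas UDF: {time_action(df_with_pandas):.2f}s")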
The important part here is that you receive a pandas Series as your function parameter and return a pandas Series again. That means a whole batch of rows is handled at once as a vector.
Let's take a look at what a "vector" really means. You might already know it from your math courses, but let's see how it can be represented in Python using a pandas Series.
import pandas as pd

vector = pd.Series([1, 2, 3, 4, 5])
print(vector)
As you can see, a vector is "just" a list of values. It can be a series of integers or, as in our example UDF, a series of strings. But this should make it clearer that with Pandas UDFs you are not applying your logic element by element, you are applying it to the whole vector.
These vectorized transformations are already faster for a single column, but where they really shine is in calculations involving multiple vectors, like:
print(vector * vector)
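Translated back to Spark, this is what a Pandas UDF operating on two column vectors at once could look like. This is just a sketch of mine (build_full_name is not part of the examples above) and it assumes the firstName and lastName columns of the demo dataset:
from pyspark.sql.functions import col, pandas_udf
import pandas as pd

@pandas_udf('string')
def build_full_name(first: pd.Series, last: pd.Series) -> pd.Series:
    # Both columns arrive as whole vectors and are combined in a single vectorized operation
    return first + " " + last

df_with_full_name = df.withColumn("full_name", build_full_name(col("firstName"), col("lastName")))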
Another great feature of Pandas UDFs is that you can create your own aggregation functions. This is achieved fairly simply by returning a scalar (single) value instead of a Series. For a quick example, let me mimic the Spark native function first, which just returns the first occurrence of a column value within a given group.
# The type hints (pd.Series -> str) turn this into a Series-to-scalar (aggregation) Pandas UDF
@pandas_udf('string')
def get_first(input_series: pd.Series) -> str:
    return input_series.iloc[0]

df_with_pandas_agg = df.groupBy("gender").agg(get_first(col("firstName")))
Note that you will require a groupBy, like with any other aggregation. Here is what the output looks like:
And if we look into the execution plan, we can see that there is a special AggregateInPandas part:
But be aware that this aggregation is pretty slow: we are talking about 8 seconds versus 4 seconds for the native Spark first method on this example data set.
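For reference, this is roughly what the native counterpart of the aggregation above looks like, using Spark's built-in first function (the variable name df_with_native_agg is just my choice):
from pyspark.sql.functions import col, first

# The same aggregation, handled entirely by the Spark engine
df_with_native_agg = df.groupBy("gender").agg(first(col("firstName")))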
I hope this provided you with a good understanding of pandas UDFs. If you want to learn more about pandas UDFs, take a look here.
Spark Native
You can also use Spark functions directly inside your own Python functions, getting all the benefits of a function in the most efficient way. This is not really a UDF, but it is the most recommended way of implementing functions in PySpark, which is why I also want to show it to you.
The usage is pretty straightforward, as you can see: we just wrap everything into a function and use the standard PySpark functionality.
from pyspark.sql.functions import substring
from pyspark.sql import DataFrame

def get_first_characters_spark(df: DataFrame, input_col_name: str, new_col_name: str) -> DataFrame:
    # substring uses 1-based positions: take the first three characters
    return df.withColumn(new_col_name, substring(input_col_name, 1, 3))
There is no need for decorators, and the way it works is also a little different: you pass your full Spark DataFrame to the function, along with any other parameters it might need, and it returns the full DataFrame again.
Using it works like this:
df_with_spark = get_first_characters_spark(df, "firstName", "first_letters")
That's it. If we look into the execution plan, we can see that it only contains Spark-related information:
The performance also looks as expected: for this example, the native Spark function took ~1.1 seconds. So we have ~2s vs. ~1.5s vs. ~1.1s for Python vs. Pandas vs. Spark, which already gives you quite a good idea of the performance cost of not using Spark native code. The size of the difference will vary depending on the specific workload, but the order will almost always be the same.
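As a side note, functions written in this DataFrame-in, DataFrame-out style can also be chained with DataFrame.transform, which keeps longer pipelines readable. Here is a minimal sketch reusing the function and DataFrame from above:
# Chain DataFrame-in/DataFrame-out functions without breaking the method chain
df_with_spark = df.transform(lambda d: get_first_characters_spark(d, "firstName", "first_letters"))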
Conclusion
Functions are a great way to improve your code, and I hope you got a good understanding of how to integrate this knowledge into your (py)spark code.