PySpark UDFs and Higher-Order Functions: When to Use UDFs, Performance Pitfalls, Pandas UDFs, and Array/Map Functions for Complex Transformations
PySpark’s built-in functions (col(), when(), regexp_replace()) handle 90% of transformations. But what about the other 10%? Custom business logic, complex parsing, third-party library calls — things that do not map to a built-in function.
That is where User-Defined Functions (UDFs) come in. But UDFs have a dark side — they can be 10-100x slower than built-in functions if used incorrectly. This post covers when to use UDFs, the performance trap, the Pandas UDF solution, and the often-overlooked higher-order functions that eliminate the need for UDFs in many cases.
Think of built-in PySpark functions like assembly line machines — fast, optimized, and designed for specific tasks. UDFs are like hand-crafting each item — flexible but slow because every piece must leave the assembly line, go to a workbench (Python interpreter), and come back. Pandas UDFs are like bringing a specialized workstation onto the assembly line — still custom work, but batched and much faster.
Table of Contents
- Built-in Functions vs UDFs
- Standard Python UDFs
- Creating a UDF
- UDF with Multiple Columns
- The Performance Problem with UDFs
- Pandas UDFs (Vectorized UDFs)
- Scalar Pandas UDF
- Grouped Map Pandas UDF
- UDF vs Pandas UDF Performance Comparison
- Higher-Order Functions (No UDF Needed)
- transform (Apply to Each Array Element)
- filter (Filter Array Elements)
- aggregate (Reduce Array to Single Value)
- exists (Check if Any Element Matches)
- Working with Complex Types
- Arrays
- Maps
- Structs
- When to Use What
- Common Mistakes
- Interview Questions
- Wrapping Up
Built-in Functions vs UDFs
| Feature | Built-in Functions | UDFs |
|---|---|---|
| Speed | Optimized (runs in JVM/Spark engine) | Slow (serializes data to Python and back) |
| Catalyst optimizer | Can optimize (predicate pushdown, etc.) | Black box — optimizer cannot optimize |
| Use case | Standard transformations | Custom logic not available in built-ins |
| Examples | col(), when(), regexp_replace(), concat() | Custom scoring, API calls, complex parsing |
Rule of thumb: Always try built-in functions first. Only use UDFs when there is genuinely no built-in alternative.
Standard Python UDFs
Creating a UDF
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, IntegerType, FloatType
# Method 1: decorator
@udf(returnType=StringType())
def categorize_age(age):
if age is None:
return "Unknown"
elif age < 18:
return "Minor"
elif age < 35:
return "Young Adult"
elif age < 60:
return "Adult"
else:
return "Senior"
df_result = df.withColumn("age_category", categorize_age(col("age")))
# Method 2: register function
def calculate_bmi(weight_kg, height_m):
if weight_kg is None or height_m is None or height_m == 0:
return None
return round(weight_kg / (height_m ** 2), 1)
bmi_udf = udf(calculate_bmi, FloatType())
df_result = df.withColumn("bmi", bmi_udf(col("weight"), col("height")))
UDF with Multiple Columns
# UDF that takes multiple input columns
@udf(returnType=StringType())
def full_address(street, city, state, zip_code):
parts = [p for p in [street, city, state, zip_code] if p is not None]
return ", ".join(parts) if parts else None
df_result = df.withColumn("full_address",
full_address(col("street"), col("city"), col("state"), col("zip"))
)
The Performance Problem with UDFs
What happens when a Python UDF runs on Spark:
For EACH ROW:
1. Spark (JVM) serializes the row data → Python format
2. Data crosses JVM → Python interpreter boundary
3. Python function executes on the single row
4. Result serializes Python → JVM format
5. Data crosses Python → JVM boundary
For 10 million rows = 10 million round trips!
Built-in functions: process entire columns at once in the JVM
UDFs: process one row at a time with serialization overhead
Typical performance difference:
Built-in: 10 million rows in 2 seconds
Python UDF: 10 million rows in 45 seconds (20x slower)
Pandas UDF: 10 million rows in 5 seconds (close to built-in!)
Pandas UDFs (Vectorized UDFs)
Pandas UDFs solve the performance problem by processing data in batches (as pandas Series or DataFrames) instead of row by row. Spark sends chunks of data to Python, and the UDF processes an entire chunk at once using vectorized pandas operations.
Scalar Pandas UDF
import pandas as pd
from pyspark.sql.functions import pandas_udf
# Scalar: takes a pandas Series, returns a pandas Series (same length)
@pandas_udf(StringType())
def categorize_age_fast(ages: pd.Series) -> pd.Series:
return ages.apply(lambda age:
"Unknown" if pd.isna(age) else
"Minor" if age < 18 else
"Young Adult" if age < 35 else
"Adult" if age < 60 else
"Senior"
)
# Usage is identical to regular UDFs
df_result = df.withColumn("age_category", categorize_age_fast(col("age")))
# But runs 5-10x faster than the regular UDF version!
# Pandas UDF with numpy for heavy computation
import numpy as np
@pandas_udf(FloatType())
def calculate_bmi_fast(weights: pd.Series, heights: pd.Series) -> pd.Series:
return np.round(weights / (heights ** 2), 1)
df_result = df.withColumn("bmi", calculate_bmi_fast(col("weight"), col("height")))
Grouped Map Pandas UDF
# Grouped Map: takes a pandas DataFrame (one group), returns a pandas DataFrame
# Perfect for: normalization per group, custom aggregations, statistical functions
from pyspark.sql.functions import pandas_udf, PandasUDFType
# Normalize salary within each department
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def normalize_salary(pdf: pd.DataFrame) -> pd.DataFrame:
pdf["salary_normalized"] = (
(pdf["salary"] - pdf["salary"].mean()) / pdf["salary"].std()
)
return pdf
df_normalized = df.groupBy("department").apply(normalize_salary)
# Each department's salaries are normalized independently
# Z-score: 0 = average salary for that department
UDF vs Pandas UDF Performance Comparison
| Approach | 10M Rows | How It Processes | When to Use |
|---|---|---|---|
| Built-in functions | ~2 sec | Entire column in JVM | Always first choice |
| Pandas UDF | ~5 sec | Batches (pandas Series) | Custom logic with numpy/pandas |
| Python UDF | ~45 sec | Row by row (serialized) | Last resort only |
Higher-Order Functions (No UDF Needed)
PySpark has built-in functions for operating on arrays and maps that eliminate the need for UDFs in many scenarios. These run at JVM speed — no Python serialization.
transform (Apply to Each Array Element)
from pyspark.sql.functions import transform, col, upper
# Sample: each row has an array of tags
# | id | tags |
# | 1 | ["python", "spark"] |
# | 2 | ["sql", "data", "etl"] |
# Uppercase every element in the array
df_result = df.withColumn("tags_upper", transform(col("tags"), lambda x: upper(x)))
# | id | tags_upper |
# | 1 | ["PYTHON", "SPARK"] |
# | 2 | ["SQL", "DATA", "ETL"] |
# Add prefix to each element
df_result = df.withColumn("tags_prefixed",
transform(col("tags"), lambda x: concat(lit("skill_"), x))
)
filter (Filter Array Elements)
from pyspark.sql.functions import filter as spark_filter
# Keep only tags that start with "py"
df_result = df.withColumn("py_tags",
spark_filter(col("tags"), lambda x: x.startswith("py"))
)
# | id | py_tags |
# | 1 | ["python"] |
# | 2 | [] |
# Filter array of numbers: keep only values > 50
df_result = df.withColumn("high_scores",
spark_filter(col("scores"), lambda x: x > 50)
)
aggregate (Reduce Array to Single Value)
from pyspark.sql.functions import aggregate
# Sum all elements in an array
df_result = df.withColumn("total_score",
aggregate(col("scores"), lit(0), lambda acc, x: acc + x)
)
# scores: [10, 20, 30] → total_score: 60
# Concatenate array elements into a comma-separated string
df_result = df.withColumn("tags_str",
aggregate(col("tags"), lit(""),
lambda acc, x: when(acc == "", x).otherwise(concat(acc, lit(", "), x)))
)
exists (Check if Any Element Matches)
from pyspark.sql.functions import exists
# Check if any tag is "python"
df_result = df.withColumn("knows_python",
exists(col("tags"), lambda x: x == "python")
)
# | id | knows_python |
# | 1 | true |
# | 2 | false |
Working with Complex Types
Arrays
from pyspark.sql.functions import array, explode, collect_list, size, array_contains
# Create array from columns
df = df.withColumn("name_parts", array(col("first_name"), col("last_name")))
# Explode: one row per array element
df_exploded = df.select("id", explode(col("tags")).alias("tag"))
# | id | tag |
# | 1 | python |
# | 1 | spark |
# Collect back: reverse of explode
df_grouped = df_exploded.groupBy("id").agg(collect_list("tag").alias("tags"))
# Array operations
df.withColumn("tag_count", size(col("tags")))
df.withColumn("has_python", array_contains(col("tags"), "python"))
Maps
from pyspark.sql.functions import create_map, map_keys, map_values, explode
# Create map from columns
df = df.withColumn("properties", create_map(
lit("name"), col("name"),
lit("city"), col("city")
))
# Access map values
df.withColumn("city", col("properties")["city"])
df.withColumn("keys", map_keys(col("properties")))
df.withColumn("values", map_values(col("properties")))
Structs
from pyspark.sql.functions import struct
# Create a struct (nested object)
df = df.withColumn("address", struct(
col("street"), col("city"), col("state"), col("zip")
))
# Access struct fields
df.select("address.city", "address.state")
When to Use What
| Scenario | Approach | Why |
|---|---|---|
| String cleaning, date parsing | Built-in functions | Fastest, optimizable by Catalyst |
| Conditional logic | when().otherwise() |
Built-in, no UDF needed |
| Transform array elements | transform() |
Higher-order function, JVM speed |
| Custom scoring / business rules | Pandas UDF | Custom logic but vectorized (fast) |
| Call external API per row | Python UDF (with caution) | No alternative, but batch if possible |
| Group-level normalization | Grouped Map Pandas UDF | Full pandas power per group |
| Simple math on arrays | aggregate() |
Built-in, no serialization |
Common Mistakes
- Using UDFs when built-in functions exist — always check the PySpark SQL functions documentation first. Functions like
regexp_replace,when/otherwise,coalesce, andtransformhandle most cases without UDFs. - Using Python UDFs instead of Pandas UDFs — if you must write a UDF, use Pandas UDFs for 5-10x better performance. The API is almost identical.
- Not handling nulls in UDFs — UDFs receive null values as Python
None. If your function does not check forNone, it will throw a TypeError on the first null row. - Returning wrong types from UDFs — if you declare
returnType=IntegerType()but return a string, Spark silently returns null. Always match the return type declaration to the actual return value. - Using Python loops instead of higher-order functions —
for element in array_columndoes not work in PySpark. Usetransform(),filter(),aggregate(), orexplode()instead.
Interview Questions
Q: Why are PySpark UDFs slow and how do you improve performance? A: Python UDFs serialize each row from JVM to Python and back — one round trip per row. For 10 million rows, that is 10 million serialization cycles. Use Pandas UDFs instead — they process data in batches (as pandas Series), reducing serialization to a few hundred chunks instead of millions of rows. Pandas UDFs are typically 5-10x faster than regular Python UDFs.
Q: What are higher-order functions in PySpark?
A: Built-in functions that operate on arrays without UDFs: transform() applies a function to each element, filter() keeps elements matching a condition, aggregate() reduces an array to a single value, and exists() checks if any element matches. They run at JVM speed with no Python serialization overhead.
Q: When should you use a UDF vs a built-in function? A: Always prefer built-in functions — they are optimized by the Catalyst query optimizer and run in the JVM. Use UDFs only when no built-in function can express your logic — complex business rules, third-party library calls, or custom parsing that regexp cannot handle. When you must use a UDF, use a Pandas UDF for vectorized performance.
Wrapping Up
The hierarchy is clear: built-in functions first, higher-order functions for arrays, Pandas UDFs for custom logic, and regular Python UDFs only as a last resort. Understanding this hierarchy is the difference between a PySpark job that runs in 2 minutes and one that runs in 45 minutes. Check the built-in functions documentation before writing any UDF — you will be surprised how often the function you need already exists.
Related posts: – PySpark Transformations Cookbook – PySpark Foundations – Lazy Evaluation in PySpark
Naveen Vuppula is a Senior Data Engineering Consultant and app developer based in Ontario, Canada. He writes about Python, SQL, AWS, Azure, and everything data engineering at DriveDataScience.com.