NOTE: There are many related questions on Stack Overflow, but none of them answered my question.
I'm attempting to parallelize Prophet time series model training across multiple time series in a Spark DataFrame using groupBy().applyInPandas() with a Pandas UDF. However, I'm consistently encountering a PySparkValueError: [INVALID_PANDAS_UDF], and I'm struggling to pinpoint the exact cause after extensive debugging.
My Goal:
To train a separate Prophet model for each unique attribute in my df_train Spark DataFrame, leveraging Spark's distributed processing.
Environment:
- Python 3.12.3
- PySpark (Spark 3.5.6)
- Pandas (2.3.1)
- Prophet library
- Running on remote EC2 server
Input DataFrame Schema (df_train.printSchema() output):
root
|-- attribute: string (nullable = true)
|-- y: long (nullable = true)
|-- ds: timestamp (nullable = true)
My Code:
from pyspark.sql.functions import col, lit, pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, LongType
import pandas as pd
from datetime import datetime
from prophet import Prophet # Assuming Prophet is installed
# Assume df_single_hash_sorted_dedup is some spark DF
# I am selecting a specific attribute for testing purpose.
df_train = df_single_hash_sorted_dedup \
    .filter(df_single_hash_sorted_dedup.attribute == 'session_id') \
    .withColumnRenamed("time_granularity", "ds") \
    .withColumnRenamed("count", "y") \
    .select("attribute", "y", "ds")
forecast_schema = StructType([
    StructField("attribute", StringType(), True),  # Grouping column
    StructField("y", LongType(), True),
    StructField("ds", TimestampType(), True)
])
@pandas_udf(forecast_schema, PandasUDFType.GROUPED_MAP)
def train_and_forecast_prophet_multi_metric(pdf: pd.DataFrame) -> pd.DataFrame:
    print(f"Type of input pdf: {type(pdf)}")
    print(f"Columns of input pdf: {pdf.columns.tolist()}")
    return pdf
print("\nApplying debug UDF with applyInPandas...")
df_result = df_train.groupBy("attribute").applyInPandas(
    train_and_forecast_prophet_multi_metric, schema=forecast_schema)
# Trigger the action to see the error
df_result.show(truncate=False)
The Error I'm Receiving:
PySparkValueError: [INVALID_PANDAS_UDF] Invalid function: pandas_udf with function type GROUPED_MAP or the function in groupby.applyInPandas must take either one argument (data) or two arguments (key, data).
Given that the function signature appears correct and I've attempted to align the schemas, why is groupBy().applyInPandas() still raising this PySparkValueError? What subtle aspect of applyInPandas's validation could I be missing, or could this be an environment/version-specific bug?
applyInPandas() takes a plain Python function that maps a pandas DataFrame to a pandas DataFrame, not one wrapped by pandas_udf. You only need to decorate with pandas_udf if you are going to be calling apply().
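In other words, drop the decorator and pass the undecorated function to applyInPandas(); the output schema is supplied via the schema argument at the call site. A minimal sketch of the fix, keeping your debug pass-through body (the Spark call is shown commented out, assuming df_train and forecast_schema from your snippet):

```python
import pandas as pd

# Plain Python function -- NO @pandas_udf decorator. applyInPandas expects
# a native function mapping pandas DataFrame -> pandas DataFrame; the
# return schema is given separately as the `schema` argument.
def train_and_forecast_prophet_multi_metric(pdf: pd.DataFrame) -> pd.DataFrame:
    # Prophet fitting/forecasting would go here; for debugging, just
    # echo the group back unchanged.
    print(f"Type of input pdf: {type(pdf)}")
    print(f"Columns of input pdf: {pdf.columns.tolist()}")
    return pdf

# Spark usage (uses df_train and forecast_schema from the question):
# df_result = df_train.groupBy("attribute").applyInPandas(
#     train_and_forecast_prophet_multi_metric, schema=forecast_schema)
# df_result.show(truncate=False)
```

applyInPandas() also accepts a two-argument function `(key, pdf)` if you need the grouping key inside the function, which is handy for tagging each Prophet forecast with its attribute.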