
NOTE: This question has many related questions on Stack Overflow, but I was unable to find my answer in any of them.

I'm attempting to parallelize Prophet time series model training across multiple time series in a Spark DataFrame using groupBy().applyInPandas() with a Pandas UDF. However, I'm consistently encountering a PySparkValueError: [INVALID_PANDAS_UDF], and I'm struggling to pinpoint the exact cause after extensive debugging.

My Goal: To train a separate Prophet model for each unique attribute in my df_train Spark DataFrame, leveraging Spark's distributed processing.

Environment:

  • Python 3.12.3
  • PySpark (Spark 3.5.6)
  • Pandas (2.3.1)
  • Prophet library
  • Running on remote EC2 server

Input DataFrame Schema (df_train.printSchema() output):

root
|-- attribute: string (nullable = true)
|-- y: long (nullable = true)
|-- ds: timestamp (nullable = true)

My Code:


    from pyspark.sql.functions import col, lit, pandas_udf, PandasUDFType
    from pyspark.sql.types import StructType, StructField, TimestampType, StringType, LongType
    import pandas as pd
    from datetime import datetime
    from prophet import Prophet # Assuming Prophet is installed
    
    # Assume df_single_hash_sorted_dedup is some spark DF
    # I am selecting a specific attribute for testing purpose.

    df_train = df_single_hash_sorted_dedup \
        .filter(df_single_hash_sorted_dedup.attribute == 'session_id') \
        .withColumnRenamed("time_granularity", "ds") \
        .withColumnRenamed("count", "y") \
        .select("attribute","y", "ds")
    
    forecast_schema = StructType([
        StructField("attribute", StringType(), True), # Grouping column
        StructField("y", LongType(), True),           
        StructField("ds", TimestampType(), True)
    ])
    
    @pandas_udf(forecast_schema, PandasUDFType.GROUPED_MAP)
    def train_and_forecast_prophet_multi_metric(pdf: pd.DataFrame) -> pd.DataFrame:
        print(f"Type of input pdf: {type(pdf)}")
        print(f"Columns of input pdf: {pdf.columns.tolist()}")
        return pdf
    
    print("\nApplying debug UDF with applyInPandas...")
    df_result = df_train.groupBy("attribute").applyInPandas(train_and_forecast_prophet_multi_metric, schema=forecast_schema)
    
    # Trigger the action to see the error
    df_result.show(truncate=False)

The Error I'm Receiving:

PySparkValueError: [INVALID_PANDAS_UDF] Invalid function: pandas_udf with function type GROUPED_MAP or the function in groupby.applyInPandas must take either one argument (data) or two arguments (key, data).

Given that the function signature appears correct and I've attempted to align the schemas, why is groupBy().applyInPandas() still raising this PySparkValueError? What subtle aspect of applyInPandas's validation could I be missing, or could this be an environment/version-specific bug?

  • applyInPandas() takes a Python native function that maps DataFrame to DataFrame, not one wrapped by pandas_udf. You only need to decorate with pandas_udf if you are going to be calling apply(). Commented Jul 30 at 9:23

3 Answers

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, pandas_udf, PandasUDFType, rand
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, FloatType
import pandas as pd
from datetime import datetime

# Initialize Spark session
spark = SparkSession.builder.appName("PySparkTest").getOrCreate()

# Define the number of rows you want
num_rows = 100

# Create a DataFrame with two random columns and one timestamp column
df_train = spark.range(num_rows) \
    .withColumn("attribute", lit("a")) \
    .withColumn("y", rand().cast("float")) \
    .withColumn("ds", rand().cast("float")) \
    .drop("id")

df_train.show(5)

forecast_schema = StructType([
    StructField("attribute", StringType(), True), # Grouping column
    StructField("y", FloatType(), True),           
    StructField("ds", FloatType(), True)
])

@pandas_udf(forecast_schema, PandasUDFType.GROUPED_MAP)
def train_and_forecast_prophet_multi_metric(pdf: pd.DataFrame) -> pd.DataFrame:
    print(f"Type of input pdf: {type(pdf)}")
    print(f"Columns of input pdf: {pdf.columns.tolist()}")
    return pdf

print("\nApplying debug UDF with apply...")
df_result = df_train.groupBy("attribute").apply(train_and_forecast_prophet_multi_metric)

# Trigger the action
df_result.show(truncate=False)


from pyspark.sql.types import StructType, StructField, TimestampType, StringType, LongType
import pandas as pd

forecast_schema = StructType([
    StructField("attribute", StringType(), True), # Grouping column
    StructField("y", LongType(), True),
    StructField("ds", TimestampType(), True)
])

# Plain Python function: applyInPandas wraps it in a pandas_udf itself,
# so it must NOT already be decorated with @pandas_udf.
def train_and_forecast_prophet_multi_metric(pdf: pd.DataFrame) -> pd.DataFrame:
    print(f"Type of input pdf: {type(pdf)}")
    print(f"Columns of input pdf: {pdf.columns.tolist()}")
    return pdf

df_result = df_train.groupBy("attribute").applyInPandas(train_and_forecast_prophet_multi_metric, schema=forecast_schema)
df_result.show(truncate=False)



The key is buried in the error message, though it might not be obvious:

... or the function in groupby.applyInPandas must take either one argument (data) or two arguments (key, data)

GroupedData.applyInPandas() expects a regular Python function, not a UDF. It is a convenience function that saves you the trouble of having to decorate your functions.

def bar(data: pd.DataFrame) -> pd.DataFrame:
   ...

df.groupBy("foo").applyInPandas(bar, schema=schema)

is practically equivalent to:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def bar(data: pd.DataFrame) -> pd.DataFrame:
   ...

df.groupBy("foo").apply(bar)

The difference between the two approaches is that applyInPandas() always calls pandas_udf to create and register a UDF, which is a bit slower than decorating your function with @pandas_udf which creates a single instance of the UDF.

Note: @pandas_udf is a decorator. It is syntactic sugar for function composition.

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def foo(data: pd.DataFrame) -> pd.DataFrame:
   ...

is the same as:

def _foo(data: pd.DataFrame) -> pd.DataFrame:
   ...

foo = pandas_udf(schema, PandasUDFType.GROUPED_MAP)(_foo)

and foo is no longer the original function with a single DataFrame argument that returns a DataFrame but whatever UDF object pandas_udf returns.

