
I'm trying to apply a pandas_udf to my PySpark dataframe for some filtering, following the groupby('Key').apply(UDF) approach. To use the pandas_udf I defined an output schema, and I have a condition on the column Number. As a simplified example, I wish to return only the ID of the rows with an odd Number.

This raises a problem: sometimes a group contains no odd Number, so the UDF returns an empty dataframe, which conflicts with the defined schema's requirement that Number be an int.

Is there a way to solve this problem and output only the odd-Number rows, combined into a new dataframe?

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("ID", StringType()),
    StructField("Number", IntegerType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def get_odd(df):
    # keep only the rows of this group whose Number is odd
    odd = df.loc[df['Number'] % 2 == 1]
    return odd[['ID', 'Number']]
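A plain-pandas sketch of the per-group behavior (with hypothetical sample data) shows why an empty group is possible — a group where every Number is even yields a zero-row frame, which is what trips the Spark schema:

```python
import pandas as pd

def get_odd_pandas(df):
    # same filter logic as the UDF body, applied to one group
    odd = df.loc[df['Number'] % 2 == 1]
    return odd[['ID', 'Number']]

# hypothetical groups for illustration
group_with_odd = pd.DataFrame({'ID': ['a', 'b'], 'Number': [1, 2]})
group_all_even = pd.DataFrame({'ID': ['c', 'd'], 'Number': [4, 6]})

print(get_odd_pandas(group_with_odd))       # one row: ID 'a', Number 1
print(len(get_odd_pandas(group_all_even)))  # 0 -- the empty result that conflicts with the schema
```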
3 Comments

  • Because I wish to deploy the algorithm on a cluster, and groupby enables distributed computing. Applying my conditions to a huge dataframe is very expensive without groupby. Commented May 17, 2020 at 19:47
  • Use an if/else to return an empty data frame with the columns defined? Also, how does your return match the schema when only the ID column is returned? Commented May 17, 2020 at 20:16
  • That's a typo, just fixed it. Commented May 17, 2020 at 20:26

1 Answer


I came across this issue when some groups produced an empty DataFrame. I solved it by checking for an empty DataFrame and returning one with the schema explicitly defined:

if df_out.empty:
    # change the schema as needed
    return pd.DataFrame({'fullVisitorId': pd.Series([], dtype='str'),
                         'time': pd.Series([], dtype='datetime64[ns]'),
                         'total_transactions': pd.Series([], dtype='int')})
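A self-contained sketch of this pattern (column names taken from the answer's example; swap in your own schema): building the empty frame with explicit dtypes means Arrow sees int/datetime columns rather than the default object dtype an empty DataFrame would otherwise have.

```python
import pandas as pd

# empty DataFrame whose columns carry the intended dtypes,
# matching the answer's example schema
empty = pd.DataFrame({'fullVisitorId': pd.Series([], dtype='str'),
                      'time': pd.Series([], dtype='datetime64[ns]'),
                      'total_transactions': pd.Series([], dtype='int')})

print(empty.dtypes)  # dtypes are set even though there are no rows
```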

1 Comment

I've found that it's sufficient to provide the column names alone. So if something goes wrong in my pandas_udf function and I want to return an empty pandas dataframe, I just do: return pd.DataFrame(columns=schema.fieldNames()), where schema is the schema of the Spark DataFrame (to be returned) that you passed into your pandas_udf.
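The columns-only variant in isolation (field names hardcoded here for illustration; in a real UDF they would come from schema.fieldNames() on the Spark schema object):

```python
import pandas as pd

# stand-in for schema.fieldNames() from the Spark StructType
field_names = ['ID', 'Number']

# empty frame with the right column names but default (object) dtypes
empty = pd.DataFrame(columns=field_names)

print(empty.shape)  # (0, 2)
```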
