
I have a dataframe with "id" column and a column which has an array of struct. The schema:

root
 |-- id: string (nullable = true)
 |-- desc: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: long (nullable = false)

The array "desc" can have any number of null values. I would like to create a final dataframe where the array contains none of the null values, using Spark 1.6.

An example would be:

id     desc
1010   [[George,21], null, [MARIE,13], null]
1023   [null, [Watson,11], [John,35], null, [Kyle,33]]

I want the final dataframe as:

id     desc
1010   [[George,21], [MARIE,13]]
1023   [[Watson,11], [John,35], [Kyle,33]]

I tried doing this with a UDF and a case class but got:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to....
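
My attempt looked roughly like this (a simplified sketch; names are illustrative):

import org.apache.spark.sql.functions.udf

case class Person(name: String, age: Long)

// Spark hands the array elements to the UDF as GenericRowWithSchema, not
// as Person instances, so touching a field like p.age forces an invalid
// cast and throws the exception above.
val filterNulls = udf((xs: Seq[Person]) =>
  xs.filter(_ != null).map(p => Person(p.name, p.age)))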

Any help is greatly appreciated; I would prefer doing it without converting to RDDs, if possible.

3 Answers


Here is another version:

case class Person(name: String, age: Int)

root
 |-- id: string (nullable = true)
 |-- desc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: integer (nullable = false)

+----+-----------------------------------------------+
|id  |desc                                           |
+----+-----------------------------------------------+
|1010|[[George,21], null, [MARIE,13], null]          |
|1023|[[Watson,11], null, [John,35], null, [Kyle,33]]|
+----+-----------------------------------------------+


import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import spark.implicits._ // for the $"..." column syntax below

val filterOutNull = udf((xs: Seq[Row]) => {
  xs.flatMap {
    case null => Nil
    // convert each non-null Row back to the specific struct:
    case Row(s: String, i: Int) => List(Person(s, i))
  }
})

val result = df.withColumn("filteredListDesc", filterOutNull($"desc"))

+----+-----------------------------------------------+-----------------------------------+
|id  |desc                                           |filteredListDesc                   |
+----+-----------------------------------------------+-----------------------------------+
|1010|[[George,21], null, [MARIE,13], null]          |[[George,21], [MARIE,13]]          |
|1023|[[Watson,11], null, [John,35], null, [Kyle,33]]|[[Watson,11], [John,35], [Kyle,33]]|
+----+-----------------------------------------------+-----------------------------------+

2 Comments

Hi there, I have a question: why is Seq[Person] not used in the UDF, with Seq[Row] used instead? Why does using Seq[Person] give a casting error when a case class is basically the same structure as defined in the schema?
In short, it's easier for Spark to work with its internal types like Row/struct/array than with JVM objects like case classes.
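
To illustrate that point: on Spark 2.x and later, the typed Dataset API can work with Seq[Person] directly, because encoders deserialize each struct into a Person (null entries stay null references). A minimal sketch, reusing df and Person from this answer and assuming a SparkSession named spark:

import spark.implicits._

// Typed alternative: encoders turn each struct into a Person, and null
// array entries become null references that a plain filter can drop.
val cleaned = df.as[(String, Seq[Person])]
  .map { case (id, people) =>
    (id, Option(people).getOrElse(Seq.empty).filter(_ != null))
  }
  .toDF("id", "desc")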

Given that the original dataframe has the following schema:

root
 |-- id: string (nullable = true)
 |-- desc: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- age: long (nullable = false)

Defining a udf function to remove the null values from the array should work for you:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

def removeNull = udf((array: Seq[Row]) => array.filterNot(_ == null).map(x => element(x.getAs[String]("name"), x.getAs[Long]("age"))))

df.withColumn("desc", removeNull(col("desc")))

where element is a case class

case class element(name: String, age: Long)

and you should get

+----+-----------------------------------+
|id  |desc                               |
+----+-----------------------------------+
|1010|[[George,21], [MARIE,13]]          |
|1023|[[Watson,11], [John,35], [Kyle,33]]|
+----+-----------------------------------+

2 Comments

I got the part up to array.filterNot(_ == null), but why use a map after that? Could you clarify what the map is doing?
map is just creating the structs to be returned.
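
To expand on that: a UDF cannot return Seq[Row] directly, since Spark has no way to derive a schema for Row as a return type; mapping each Row into the element case class gives it a product type it can encode back into structs. A rough sketch of the variant that fails (illustrative name):

// Fails as soon as the udf is defined: Spark cannot derive a return-type
// schema for Row, so the rows must be rebuilt as a case class instead.
def removeNullBroken = udf((array: Seq[Row]) => array.filterNot(_ == null))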

Spark 3.4+

array_compact($"desc")

Example input:

import spark.implicits._

case class Person(name: String, age: Long)
val df1 = Seq(
    ("1010", Seq(Person("George", 21), null, Person("MARIE", 13), null)),
    ("1023", Seq(null, Person("Watson", 11), Person("John", 35), null, Person("Kyle", 33)))
).toDF("id", "desc")

df1.show(truncate=false)
// +----+--------------------------------------------------+
// |id  |desc                                              |
// +----+--------------------------------------------------+
// |1010|[{George, 21}, null, {MARIE, 13}, null]           |
// |1023|[null, {Watson, 11}, {John, 35}, null, {Kyle, 33}]|
// +----+--------------------------------------------------+

df1.printSchema()
// root
//  |-- id: string (nullable = true)
//  |-- desc: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- name: string (nullable = true)
//  |    |    |-- age: long (nullable = false)

Using array_compact:

import org.apache.spark.sql.functions.array_compact
val df2 = df1.withColumn("desc", array_compact($"desc"))

df2.show(truncate=false)
// +----+--------------------------------------+
// |id  |desc                                  |
// +----+--------------------------------------+
// |1010|[{George, 21}, {MARIE, 13}]           |
// |1023|[{Watson, 11}, {John, 35}, {Kyle, 33}]|
// +----+--------------------------------------+
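
For completeness, the same function can be used from the SQL side as well; a minimal sketch, assuming a SparkSession named spark:

// Equivalent via Spark SQL (3.4+):
df1.createOrReplaceTempView("people")
val df3 = spark.sql("SELECT id, array_compact(desc) AS desc FROM people")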
