I want to group by a column and do some aggregation operations such as count, count distinct, or nunique on other columns.
For example,

# the samples values in `date` column are all unique
df.show(7)
+--------------------+---------------------------------+-------------------+---------+
|            category|                             tags|           datetime|     date|
+--------------------+---------------------------------+-------------------+---------+
|                null|      ,industry,display,Merchants|2018-01-08 14:30:32| 20200704|
|        social,smart|    smart,swallow,game,Experience|2019-06-17 04:34:51| 20200705|
|      ,beauty,social|            social,picture,social|2017-08-19 09:01:37| 20200706|
|             default|        default,game,us,adventure|2019-10-02 14:18:56| 20200707|
|financial management|financial management,loan,product|2018-07-17 02:07:39| 20200708|
|              system|  system,font,application,setting|2015-07-18 00:45:57| 20200709|
|                null|     ,system,profile,optimization|2018-09-07 19:59:03| 20200710|
+--------------------+---------------------------------+-------------------+---------+

df.printSchema()
root
 |-- category: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- date: string (nullable = true)
# I want to do group aggregations in PySpark like the following pandas code
group_date_tags_cnt_df = df.groupby('date')['tags'].count()
group_date_tags_nunique_df = df.groupby('date')['tags'].nunique()

group_date_category_cnt_df = df.groupby('date')['category'].count()
group_date_category_nunique_df = df.groupby('date')['category'].nunique()

# expected output below
# all results should ignore the empty string produced by the leading ',' after splitting, as well as `null` values
group_date_tags_cnt_df.show(4)
+---------+---------+
|     date|    count|
+---------+---------+
| 20200704|        3|
| 20200705|        4|
| 20200706|        3|
| 20200707|        4|
+---------+---------+

group_date_tags_nunique_df.show(4)
+---------+---------------------------------+
|     date|              count(DISTINCT tag)|
+---------+---------------------------------+
| 20200704|                                3|
| 20200705|                                4|
| 20200706|                                3|
| 20200707|                                4|
+---------+---------------------------------+

# It should ignore `null` here
group_date_category_cnt_df.show(4)
+---------+---------+
|     date|    count|
+---------+---------+
| 20200704|        0|
| 20200705|        2|
| 20200706|        2|
| 20200707|        1|
+---------+---------+

group_date_category_nunique_df.show(4)
+---------+----------------------------+
|     date|    count(DISTINCT category)|
+---------+----------------------------+
| 20200704|                           1|
| 20200705|                           2|
| 20200706|                           2|
| 20200707|                           1|
+---------+----------------------------+

But the tags and category columns are string type here, so I think I should split them first and then do the group aggregation operations on the result.
I am not sure how to implement this, though.
Could anyone help me?
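For reference, here is the rough direction I was thinking of (I am not sure it is correct or idiomatic; in particular, filtering out the empty string left by the leading ',' is just a guess):

from pyspark.sql import functions as F

# split the comma-separated tags into an array, explode to one row per tag,
# and drop the empty strings coming from the leading ','
tags_df = (
    df.withColumn('tag', F.explode(F.split(F.col('tags'), ',')))
      .filter(F.col('tag') != '')
)
group_date_tags_cnt_df = tags_df.groupBy('date').agg(F.count('tag').alias('count'))
group_date_tags_nunique_df = tags_df.groupBy('date').agg(F.countDistinct('tag'))

# same idea for category; explode drops the rows where category is null,
# which seems to be what I want since nulls should be ignored
category_df = (
    df.withColumn('cat', F.explode(F.split(F.col('category'), ',')))
      .filter(F.col('cat') != '')
)
group_date_category_cnt_df = category_df.groupBy('date').agg(F.count('cat').alias('count'))
group_date_category_nunique_df = category_df.groupBy('date').agg(F.countDistinct('cat'))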

Comments:
  • Please add sample input and expected output. (Jul 14, 2020 at 13:03)
  • split category, split tags. then explode. (Jul 14, 2020 at 13:05)

2 Answers

  import org.apache.spark.sql.functions._ // split, explode, when, col, array, lit
  import spark.implicits._                // toDF and the 'column symbol syntax (assumes an existing SparkSession named spark)

  // Row model for the sample data; category is Option[String] so that None becomes a null cell
  case class d(
              category: Option[String],
              tags: String,
              datetime: String,
              date: String
              )

  val sourceDF = Seq(
    d(None, ",industry,display,Merchants", "2018-01-08 14:30:32", "20200704"),
    d(Some("social,smart"), "smart,swallow,game,Experience", "2019-06-17 04:34:51", "20200704"),
    d(Some(",beauty,social"), "social,picture,social", "2017-08-19 09:01:37", "20200704")
  ).toDF("category", "tags", "datetime", "date")


  // split the comma-separated strings into arrays
  val df1 = sourceDF.withColumn("category", split('category, ","))
    .withColumn("tags", split('tags, ","))


  // explode category; a null array is replaced by a single null element so the row is not dropped
  val df2 = df1.select('datetime, 'date, 'tags,
    explode(
      when(col("category").isNotNull, col("category"))
        .otherwise(array(lit(null).cast("string")))).alias("category")
  )

  // explode tags the same way
  val df3 = df2.select('category, 'datetime, 'date,
    explode(
      when(col("tags").isNotNull, col("tags"))
        .otherwise(array(lit(null).cast("string")))).alias("tags")
  )

  val resDF = df3.select('category, 'tags, 'datetime, 'date)

  resDF.show
//  +--------+----------+-------------------+--------+
//  |category|      tags|           datetime|    date|
//  +--------+----------+-------------------+--------+
//  |    null|          |2018-01-08 14:30:32|20200704|
//  |    null|  industry|2018-01-08 14:30:32|20200704|
//  |    null|   display|2018-01-08 14:30:32|20200704|
//  |    null| Merchants|2018-01-08 14:30:32|20200704|
//  |  social|     smart|2019-06-17 04:34:51|20200704|
//  |  social|   swallow|2019-06-17 04:34:51|20200704|
//  |  social|      game|2019-06-17 04:34:51|20200704|
//  |  social|Experience|2019-06-17 04:34:51|20200704|
//  |   smart|     smart|2019-06-17 04:34:51|20200704|
//  |   smart|   swallow|2019-06-17 04:34:51|20200704|
//  |   smart|      game|2019-06-17 04:34:51|20200704|
//  |   smart|Experience|2019-06-17 04:34:51|20200704|
//  |        |    social|2017-08-19 09:01:37|20200704|
//  |        |   picture|2017-08-19 09:01:37|20200704|
//  |        |    social|2017-08-19 09:01:37|20200704|
//  |  beauty|    social|2017-08-19 09:01:37|20200704|
//  |  beauty|   picture|2017-08-19 09:01:37|20200704|
//  |  beauty|    social|2017-08-19 09:01:37|20200704|
//  |  social|    social|2017-08-19 09:01:37|20200704|
//  |  social|   picture|2017-08-19 09:01:37|20200704|
//  +--------+----------+-------------------+--------+

  val group1DF = resDF.groupBy('date, 'category).count()
  group1DF.show
//  +--------+--------+-----+
//  |    date|category|count|
//  +--------+--------+-----+
//  |20200704|  social|    7|
//  |20200704|        |    3|
//  |20200704|   smart|    4|
//  |20200704|  beauty|    3|
//  |20200704|    null|    4|
//  +--------+--------+-----+

  val group2DF = resDF.groupBy('datetime, 'category).count()
  group2DF.show
//  +-------------------+--------+-----+
//  |           datetime|category|count|
//  +-------------------+--------+-----+
//  |2017-08-19 09:01:37|  social|    3|
//  |2017-08-19 09:01:37|  beauty|    3|
//  |2019-06-17 04:34:51|   smart|    4|
//  |2019-06-17 04:34:51|  social|    4|
//  |2018-01-08 14:30:32|    null|    4|
//  |2017-08-19 09:01:37|        |    3|
//  +-------------------+--------+-----+

PySpark code which solves your problem. I have used data for the 3 dates 20200702, 20200704 and 20200705:

from pyspark.sql import Row
from pyspark.sql.functions import *

drow = Row("category","tags","datetime","date")

data = [
    drow("", ",industry,display,Merchants", "2018-01-08 14:30:32", "20200704"),
    drow("social,smart", "smart,swallow,game,Experience", "2019-06-17 04:34:51", "20200702"),
    drow(",beauty,social", "social,picture,social", "2017-08-19 09:01:37", "20200705"),
]
df = spark.createDataFrame(data)

# split both string columns into arrays, then explode each one into a row per element;
# a null array is replaced by a single empty string so the row is not dropped by explode
final_df = (
    df.withColumn("category", split(df['category'], ","))
      .withColumn("tags", split(df['tags'], ","))
      .select('datetime', 'date', 'tags',
              explode(when(col("category").isNotNull(), col("category"))
                      .otherwise(array(lit("").cast("string")))).alias("category"))
      .select('datetime', 'date', 'category',
              explode(when(col("tags").isNotNull(), col("tags"))
                      .otherwise(array(lit("").cast("string")))).alias("tags"))
)

final_df.show()
'''
+-------------------+--------+--------+----------+
|           datetime|    date|category|      tags|
+-------------------+--------+--------+----------+
|2018-01-08 14:30:32|20200704|        |          |
|2018-01-08 14:30:32|20200704|        |  industry|
|2018-01-08 14:30:32|20200704|        |   display|
|2018-01-08 14:30:32|20200704|        | Merchants|
|2019-06-17 04:34:51|20200702|  social|     smart|
|2019-06-17 04:34:51|20200702|  social|   swallow|
|2019-06-17 04:34:51|20200702|  social|      game|
|2019-06-17 04:34:51|20200702|  social|Experience|
|2019-06-17 04:34:51|20200702|   smart|     smart|
|2019-06-17 04:34:51|20200702|   smart|   swallow|
|2019-06-17 04:34:51|20200702|   smart|      game|
|2019-06-17 04:34:51|20200702|   smart|Experience|
|2017-08-19 09:01:37|20200705|        |    social|
|2017-08-19 09:01:37|20200705|        |   picture|
|2017-08-19 09:01:37|20200705|        |    social|
|2017-08-19 09:01:37|20200705|  beauty|    social|
|2017-08-19 09:01:37|20200705|  beauty|   picture|
|2017-08-19 09:01:37|20200705|  beauty|    social|
|2017-08-19 09:01:37|20200705|  social|    social|
|2017-08-19 09:01:37|20200705|  social|   picture|
+-------------------+--------+--------+----------+
only showing top 20 rows'''


final_df.groupBy('date','tags').count().show()
'''
+--------+----------+-----+
|    date|      tags|count|
+--------+----------+-----+
|20200702|     smart|    2|
|20200705|   picture|    3|
|20200702|   swallow|    2|
|20200704|  industry|    1|
|20200704|   display|    1|
|20200702|      game|    2|
|20200704|          |    1|
|20200704| Merchants|    1|
|20200702|Experience|    2|
|20200705|    social|    6|
+--------+----------+-----+
'''

final_df.groupBy('date','category').count().show()
'''
+--------+--------+-----+
|    date|category|count|
+--------+--------+-----+
|20200702|   smart|    4|
|20200702|  social|    4|
|20200705|        |    3|
|20200705|  beauty|    3|
|20200704|        |    4|
|20200705|  social|    3|
+--------+--------+-----+
'''
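If you also need the pandas nunique equivalent from the question, countDistinct over the same final_df should give it; the duplicate rows created by exploding both columns do not change a distinct count, and the empty strings left by the leading ',' can be filtered out first. For example:

final_df.filter(col("tags") != "") \
    .groupBy('date').agg(countDistinct('tags').alias('tags_nunique')).show()

final_df.filter(col("category") != "") \
    .groupBy('date').agg(countDistinct('category').alias('category_nunique')).show()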
