I am trying to process a JSON column from a PostgreSQL database. I am able to connect to the database using:

import os
import findspark
findspark.init(os.environ['SPARK_HOME'])  # locate Spark before importing pyspark

from pyspark import SparkContext
from pyspark.sql import SQLContext

# DB credentials
user = os.environ['EVENTS_DEV_UID']
password = os.environ['EVENTS_DEV_PWD']
host = os.environ['EVENTS_DEV_HOST']
port = os.environ['EVENTS_DEV_PORT']
db = os.environ['EVENTS_DEV_DBNAME']

# Initialize Spark context and SQL context
sc = SparkContext()
spark = SQLContext(sc)

# Set properties
properties = {"user": user, "password": password, "driver": "org.postgresql.Driver"}

# Load data
df = spark.read.jdbc(
    url='jdbc:postgresql://' + host + ':' + port + '/' + db,
    table='events',
    properties=properties)

The problem starts when casting the JSON field: Spark doesn't recognize the struct format of params. When I print the schema:

df.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- params: string (nullable = true)

When I try to cast the string to a struct:

df = df.withColumn('params', df.params.cast('struct'))

I get the following error:

ParseException: '\nDataType struct is not supported.(line 1, pos 0)\n\n== SQL ==\nstruct\n^^^\n'

I guess the problem is the escape characters. Does anybody have an idea how to proceed?

1 Answer

"struct" is not a valid casting type. You can define your own UDF using python's json.loads function. Let's start with a sample data frame:

df = sc.parallelize([[1, "a", '{"a":1, "b":2}'], [2, "b", '{"a":3, "b":4}']])\
    .toDF(["col1", "col2", "json_col"])
df.show()

    +----+----+--------------+
    |col1|col2|      json_col|
    +----+----+--------------+
    |   1|   a|{"a":1, "b":2}|
    |   2|   b|{"a":3, "b":4}|
    +----+----+--------------+

The output StructType would then have the following schema:

from pyspark.sql.types import IntegerType, StructField, StructType
schema = StructType([StructField("a", IntegerType()), StructField("b", IntegerType())])

You cannot cast StringType to StructType, hence the UDF:

import json
import pyspark.sql.functions as psf

# Wrap json.loads in a UDF whose return type is the struct defined above
json_load = psf.udf(json.loads, schema)

Now we can process json_col:

df_parsed = df.withColumn("parsed_json", json_load("json_col"))
df_parsed.show()
df_parsed.printSchema()

    +----+----+--------------+-----------+
    |col1|col2|      json_col|parsed_json|
    +----+----+--------------+-----------+
    |   1|   a|{"a":1, "b":2}|      [1,2]|
    |   2|   b|{"a":3, "b":4}|      [3,4]|
    +----+----+--------------+-----------+

    root
     |-- col1: long (nullable = true)
     |-- col2: string (nullable = true)
     |-- json_col: string (nullable = true)
     |-- parsed_json: struct (nullable = true)
     |    |-- a: integer (nullable = true)
     |    |-- b: integer (nullable = true)
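
Once parsed, the nested fields can be pulled out with dot notation, for example:

df_parsed.select("col1", "parsed_json.a", "parsed_json.b").show()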

You can also try passing the schema directly when loading the data frame.
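
Alternatively, if you are running Spark 2.1 or later, pyspark.sql.functions.from_json parses a JSON string column against a schema without a UDF; a minimal sketch reusing the schema defined above:

import pyspark.sql.functions as psf

# from_json yields a null struct for rows whose strings fail to parse
df_parsed = df.withColumn("parsed_json", psf.from_json("json_col", schema))
df_parsed.printSchema()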
