I have a DataFrame like this:

+--+--------+--------+----+-------------+------------------------------+
|id|name    |lastname|age |timestamp    |creditcards                   |
+--+--------+--------+----+-------------+------------------------------+
|1 |michel  |blanc   |35  |1496756626921|[[hr6,3569823], [ee3,1547869]]|
|2 |peter   |barns   |25  |1496756626551|[[ye8,4569872], [qe5,3485762]]|
+--+--------+--------+----+-------------+------------------------------+

where the schema of my df is as follows:

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- age: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- creditcards: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- number: string (nullable = true)

I would like to convert each row to a JSON string according to my schema, so that the resulting DataFrame has a single string column containing the JSON. The first row should look like this:

{
    "id":"1",
    "name":"michel",
    "lastname":"blanc",
    "age":"35",
    "timestamp":"1496756626921",
    "creditcards":[
        {
            "id":"hr6",
            "number":"3569823"
        },
        {
            "id":"ee3",
            "number":"1547869"  
        }
    ]
}

and the second row of the DataFrame should look like this:

{
    "id":"2",
    "name":"peter",
    "lastname":"barns",
    "age":"25",
    "timestamp":"1496756626551",
    "creditcards":[
        {
            "id":"ye8",
            "number":"4569872"
        },
        {
            "id":"qe5",
            "number":"3485762"  
        }
    ]
}

My goal is not to write the DataFrame to a JSON file. My goal is to convert df1 into a second DataFrame, df2, so that I can push each JSON line of df2 to a Kafka topic. I use this code to create the DataFrame:

    val line1 = """{"id":"1","name":"michel","lastname":"blanc","age":"35","timestamp":"1496756626921","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}]}"""
    val line2 = """{"id":"2","name":"peter","lastname":"barns","age":"25","timestamp":"1496756626551","creditcards":[{"id":"ye8","number":"4569872"}, {"id":"qe5","number":"3485762"}]}"""

    val rdd = sc.parallelize(Seq(line1, line2))
    val df = sqlContext.read.json(rdd)
    df.show(false)
    df.printSchema()

Do you have any idea?

2 Answers

If all you need is a single-column DataFrame/Dataset in which each value is one row of the original DataFrame serialized as JSON, you can simply apply toJSON to your DataFrame, as in the following:

    df.show
    // +---+------------------------------+---+--------+------+-------------+
    // |age|creditcards                   |id |lastname|name  |timestamp    |
    // +---+------------------------------+---+--------+------+-------------+
    // |35 |[[hr6,3569823], [ee3,1547869]]|1  |blanc   |michel|1496756626921|
    // |25 |[[ye8,4569872], [qe5,3485762]]|2  |barns   |peter |1496756626551|
    // +---+------------------------------+---+--------+------+-------------+

    val dsJson = df.toJSON
    // dsJson: org.apache.spark.sql.Dataset[String] = [value: string]

    dsJson.show
    // +--------------------------------------------------------------------------+
    // |value                                                                     |
    // +--------------------------------------------------------------------------+
    // |{"age":"35","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3",...|
    // |{"age":"25","creditcards":[{"id":"ye8","number":"4569872"},{"id":"qe5",...|
    // +--------------------------------------------------------------------------+
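
For reference, a self-contained sketch of the same approach, assuming Spark 2.2 or later and a local master. It reads the question's two JSON lines through `spark.read.json` on a `Dataset[String]` (the RDD-based `read.json` used in the question is deprecated since Spark 2.2) and collects the `toJSON` output. The object and method names are hypothetical.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object ToJsonSketch {
  // Hypothetical helper: rebuilds the question's DataFrame and returns one JSON string per row.
  def jsonRows(spark: SparkSession): Array[String] = {
    import spark.implicits._

    val line1 = """{"id":"1","name":"michel","lastname":"blanc","age":"35","timestamp":"1496756626921","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}]}"""
    val line2 = """{"id":"2","name":"peter","lastname":"barns","age":"25","timestamp":"1496756626551","creditcards":[{"id":"ye8","number":"4569872"},{"id":"qe5","number":"3485762"}]}"""

    // read.json(Dataset[String]) is available from Spark 2.2 onward.
    val df = spark.read.json(Seq(line1, line2).toDS())

    // toJSON serializes every row using the DataFrame's own schema and column order.
    val dsJson: Dataset[String] = df.toJSON
    dsJson.collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("toJSON-sketch").getOrCreate()
    try jsonRows(spark).foreach(println)
    finally spark.stop()
  }
}
```

Each collected string is a complete JSON document, so it can be sent as a message body as-is.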

[UPDATE]

To add name as an additional column, you can extract it from the JSON column using from_json (available since Spark 2.1):

    import org.apache.spark.sql.functions.from_json

    val result = dsJson.withColumn("name", from_json($"value", df.schema)("name"))

    result.show
    // +--------------------+------+
    // |               value|  name|
    // +--------------------+------+
    // |{"age":"35","cred...|michel|
    // |{"age":"25","cred...| peter|
    // +--------------------+------+
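
If the extra column is only needed next to the JSON (for example as a Kafka message key), a possibly simpler alternative is to build the JSON directly from `df` with `to_json(struct(...))`, which avoids parsing it back with `from_json`. This is a sketch, assuming Spark 2.1+ (where `to_json` accepts a struct) and assuming `id` is a suitable key; the object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}

object KeyValueSketch {
  // Hypothetical helper: a "key" column plus a "value" column holding the whole row as JSON.
  def toKeyValue(df: DataFrame): DataFrame =
    df.select(
      col("id").as("key"),
      // Wrap every column (in the DataFrame's column order) in a struct, then serialize it.
      to_json(struct(df.columns.map(col): _*)).as("value")
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("to_json-sketch").getOrCreate()
    import spark.implicits._
    try toKeyValue(Seq(("1", "michel"), ("2", "peter")).toDF("id", "name")).show(false)
    finally spark.stop()
  }
}
```

Both columns are built in a single `select`, so no schema needs to be passed around.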

For that, you can directly convert your DataFrame to a Dataset of JSON strings using

    import org.apache.spark.sql.{DataFrame, Dataset}

    val jsonDataset: Dataset[String] = df.toJSON

You can convert it into a DataFrame using

    val jsonDF: DataFrame = jsonDataset.toDF

The JSON keys follow the DataFrame's column order, which is alphabetical here because read.json sorts the fields of the schema it infers. So the output of

    jsonDF.show(false)

will be

    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |value                                                                                                                                                               |
    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |{"age":"35","creditcards":[{"id":"hr6","number":"3569823"},{"id":"ee3","number":"1547869"}],"id":"1","lastname":"blanc","name":"michel","timestamp":"1496756626921"}|
    |{"age":"25","creditcards":[{"id":"ye8","number":"4569872"},{"id":"qe5","number":"3485762"}],"id":"2","lastname":"barns","name":"peter","timestamp":"1496756626551"} |
    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
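
Since the goal in the question is to push each JSON line to a Kafka topic, the single `value` column can be handed to Spark's built-in Kafka sink, which reads a column named `value` (and optionally `key`). A sketch, assuming Spark 2.2+ with the `spark-sql-kafka-0-10` package on the classpath; the broker address, topic name, and object/method names are placeholders.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaSinkSketch {
  // Placeholder connection settings: replace with your own broker list and topic.
  val bootstrapServers = "localhost:9092"
  val topic = "people-json"

  // One JSON string per row, in a column named "value" as the Kafka sink expects.
  def prepare(df: DataFrame): DataFrame = df.toJSON.toDF("value")

  // Batch write to Kafka; needs the spark-sql-kafka-0-10 artifact at runtime.
  def write(df: DataFrame): Unit =
    prepare(df).write
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", topic)
      .save()
}
```

Keeping `prepare` separate from `write` lets the JSON conversion be checked without a running broker.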
