
I am trying to convert a CSV file to a DataFrame in Spark 1.5.2 with Scala without using the Databricks spark-csv library, as I am working on a community project and that library is not available there. My approach was the following:

var inputPath  = "input.csv"
var text = sc.textFile(inputPath)
var rows = text.map(line => line.split(",").map(_.trim))
var header = rows.first()
var data = rows.filter(_(0) != header(0))
var df = sc.makeRDD(1 to data.count().toInt).map(i => (data.take(i).drop(i-1)(0)(0), data.take(i).drop(i-1)(0)(1), data.take(i).drop(i-1)(0)(2), data.take(i).drop(i-1)(0)(3), data.take(i).drop(i-1)(0)(4))).toDF(header(0), header(1), header(2), header(3), header(4))

This code, even though it is quite a mess, runs without returning any error messages. The problem comes when trying to display the data inside df in order to verify that this method is correct and later run some queries on df. The error I get after executing df.show() references SPARK-5063. My questions are:

1) Why is it not possible to print the content of df?

2) Is there a more straightforward method to convert a CSV file to a DataFrame in Spark 1.5.2 without using the Databricks library?
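
For reference on question 1: the SPARK-5063 message says that RDD transformations and actions can only be invoked by the driver, not inside of other transformations, and the data.take(i) calls inside the map above are exactly that pattern; the failure only surfaces once show() forces evaluation. A minimal sketch that reproduces the same error:

val rdd = sc.parallelize(1 to 10)
// Calling an action on an RDD from inside the closure of another
// transformation is what triggers the SPARK-5063 error.
val nested = rdd.map(i => rdd.count() + i)  // defining this does not fail...
nested.collect()                            // ...forcing evaluation does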

  • "it is a community project" -- are you serious? Do you know that Databricks is the company which drives Spark development? Do you know that the spark-csv plug-in has been merged into Spark 2.x core libraries? Commented Mar 24, 2017 at 10:03
  • The problem is that I do not have the chance to change that, therefore I am looking for alternative ways to parse a csv to a dataframe without using Databricks. Commented Mar 24, 2017 at 10:11
  • "change that" -- what do you mean? That you cannot download the JAR (once and for all) and attach it to the job with --jars, along with its commons-csv dependency? It worked pretty well for me with Spark bundled in the CDH distro (note that with an Apache build, --jars did not work well with CDH; I had to go for the spark.driver.extraClassPath prop and an explicit sc.addJar() as a workaround) Commented Mar 24, 2017 at 10:19
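
A rough sketch of that workaround, assuming the two JARs have already been downloaded once (the paths and version numbers below are placeholders):

// The JARs must also be visible to the driver, e.g. through the
// spark.driver.extraClassPath property or the --jars flag at launch;
// sc.addJar() then ships them to the executors.
sc.addJar("/path/to/spark-csv_2.10-1.3.0.jar")
sc.addJar("/path/to/commons-csv-1.1.jar")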

3 Answers


For Spark 1.5.x, the code snippet below can be used to convert the input into a DataFrame:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class DataClass(id: Int, name: String, surname: String, bdate: String, address: String)

// Create an RDD of DataClass objects and register it as a table,
// skipping the header line so that toInt does not fail on it.
val lines = sc.textFile("input.csv")
val headerLine = lines.first()
val peopleData = lines
  .filter(_ != headerLine)
  .map(_.split(",").map(_.trim))
  .map(p => DataClass(p(0).toInt, p(1), p(2), p(3), p(4)))
  .toDF()
peopleData.registerTempTable("dataTable")

val peopleDataFrame = sqlContext.sql("SELECT * from dataTable")

peopleDataFrame.show()
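
Once the temp table is registered, queries can go through either the DataFrame DSL or SQL; a small illustrative query (the id threshold is made up):

// The same filter and projection, expressed through the DSL and through SQL.
peopleData.where($"id" > 100).select("name", "surname").show()
sqlContext.sql("SELECT name, surname FROM dataTable WHERE id > 100").show()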

Reference: the Spark 1.5 SQL programming guide.




You can create it like this with the Java API (note that SparkSession is the Spark 2.x entry point):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession
        .builder()
        .appName("RDDtoDF_Updated")
        .master("local[2]")
        .config("spark.some.config.option", "some-value")
        .getOrCreate();

StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("eid", DataTypes.IntegerType, false),
        DataTypes.createStructField("eName", DataTypes.StringType, false),
        DataTypes.createStructField("eAge", DataTypes.IntegerType, true),
        DataTypes.createStructField("eDept", DataTypes.IntegerType, true),
        DataTypes.createStructField("eSal", DataTypes.IntegerType, true),
        DataTypes.createStructField("eGen", DataTypes.StringType, true)});

String filepath = "F:/Hadoop/Data/EMPData.txt";
JavaRDD<Row> empRDD = spark.read()
        .textFile(filepath)
        .javaRDD()
        .map(line -> line.split(","))
        .map(r -> RowFactory.create(Integer.parseInt(r[0]), r[1].trim(), Integer.parseInt(r[2]),
                Integer.parseInt(r[3]), Integer.parseInt(r[4]), r[5].trim()));

Dataset<Row> empDF = spark.createDataFrame(empRDD, schema);
empDF.groupBy("eDept").max("eSal").show();



Using Spark with Scala.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
val inputPath = "input.csv"
val text = sc.textFile(inputPath)

// Split and trim the fields, keep the first line as the header,
// and drop the header row before wrapping the data in Rows.
val split = text.map(line => line.split(",").map(_.trim))
val header = split.first()
val rows = split.filter(row => !row.sameElements(header)).map(a => Row.fromSeq(a))

// Treat every column as a nullable string.
val schema = StructType(header.map(fieldName => StructField(fieldName, StringType, nullable = true)))

val df = hiveCtx.createDataFrame(rows, schema)

This should work.

But for creating DataFrames, I would recommend using spark-csv.
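
For reference, once the spark-csv package is actually on the classpath (for example attached as discussed in the comments under the question), the read on Spark 1.5 looks roughly like this; the options shown are illustrative:

// External spark-csv data source; the "header" and "inferSchema" options are optional.
val csvDF = hiveCtx.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("input.csv")
csvDF.show()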

