
I am trying to create a DataFrame with a proper schema after fetching data from a text file. In the RDD, all fields are strings, but one of the fields should be an integer, and I want to make sure it is created as an integer. So I defined a StructType and created the DataFrame from it, but it throws the error below.

Error Message:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
in ()
----> 1 df.show()

/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate, vertical)
    376         """
    377         if isinstance(truncate, bool) and truncate:
--> 378             print(self._jdf.showString(n, 20, vertical))
    379         else:
    380             print(self._jdf.showString(n, int(truncate), vertical))

/Applications/anaconda2/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1284         answer = self.gateway_client.send_command(command)
   1285         return_value = get_return_value(
-> 1286             answer, self.gateway_client, self.target_id, self.name)
   1287
   1288         for temp_arg in temp_args:

/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/Applications/anaconda2/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o64.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 5, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/session.py", line 730, in prepare
    verify_func(obj)
  File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py", line 1370, in verify_struct
    verifier(v)
  File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py", line 1315, in verify_integer
    verify_acceptable_types(obj)
  File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types
    % (dataType, obj, type(obj))))
TypeError: field id: IntegerType can not accept object u'1' in type <type 'unicode'>

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544) at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363) at org.apache.spark.sql.Dataset.head(Dataset.scala:2544) at org.apache.spark.sql.Dataset.take(Dataset.scala:2758) at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254) at org.apache.spark.sql.Dataset.showString(Dataset.scala:291) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main process() File 
"/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper return f(*args, **kwargs) File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/session.py", line 730, in prepare verify_func(obj) File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py", line 1389, in verify verify_value(obj) File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py", line 1370, in verify_struct verifier(v) File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py", line 1389, in verify verify_value(obj) File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py", line 1315, in verify_integer verify_acceptable_types(obj) File "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field id: IntegerType can not accept object u'1' in type

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more

#!/usr/bin/env python
# coding: utf-8

# In[11]:

import os
import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()
sc = SparkContext.getOrCreate()

# In[12]:

# Reads data from file and creates rdd
rdd = sc.textFile('/Users/nagaraju.n/Downloads/sample_data.txt')

# In[13]:

type(rdd)

# In[14]:

rdd_data = rdd.map(lambda p: p.split(","))

# In[15]:

rdd_data.collect()

# In[16]:

print(rdd_data)

# In[17]:

orig_header = rdd_data.first()

# In[18]:

type(orig_header)

# In[19]:

rdd_withoutheader = rdd_data.filter(lambda p: p != orig_header)

# In[20]:

rdd_withoutheader.collect()

# In[21]:

# Create Schema
header = StructType([StructField("id", IntegerType(), True),
                     StructField("first_name", StringType(), True),
                     StructField("last_name", StringType(), True),
                     StructField("email", StringType(), True),
                     StructField("phone", StringType(), True),
                     StructField("city", StringType(), True),
                     StructField("country", StringType(), True)])

# In[22]:

header

# In[23]:

df = spark.createDataFrame(rdd_withoutheader, header)

# In[24]:

df.show()


1 Answer


/// Part of your code:

header = StructType([StructField("id", IntegerType(), True), StructField("first_name", StringType(), True), StructField("last_name", StringType(), True), StructField("email", StringType(), True), StructField("phone", StringType(), True), StructField("city", StringType(), True), StructField("country", StringType(), True)])

df = spark.createDataFrame(rdd_withoutheader, header)

///

My answer:

A schema passed to createDataFrame is mostly there to avoid a full scan of the data to infer types; it does not perform any type casting. Hence the approach above works best for JSON/Avro/Parquet input files, not for text files, where every field parsed out of a line is still a string. For text files, the following methods work better:
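(As an aside: the original error can also be fixed while keeping the StructType named header from the question, by converting the id field to int inside the map step before calling createDataFrame. A minimal sketch, not the answer's own method, assuming the same sample_data.txt layout as in the question:)

rdd = sc.textFile('/Users/nagaraju.n/Downloads/sample_data.txt')
rdd_data = rdd.map(lambda p: p.split(","))
orig_header = rdd_data.first()
rdd_withoutheader = rdd_data.filter(lambda p: p != orig_header)
# cast the first field to int so it matches IntegerType; the other fields stay strings
rdd_typed = rdd_withoutheader.map(lambda p: [int(p[0])] + p[1:])
df = spark.createDataFrame(rdd_typed, header)
df.show()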

Method 1: based on your code, convert the RDD to a DataFrame with named columns as below:

rdd=sc.textFile('/Users/nagaraju.n/Downloads/sample_data.txt')

df_noType=rdd.map(lambda p: p.split(",")).toDF(["id", "first_name", "last_name", "email", "phone", "city", "country"])

Now you can type-cast it in either of these ways:

Way 1:

df_typecast=df_noType.select(df_noType.id.cast('int'), df_noType.first_name, df_noType.last_name, df_noType.email, df_noType.phone, df_noType.city, df_noType.country)

Note: in the line above there is no need to cast the other fields to string, since they are strings by default.

Note: if the values contain decimals, you can use df_noType.id.cast('float') instead.
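To confirm the cast took effect, you can print the schema; assuming the id values parse cleanly, the output should look roughly like this:

df_typecast.printSchema()
# root
#  |-- id: integer (nullable = true)
#  |-- first_name: string (nullable = true)
#  ... (the remaining columns stay string)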

(or)

Way 2:

from pyspark.sql.types import *

df_typecast=df_noType.select(df_noType.id.cast(IntegerType()), df_noType.first_name.cast(StringType()), df_noType.last_name.cast(StringType()), df_noType.email.cast(StringType()), df_noType.phone.cast(StringType()), df_noType.city.cast(StringType()), df_noType.country.cast(StringType()))

Method 2: this is the one I usually use and find the easiest:

rdd=sc.textFile('/Users/nagaraju.n/Downloads/sample_data.txt')

from pyspark.sql import Row

df=rdd.map(lambda p: Row(id= int(p.split(",")[0]), first_name= p.split(",")[1], last_name= p.split(",")[2], email= p.split(",")[3], phone= p.split(",")[4], city= p.split(",")[5], country=p.split(",")[6])).toDF()

df.printSchema()

Note: if the values contain decimals, you can use float(p.split(",")[0]) instead.
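For reference: on Spark 2.4 with Python 2.7, Row(**kwargs) sorts its field names alphabetically and Python ints are inferred as long, so the printSchema() output from Method 2 should look roughly like this:

root
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)
 |-- phone: string (nullable = true)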


2 Comments

Thanks Sri, I got the below error when I try Method 2: ValueError: invalid literal for int() with base 10: 'id'
Hi, you have to remove the first line from your data (the header) before applying the function above. Your data's first line is 'id, first_name, ...', and since 'id' there is a string, int() raises that ValueError; see the sketch below.
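A minimal way to drop that header line before running Method 2, assuming the same sample_data.txt as above:

rdd = sc.textFile('/Users/nagaraju.n/Downloads/sample_data.txt')
header_line = rdd.first()                      # e.g. 'id,first_name,last_name,...'
rdd_noheader = rdd.filter(lambda line: line != header_line)
# then apply the Row(...) map from Method 2 to rdd_noheader instead of rdd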
