Pyspark – ValueError: could not convert string to float / invalid literal for float()

I am trying to use data from a Spark dataframe as input for my k-means model, but I keep getting errors (see the section after the code).

My Spark dataframe looks like this (it has around 1M rows):

    ID    col1   col2   Latitude   Longitude
    13    ...    ...    22.2       13.5
    62    ...    ...    21.4       13.8
    24    ...    ...    21.8       14.1
    71    ...    ...    28.9       18.0
    ...   ...    ...    ...        ...

Here is my code:

    from pyspark.ml.clustering import KMeans
    from pyspark.ml.linalg import Vectors

    df = spark.read.csv("file.csv")

    spark_rdd = df.rdd.map(lambda row: (row["ID"], Vectors.dense(row["Latitude"], row["Longitude"])))
    feature_df = spark_rdd.toDF(["ID", "features"])

    kmeans = KMeans().setK(2).setSeed(1)
    model = kmeans.fit(feature_df)

    sum_of_square_error = model.computeCost(feature_df)
    print str(sum_of_square_error)

    centers = model.clusterCenters()
    for center in centers:
        print(center)

However, I get this error:


 Py4JJavaError Traceback (most recent call last)  in () 7 8 kmeans = KMeans().setK(2).setSeed(1) ----> 9 model = kmeans.fit(feature_df) 10 11 ~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params) 62 return self.copy(params)._fit(dataset) 63 else: ---> 64 return self._fit(dataset) 65 else: 66 raise ValueError("Params must be either a param map or a list/tuple of param maps, " ~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit(self, dataset) 234 235 def _fit(self, dataset): --> 236 java_model = self._fit_java(dataset) 237 return self._create_model(java_model) 238 ~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit_java(self, dataset) 231 """ 232 self._transfer_params_to_java() --> 233 return self._java_obj.fit(dataset._jdf) 234 235 def _fit(self, dataset): /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_arg in temp_args: ~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString() /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 317 raise Py4JJavaError( 318 "An error occurred while calling {0}{1}{2}.\n". --> 319 format(target_id, ".", name), value) 320 else: 321 raise Py4JError( Py4JJavaError: An error occurred while calling o3552.fit. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 457.0 failed 4 times, most recent failure: Lost task 5.3 in stage 457.0 (TID 2308, 10.3.1.31, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main process() File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process serializer.dump_stream(func(split_index, iterator), outfile) File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 268, in dump_stream vs = list(itertools.islice(iterator, batch)) File "", line 4, in  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 790, in dense return DenseVector(elements) File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 275, in __init__ ar = np.array(ar, dtype=np.float64) ValueError: could not convert string to float: GOLF at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156) at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152) at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) at org.apache.spark.rdd.RDD.count(RDD.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$takeSample$1.apply(RDD.scala:567) at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.takeSample(RDD.scala:556) at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:353) at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:256) at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227) at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:319) at sun.reflect.GeneratedMethodAccessor89.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main process() File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process serializer.dump_stream(func(split_index, iterator), outfile) File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 268, in dump_stream vs = list(itertools.islice(iterator, batch)) File "", line 4, in  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 790, in dense return DenseVector(elements) File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 275, in __init__ ar = np.array(ar, dtype=np.float64) ValueError: could not convert string to float: GOLFE at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156) at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152) at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948) at 
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more 

The strange thing is that the error is different every time I run it. The three kinds of errors I get are:

UnicodeEncodeError: 'decimal' codec can't encode characters in position 3-5: invalid decimal Unicode string

invalid literal for float(): 2017-04

ValueError: could not convert string to float: GOLF

Correct me if I am wrong, but I think some of the values in the data columns may be wrong (for example, occasional strings inside the Latitude and Longitude columns).

Is there a way to check whether the value in every row of 'Latitude' is in fact a float? Is there a way to check whether the value in every row of 'ID' is an integer? I would like to discard the rows that contain values of the wrong data type. Maybe there is a way to do this with df.filter()?
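For example, would something like the following sketch work? (It assumes the columns are read in as strings and uses col from pyspark.sql.functions, which is not imported in my code above.)

    from pyspark.sql.functions import col

    # cast() returns null when a value cannot be converted, so
    # isNotNull() keeps only the rows whose values parse correctly.
    numeric_df = df.filter(
        col("ID").cast("int").isNotNull()
        & col("Latitude").cast("float").isNotNull()
        & col("Longitude").cast("float").isNotNull()
    )

    # The offending rows (e.g. the one containing "GOLF") would then be:
    df.filter(col("Latitude").cast("float").isNull()
              | col("Longitude").cast("float").isNull()).show()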

I would really appreciate any help. Thanks.

UPDATE: I even tried df.describe('ID', 'Latitude', 'Longitude').show() and it returns numeric values for count, mean, stddev, min and max for every column, which suggests to me that they should all be numbers..?
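For what it's worth, a quick sketch of how the actual column types could be double-checked (as far as I know, describe() can report numeric statistics even when a column's type is string):

    # With no explicit schema and no inferSchema option,
    # spark.read.csv reads every column as a string,
    # regardless of what the values look like.
    df.printSchema()
    print(df.dtypes)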

Perhaps you should have continued in the same thread, since it is the same problem. For reference: Data preprocessing in pyspark.

Here you need to cast Latitude / Longitude to float and drop the null values with dropna before feeding the data into KMeans, because these columns apparently contain some strings that cannot be converted to a numeric value. So preprocess df with something like this:

    from pyspark.sql.functions import col

    df2 = (df
           .withColumn("Latitude", col("Latitude").cast("float"))
           .withColumn("Longitude", col("Longitude").cast("float"))
           .dropna()
           )
    spark_rdd = df2.rdd
    ...
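From there, the rest of the pipeline from the question should run unchanged. A rough sketch of the remaining steps, reusing the column names from the question (an outline rather than a drop-in solution):

    from pyspark.ml.linalg import Vectors
    from pyspark.ml.clustering import KMeans

    # Build the (ID, features) DataFrame from the cleaned df2,
    # exactly as in the original code, and fit the model.
    feature_df = (df2.rdd
                  .map(lambda row: (row["ID"],
                                    Vectors.dense(row["Latitude"], row["Longitude"])))
                  .toDF(["ID", "features"]))

    kmeans = KMeans().setK(2).setSeed(1)
    model = kmeans.fit(feature_df)
    print(model.computeCost(feature_df))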