Articles of pyspark

Spark (pyspark) que tiene dificultades para llamar a métodos de estadísticas en el nodo de trabajo

Estoy ejecutando un error de biblioteca cuando ejecuto pyspark (desde un ipython-notebook), quiero usar Statistics.chiSqTest(obs) de pyspark.mllib.stat en una operación .mapValues en mi RDD que contiene (key, list (int) ) pares. En el nodo maestro, si colecciono el RDD como un mapa, e itero sobre los valores como, entonces no tengo problemas keys_to_bucketed = vectors.collectAsMap() […]

spark en python: creando un rdd cargando datos binarios con numpy.fromfile

La api de python de la chispa actualmente tiene soporte limitado para cargar archivos de datos binarios de gran tamaño, por lo que traté de obtener numpy.fromfile para ayudarme. Primero obtuve una lista de nombres de archivos que me gustaría cargar, por ejemplo: In [9] filenames Out[9]: [‘A0000.dat’, ‘A0001.dat’, ‘A0002.dat’, ‘A0003.dat’, ‘A0004.dat’] Puedo cargar estos […]

¿Cómo convertir una columna con tipo de cadena a forma int en el dataframe pyspark?

Tengo dataframe en pyspark. Algunas de sus columnas numéricas contienen ‘nan’, de modo que cuando estoy leyendo los datos y revisando el esquema del dataframe, esas columnas tendrán el tipo ‘cadena’. Cómo puedo cambiarlos a tipo int. Reemplacé los valores de ‘nan’ con 0 y de nuevo verifiqué el esquema, pero también muestra el tipo […]

PySpark: StructField (…,…, False) siempre devuelve `nullable = true` en lugar de` nullable = false`

Soy nuevo en PySpark y me enfrento a un problema extraño. Estoy tratando de establecer una columna en no anulable mientras se carga un conjunto de datos CSV. Puedo reproducir mi caso con un conjunto de datos muy pequeño ( test.csv ): col1,col2,col3 11,12,13 21,22,23 31,32,33 41,42,43 51,,53 Hay un valor nulo en la fila […]

Spark 2.2 Error del servidor Thrift en el dataframe NumberFormatException al consultar la tabla Hive

Tengo Hortonworks HDP 2.6.3 ejecutando Spark2 (v2.2). Mi caso de prueba es muy simple: Crea una tabla Hive con algunos valores aleatorios. Colmena en el puerto 10000 Encienda el servidor Spark Thrift en 10016 Ejecute pyspark y consulte la tabla Hive a través de 10016 Sin embargo, no pude obtener los datos de Spark debido […]

Guardando RDD como archivo de secuencia en pyspark

Puedo ejecutar esta secuencia de comandos para guardar el archivo en formato de texto, pero cuando bash ejecutar saveAsSequenceFile, se está produciendo un error. Si alguien tiene una idea sobre cómo guardar el RDD como archivo de secuencia, hágame saber el proceso. Intenté buscar una solución en “Learning Spark” y en la documentación oficial de […]

Capacitación de un modelo de aprendizaje automático en partes seleccionadas de DataFrame en PySpark

Necesito ejecutar un bosque aleatorio en un conjunto de datos. El conjunto de datos está en un DataFrame organizado de la siguiente manera: training_set_all = sc.parallelize([ [‘u1’, 1, 0.9, 0.5, 0.0], [‘u1’, 0, 0.5, 0.1, 0.0], [‘u2’, 1, 0.3, 0.3, 0.8], [‘u3’, 1, 0.2, 0.2, 0.6], [‘u2’, 0, 0.0, 0.1, 0.4], … ]).toDF((‘status’, ‘user’, ‘product’, […]

¿Puedo conectar un proceso externo (R) a cada trabajador de pyspark durante la configuración?

Quiero que cada trabajador de Python inicie un shell R utilizando rpy2. ¿Puedo hacer esto durante algún tipo de fase de configuración similar a la forma en que asumo que esto sucederá cuando importe un módulo Python para usarlo en tareas ejecutoras posteriores? Por ejemplo: import numpy as np df.mapPartitions(lambda x: np.zeros(x)) En mi caso, […]

Pyspark Guardar el dataframe a S3

Quiero guardar el dataframe en s3, pero cuando guardo el archivo en s3, crea un archivo vacío con ${folder_name} , en el que quiero guardar el archivo. Sintaxis para guardar el dataframe: – f.write.parquet(“s3n://bucket-name/shri/test”) Guarda el archivo en la carpeta de prueba pero crea $test en shri . ¿Hay alguna manera de guardarlo sin crear […]

Guardando datos de nuevo en Cassandra como RDD

Estoy tratando de leer los mensajes de Kafka, procesar los datos y luego agregarlos a Cassandra como si fuera un RDD. Mi problema es guardar los datos de nuevo en Cassandra. from __future__ import print_function from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils from pyspark import SparkConf, SparkContext appName = ‘Kafka_Cassandra_Test’ kafkaBrokers = ‘1.2.3.4:9092’ topic […]