Articles of pyspark

Obtener el lunes pasado en Spark

Estoy usando Spark 2.0 con la API de Python. Tengo un dataframe con una columna de tipo DateType (). Me gustaría agregar una columna al dataframe que contiene el lunes más reciente. Puedo hacerlo así: reg_schema = pyspark.sql.types.StructType([ pyspark.sql.types.StructField(‘AccountCreationDate’, pyspark.sql.types.DateType(), True), pyspark.sql.types.StructField(‘UserId’, pyspark.sql.types.LongType(), True) ]) reg = spark.read.schema(reg_schema).option(‘header’, True).csv(path_to_file) reg = reg.withColumn(‘monday’, pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate,’E’) == ‘Mon’, […]

Filtrado de columnas en PySpark

Tengo un df dataframe cargado de la tabla Hive y tiene una columna de marca de tiempo, por ejemplo, ts , con el tipo de cadena de formato dd-MMM-yy hh.mm.ss.MS a (convertido a la biblioteca datetime de python, esto es %d-%b-%y %I.%M.%S.%f %p ). Ahora quiero filtrar las filas del dataframe que son de los […]

Composición de la función de fila PySpark

Como ejemplo simplificado, tengo un dataframe “df” con las columnas “col1, col2” y quiero calcular un máximo de filas después de aplicar una función a cada columna: def f(x): return (x+1) max_udf=udf(lambda x,y: max(x,y), IntegerType()) f_udf=udf(f, IntegerType()) df2=df.withColumn(“result”, max_udf(f_udf(df.col1),f_udf(df.col2))) Así que si df: col1 col2 1 2 3 0 Entonces df2: col1 col2 result 1 […]

Módulo definido por el usuario pyspark o archivos .py

Construí un módulo de Python y quiero importarlo en mi aplicación pyspark. La estructura de mi directorio de paquetes es: wesam/ |– data.py `– __init__.py Un simple import wesam en la parte superior de mi script pyspark conduce a ImportError: No module named wesam . También intenté comprimirlo y enviarlo con mi código con –py-files […]

Convertir una nueva columna derivada en un DataFrame de booleano a entero

Supongamos que tengo un DataFrame x con este esquema: xSchema = StructType([ \ StructField(“a”, DoubleType(), True), \ StructField(“b”, DoubleType(), True), \ StructField(“c”, DoubleType(), True)]) Entonces tengo el DataFrame: DataFrame[a :double, b:double, c:double] Me gustaría tener una columna derivada de enteros. Soy capaz de crear una columna booleana: x = x.withColumn(‘y’, (xa-xb)/xc > 1) Mi nuevo […]

¿Qué tipo debería ser, después de usar .toArray () para un vector Spark?

Quiero transferir mi vector a array, entonces uso get_array = udf(lambda x: x.toArray(),ArrayType(DoubleType())) result3 = result2.withColumn(‘list’,get_array(‘features’)) result3.show() donde las features la columna es dtype vector. Pero Spark me dice que net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct) Sé que la razón debe ser del tipo que uso en la UDF, así que […]

PySpark reduce la agregaciónByKey después de collect_list en una columna

Quiero tomar el siguiente ejemplo para hacer mi agregación de acuerdo con los ‘estados’ recostackdos por collect_list. código de ejemplo: states = sc.parallelize([“TX”,”TX”,”CA”,”TX”,”CA”]) states.map(lambda x:(x,1)).reduceByKey(operator.add).collect() #printed output: [(‘TX’, 3), (‘CA’, 2)] mi código: from pyspark import SparkContext,SparkConf from pyspark.sql.session import SparkSession from pyspark.sql.functions import collect_list import operator conf = SparkConf().setMaster(“local”) conf = conf.setAppName(“test”) sc = […]

¿Son compatibles las semillas aleatorias entre sistemas?

Hice un modelo de bosque aleatorio utilizando el paquete sklearn de python donde establecí el valor de inicialización, por ejemplo, en 1234 . Para la producción de modelos, utilizamos pyspark. Si tuviera que pasar los mismos hiperparmetros y el mismo valor de semilla, es decir, 1234 , ¿obtendré los mismos resultados? Básicamente, ¿los números de […]

Cómo distribuir el grupo de multiprocesamiento a los trabajadores de la chispa

Estoy tratando de usar el multiprocesamiento para leer 100 archivos CSV en paralelo (y luego procesarlos por separado en paralelo). Aquí está mi código que se ejecuta en Jupyter alojado en mi nodo maestro de EMR en AWS. (Eventualmente serán archivos de 100k csv, de ahí la necesidad de lectura distribuida). import findspark import boto3 […]

problema al codificar una característica no numérica a numérica en Spark e Ipython

Estoy trabajando en algo en lo que tengo que hacer predicciones para datos numeric (gastos mensuales de los empleados) usando características non-numeric . Estoy usando Spark MLlibs Random Forests algorthim . Tengo los datos de mis features en un dataframe datos que se ve así: _1 _2 _3 _4 0 Level1 Male New York New […]