pyspark: ml + streaming

Según Combining Spark Streaming + MLlib es posible hacer una predicción sobre un flujo de entrada en spark.

El problema con el ejemplo dado (que funciona en mi clúster) es que testData es un derecho dado en el formato correcto.

Estoy intentando configurar un cliente servidor tcp intercambio basado en cadenas de datos. No puedo averiguar cómo transformar la cadena en el formato correcto.

mientras esto funciona:

sep = ";" str_recue = '0.0;0.1;0.2;0.3;0.4;0.5' rdd = sc.parallelize([str_recue]) chemin = "hdfs://xx.xx.xx.xx:8020/cart_model_for_cycliste_v2" model = DecisionTreeClassificationModel.load(chemin) # travail sur la string rdd2 = rdd.map( lambda data : data.split(sep)) rdd3 = rdd2.map(lambda tableau: [float(x) for x in tableau]) # création df cols = ["c1", "c2", "c3", "c4", "c5", "c6"] fields = [StructField(x, FloatType(), True) for x in cols] schema = StructType(fields) df = spark.createDataFrame(rdd3, schema=schema ) # preparation d'une colonne de features schema = StructType(fields) assembler = VectorAssembler() assembler = assembler.setInputCols(cols) assembler = assembler.setOutputCol("features") df2 = assembler.transform(df) model.transform(df2).show() 

dando:

     +---+---+---+---+---+---+--------------------+-------------+-----------+----------+ | c1| c2| c3| c4| c5| c6| features|rawPrediction|probability|prediction| +---+---+---+---+---+---+--------------------+-------------+-----------+----------+ |0.0|0.1|0.2|0.3|0.4|0.5|[0.0,0.1000000014...| [0.0,3426.0]| [0.0,1.0]| 1.0| +---+---+---+---+---+---+--------------------+-------------+-----------+----------+ 

    No puedo imaginar cómo hacer que funcione mientras escucho un zócalo.

    Tengo mi servidor:

     import socket import random import time port = 12003 ip = socket.gethostname() serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.bind((ip, port)) serversocket.listen(1) (clientsocket, address) = serversocket.accept() nb_d_envois = 10 tps_attente = 3 for i in range(nb_d_envois): time.sleep(tps_attente) sep = ";" to_send = '0.0;0.1;0.2;0.3;0.4;0.5' print(to_send) clientsocket.send(to_send.encode()) 

    que envía una cadena a mi chispa Contexto de transmisión. ¿Qué hacer a continuación? Esta es mi pregunta. De acuerdo con: https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/sql_network_wordcount.py debería ser posible hacer un [foreach]

    Así que creé una función:

     def prevoir(time, rdd): sep = ";" chemin = "hdfs://54.37.12.49:8020/cart_model_for_cycliste_v2" model = DecisionTreeClassificationModel.load(chemin) # travail sur la string rdd2 = rdd.map( lambda data : data.split(sep)) rdd3 = rdd2.map(lambda tableau: [float(x) for x in tableau]) # création df cols = ["c1", "c2", "c3", "c4", "c5", "c6"] fields = [StructField(x, FloatType(), True) for x in cols] schema = StructType(fields) df = spark.createDataFrame(rdd3, schema=schema ) # preparation d'une colonne de features schema = StructType(fields) assembler = VectorAssembler() assembler = assembler.setInputCols(cols) assembler = assembler.setOutputCol("features") df2 = assembler.transform(df) model.transform(df2).show() 

    y lo aplicó en un contexto de transmisión:

     ssc = StreamingContext(sc, 5) dstream = ssc.socketTextStream(listen_to_ip, listen_to_port) dstream.foreachRDD(prevoir) 

    pero no aparece nada (ni siquiera la información de tiempo normal). No hay errores tampoco.

    Mis dudas son:

    • La función no está registrada como UDF, por lo que sospecho que se puede llamar en absoluto

    • la carga del modelo a través de hdfs se debe realizar solo una vez y se pasa como parámetro

    • la función “mostrar” me parece que no está realmente distribuida (pero funciona cuando no se aplica en ‘foreachrdd’ … => tal vez debería guardar algo en hdfs?

    Cualquier ayuda bienvenida …

    Los datos no se envían desde el servidor al contexto de transmisión. El código es correcto.