Python envía datos CSV a la transmisión de chispas

Me gustaría intentar cargar datos CSV en Python y transmitir cada chispa de fila a través de SPark Streaming.

Soy bastante nuevo en cosas de red. No sé exactamente si se supone que debo crear un script de python para el servidor que, una vez que establezca una conexión (con chispa de transmisión), comenzará a enviar cada fila. En la Documentación de Spark Streaming hacen un nc -l 9999 que es un servidor netcat que escucha en el puerto 9999 si estoy correcto. Así que intenté crear un script de Python similar que analiza un CSV y lo envía en el puerto 60000

import socket # Import socket module import csv port = 60000 # Reserve a port for your service. s = socket.socket() # Create a socket object host = socket.gethostname() # Get local machine name s.bind((host, port)) # Bind to the port s.listen(5) # Now wait for client connection. print('Server listening....') while True: conn, addr = s.accept() # Establish connection with client. print('Got connection from', addr) csvfile = open('Titantic.csv', 'rb') reader = csv.reader(csvfile, delimiter = ',') for row in reader: line = ','.join(row) conn.send(line) print(line) csvfile.close() print('Done sending') conn.send('Thank you for connecting') conn.close() 

SPark Streaming Script –

 from pyspark import SparkContext from pyspark.streaming import StreamingContext ssc = StreamingContext(sc, 1) # Create a DStream that will connect to hostname:port, like localhost:9999 lines_RDD = ssc.socketTextStream("localhost", 60000) # Split each line into words data_RDD = lines_RDD.flatMap(lambda line: line.split(",")) data_RDD.pprint() ssc.start() # Start the computation ssc.awaitTermination() # Wait for the computation to terminate 

Cuando ejecuto el script de chispa (esto está en Jupyter Notebooks por cierto) recibo este error – IllegalArgumentException: ‘error de requisito: no hay operaciones de salida registradas, así que no hay nada que ejecutar’

No creo que esté haciendo mi script de socket correctamente, pero no estoy realmente seguro de qué hacer. Básicamente, estoy intentando replicar lo que hace nc -lk 9999, por lo que puedo enviar datos de texto a través del puerto y luego la transmisión de chispas lo está escuchando y recibe la Los datos y los procesa.

Cualquier ayuda sería apreciada grandemente

Estoy tratando de hacer algo similar, pero quiero transmitir una fila cada 10 segundos. Resolví con este script:

 import socket from time import sleep host = 'localhost' port = 12345 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((host, port)) s.listen(1) while True: print('\nListening for a client at',host , port) conn, addr = s.accept() print('\nConnected by', addr) try: print('\nReading file...\n') with open('iris_test.csv') as f: for line in f: out = line.encode('utf-8') print('Sending line',line) conn.send(out) sleep(10) print('End Of Stream.') except socket.error: print ('Error Occured.\n\nClient disconnected.\n') conn.close() 

Espero que esto ayude.