Spark estructurado streaming: no escribir correctamente

Estoy transmitiendo registros de lectura de medidores como JSON desde kafka_2.11-0.10.0.1 a Spark 2.1. Cambié a la transmisión estructurada; y aunque kafka consumer confirma los datos entrantes, la consola y writeStream no se mueven. Estoy probando usando

pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 

Mi código:

 from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.sql.types import * spark = SparkSession \ .builder \ .appName("interval") \ .master("local[4]") \ .getOrCreate() schema = StructType().add("customer_id", StringType()) df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "xx.xxx.xx.xxx:9092") \ .option("subscribe", "test") \ .option("startingOffsets", "earliest") \ .load() \ .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) query = df.writeStream \ .option("checkpointLocation", "/user/XX/checkpoint5") \ .format("parquet") \ .start("/user/XX/interval5") 

Crea los directorios de punto de control y datos con un archivo de parquet de 388 bytes. Sin embargo, nunca se escriben datos transmitidos.

 $ hdfs dfs -ls interval5 drwxr-xr-x ... interval5/_spark_metadata -rw-r--r-- ... interval5/part-00000-0b2eb00a-c361-4dfe-a24e-9589d150a911.snappy.parquet -rw-r--r-- ... interval5/part-00000-e0cb12d1-9c29-4eb0-92a8-688f468a42ce.snappy.parquet 

kafka-consumer confirma que los datos están siendo enviados:

 {"customer_id":"customer_736"} {"customer_id":"customer_995"} {"customer_id":"customer_1899"} {"customer_id":"customer_35"} 

kafka-consumer muestra los datos transmitidos.

Creo que me estoy perdiendo un paso esencial para eliminar la cola y guardar las filas transmitidas: un día de arrastre stackoverflow no ha ayudado. (editado para eliminar las referencias a la consola, ya que no es relevante).

Con .option("startingOffsets", "latest") solo debe esperar los mensajes que se publicaron después de haber iniciado la consulta de transmisión.

Por lo tanto, el curso de acción esperado es iniciar la consulta de transmisión y luego publicar mensajes.

Nada está escrito en los archivos de parquet.

No verá nada guardado en los archivos de parquet ya que utilizó .format("console") . Tienes que cambiarlo a parquet y reiniciar la consulta.

El mismo código .py de transmisión estructurada funciona en spark-submit, pero nunca procesa ningún dato usando pspark; sin mensaje de error, salida de consola o datos de parquet (aparte de la creación de directorios y metadatos). Imagínate.