pyserial: cómo leer la última línea enviada desde un dispositivo serie

Tengo un Arduino conectado a mi computadora ejecutando un bucle, enviando un valor a través del puerto serie a la computadora cada 100 ms.

Quiero hacer un script de Python que se lea desde el puerto serie solo cada pocos segundos, así que quiero que solo vea lo último enviado desde Arduino.

¿Cómo haces esto en Pyserial?

Aquí está el código que probé que no funciona. Lee las líneas secuencialmente.

import serial import time ser = serial.Serial('com4',9600,timeout=1) while 1: time.sleep(10) print ser.readline() #How do I get the most recent line sent from the device? 

Tal vez no entiendo bien tu pregunta, pero como es una línea serial, tendrás que leer todo lo que se envía desde el Arduino secuencialmente, se almacenará en el Arduino hasta que lo leas.

Si desea tener una pantalla de estado que muestre lo último que se envió, use un hilo que incorpore el código en su pregunta (menos la suspensión) y mantenga la última línea completa como la última línea del Arduino.

Actualización: el código de ejemplo de mtasic es bastante bueno, pero si el Arduino ha enviado una línea parcial cuando se llama inWaiting() , obtendrá una línea truncada. En su lugar, lo que desea hacer es colocar la última línea completa en last_received y mantener la línea parcial en el buffer para que se pueda agregar a la próxima vez que se realice el ciclo. Algo como esto:

 def receiving(ser): global last_received buffer_string = '' while True: buffer_string = buffer_string + ser.read(ser.inWaiting()) if '\n' in buffer_string: lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries last_received = lines[-2] #If the Arduino sends lots of empty lines, you'll lose the #last filled line, so you could make the above statement conditional #like so: if lines[-2]: last_received = lines[-2] buffer_string = lines[-1] 

Con respecto al uso de readline() : Esto es lo que dice la documentación de Pyserial (ligeramente editada para mayor claridad y con una mención a readlines ()):

Tenga cuidado al usar “readline”. Especifique un tiempo de espera al abrir el puerto serie, de lo contrario podría bloquearse para siempre si no se recibe un carácter de nueva línea. También tenga en cuenta que “readlines ()” solo funciona con un tiempo de espera. Depende de tener un tiempo de espera e interpreta eso como EOF (final de archivo).

Lo que me parece bastante razonable!

 from serial import * from threading import Thread last_received = '' def receiving(ser): global last_received buffer = '' while True: # last_received = ser.readline() buffer += ser.read(ser.inWaiting()) if '\n' in buffer: last_received, buffer = buffer.split('\n')[-2:] if __name__ == '__main__': ser = Serial( port=None, baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_NONE, stopbits=STOPBITS_ONE, timeout=0.1, xonxoff=0, rtscts=0, interCharTimeout=None ) Thread(target=receiving, args=(ser,)).start() 

Estas soluciones acaparan la CPU mientras esperan a los personajes.

Debes hacer al menos una llamada de locking para leer (1)

 while True: if '\n' in buffer: pass # skip if a line already in buffer else: buffer += ser.read(1) # this will block until one more char or timeout buffer += ser.read(ser.inWaiting()) # get remaining buffered chars 

… y haz la división como antes.

Puede usar ser.flushInput() para eliminar todos los datos en serie que se encuentran actualmente en el búfer.

Después de borrar los datos antiguos, puede usar ser.readline () para obtener los datos más recientes del dispositivo serie.

Creo que es un poco más simple que las otras soluciones propuestas aquí. Trabajó para mí, espero que sea adecuado para ti.

Este método le permite controlar por separado el tiempo de espera para recostackr todos los datos de cada línea, y un tiempo de espera diferente para esperar en líneas adicionales.

 # get the last line from serial port lines = serial_com() lines[-1] def serial_com(): '''Serial communications: get a response''' # open serial port try: serial_port = serial.Serial(com_port, baudrate=115200, timeout=1) except serial.SerialException as e: print("could not open serial port '{}': {}".format(com_port, e)) # read response from serial port lines = [] while True: line = serial_port.readline() lines.append(line.decode('utf-8').rstrip()) # wait for new data after each line timeout = time.time() + 0.1 while not serial_port.inWaiting() and timeout > time.time(): pass if not serial_port.inWaiting(): break #close the serial port serial_port.close() return lines 

Necesitará un bucle para leer todo lo enviado, con la última llamada a readline () bloqueando hasta el tiempo de espera. Asi que:

 def readLastLine(ser): last_data='' while True: data=ser.readline() if data!='': last_data=data else: return last_data 

Modificación leve a mtasic y al código de Vinay Sajip:

Si bien este código me resultó muy útil para una aplicación similar, necesitaba que todas las líneas regresaran de un dispositivo serie que enviara información periódicamente.

Opté por sacar el primer elemento de la parte superior, registrarlo y luego volver a unir los elementos restantes como el nuevo búfer y continuar desde allí.

Me doy cuenta de que esto no es lo que Greg estaba pidiendo, pero pensé que valía la pena compartirlo como nota al margen.

 def receiving(ser): global last_received buffer = '' while True: buffer = buffer + ser.read(ser.inWaiting()) if '\n' in buffer: lines = buffer.split('\n') last_received = lines.pop(0) buffer = '\n'.join(lines) 

Usar .inWaiting() dentro de un bucle infinito puede ser problemático. Puede acaparar toda la CPU dependiendo de la implementación. En su lugar, recomendaría usar un tamaño específico de datos para leer. Así que en este caso se debe hacer lo siguiente, por ejemplo:

 ser.read(1024) 

Demasiadas complicaciones

¿Cuál es la razón para dividir el objeto de bytes por línea nueva o por otras manipulaciones de matriz? Escribo el método más simple, que resolverá su problema:

 import serial s = serial.Serial(31) s.write(bytes("ATI\r\n", "utf-8")); while True: last = '' for byte in s.read(s.inWaiting()): last += chr(byte) if len(last) > 0: # Do whatever you want with last print (bytes(last, "utf-8")) last = '' 

Aquí hay un ejemplo que usa un contenedor que le permite leer la línea más reciente sin el 100% de la CPU

 class ReadLine: """ pyserial object wrapper for reading line source: https://github.com/pyserial/pyserial/issues/216 """ def __init__(self, s): self.buf = bytearray() self.s = s def readline(self): i = self.buf.find(b"\n") if i >= 0: r = self.buf[:i + 1] self.buf = self.buf[i + 1:] return r while True: i = max(1, min(2048, self.s.in_waiting)) data = self.s.read(i) i = data.find(b"\n") if i >= 0: r = self.buf + data[:i + 1] self.buf[0:] = data[i + 1:] return r else: self.buf.extend(data) s = serial.Serial('/dev/ttyS0') device = ReadLine(s) while True: print(device.readline())