Procesar datos de socket que terminan con un salto de línea.

¿Cuál es el mejor enfoque para procesar una conexión de socket donde necesito data var para terminar con un salto de línea \n ? Estoy usando el código a continuación, pero a veces los paquetes tcp se fragmentan y lleva mucho tiempo coincidir con data.endswith("\n") . También probé otros métodos, como guardar la última línea si no termina con \n adjuntarla a los data en el siguiente ciclo. pero esto tampoco funciona porque los paquetes múltiples se fragmentan y la primera y la segunda parte no coinciden. No tengo control sobre el otro extremo, básicamente envía varias líneas que terminan en \r\n .

Cualquier sugerencia será bienvenida, ya que no tengo mucho conocimiento sobre conexiones de socket.

 def receive_bar_updates(s): global all_bars data = '' buffer_size = 4096 while True: data += s.recv(buffer_size) if not data.endswith("\n"): continue lines = data.split("\n") lines = filter(None, lines) for line in lines: if line.startswith("BH") or line.startswith("BC"): symbol = str(line.split(",")[1]) all_bars[symbol].append(line) y = Thread(target=proccess_bars, kwargs={'symbol': symbol}) y.start() data = "" 

Ejemplo de datanormales “:

 line1\r\n line2\r\n line3\r\n 

Ejemplo de data fragmentados :

 line1\r\n line2\r\n lin 

Si tiene una entrada sin procesar que desea procesar como línea, el módulo io es su amigo porque hará el assembly de paquetes de bajo nivel en líneas.

Podrías usar:

 class SocketIO(io.RawIOBase): def __init__(self, sock): self.sock = sock def read(self, sz=-1): if (sz == -1): sz=0x7FFFFFFF return self.sock.recv(sz) def seekable(self): return False 

Es más robusto que el endswith('\n') porque si un paquete contiene una nueva línea incrustada ( 'ab\ncd' ), el módulo io lo procesará correctamente. Su código podría convertirse en:

 def receive_bar_updates(s): global all_bars data = '' buffer_size = 4096 fd = SocketIO(s) # fd can be used as an input file object for line in fd: if should_be_rejected_by_filter(line): continue # do not know what filter does... if line.startswith("BH") or line.startswith("BC"): symbol = str(line.split(",")[1]) all_bars[symbol].append(line) y = Thread(target=proccess_bars, kwargs={'symbol': symbol}) y.start() 

¿Estás aceptando conexiones diferentes? ¿O es un flujo de datos, dividido por \r\n ‘s?

Al aceptar conexiones múltiples, esperaría una conexión con s.accept() y luego procesaría todos sus datos. Cuando tenga todo el paquete, procese sus datos y espere la próxima conexión. Lo que hagas dependerá de la estructura de cada paquete. (Ejemplo: https://wiki.python.org/moin/TcpCommunication )

Si, en cambio, está consumiendo un flujo de datos, probablemente debería procesar cada ‘línea’ que encuentre en un hilo separado, mientras sigue consumiendo en otro.

Edit: Por lo tanto, si tengo su situación correcta; una conexión, siendo los datos una cadena dividida por \r\n , que termina con \n . Sin embargo, los datos no se corresponden con lo que está esperando, en lugar de eso, hacen un bucle infinitamente esperando un \n .

La interfaz de socket, tal como la entiendo, termina con un resultado de datos vacío. Por lo tanto, el último búfer podría haber terminado con un \n , pero luego continuó obteniendo objetos None , tratando de encontrar otro \n .

En su lugar, intente agregar esto:

 if not data: break 

Código completo:

 def receive_bar_updates(s): global all_bars data = '' buffer_size = 4096 while True: data += s.recv(buffer_size) if not data: break if not data.endswith("\n"): continue lines = data.split("\n") lines = filter(None, lines) for line in lines: if line.startswith("BH") or line.startswith("BC"): symbol = str(line.split(",")[1]) all_bars[symbol].append(line) y = Thread(target=proccess_bars, kwargs={'symbol': symbol}) y.start() data = "" 

Edit2: Vaya, código incorrecto

No he probado este código, pero debería funcionar:

 def receive_bar_updates(s): global all_bars data = '' buf = '' buffer_size = 4096 while True: if not "\r\n" in data: # skip recv if we already have another line buffered. data += s.recv(buffer_size) if not "\r\n" in data: continue i = data.rfind("\r\n") data, buf = data[:i+2], data[i+2:] lines = data.split("\r\n") lines = filter(None, lines) for line in lines: if line.startswith("BH") or line.startswith("BC"): symbol = str(line.split(",")[1]) all_bars[symbol].append(line) y = Thread(target=proccess_bars, kwargs={'symbol': symbol}) y.start() data = buf 

Edición: Olvidé mencionar, solo modifiqué el código para recibir los datos, no tengo idea para qué lines = data.split("\n") el rest de la función (comenzando con lines = data.split("\n") ).

Edición 2: ahora usa “\ r \ n” para saltos de línea en lugar de “\ n”.

Edición 3: Se solucionó un problema.

Básicamente parece que quieres leer líneas desde el socket. Tal vez sea mejor que no recv llamadas de bajo nivel de recv , sino que solo uses sock.makefile() y trates el resultado como un archivo normal desde donde puedes leer líneas desde: from line in sfile: ...

Eso deja la cuestión de la demora / trozo. Es probable que esto sea causado por el algoritmo de Nagle en el lado de envío. Intenta deshabilitar eso:

 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)