Sincronización de hilos en Python

Actualmente estoy trabajando en un proyecto escolar en el que la asignación, entre otras cosas, es configurar un sistema de cliente / servidor con hilos. Cada cliente en el sistema debe tener asignado su propio hilo en el servidor cuando se conecta a él. Además, me gustaría que el servidor ejecute otros subprocesos, uno relativo a la entrada desde la línea de comandos y otro relacionado con la difusión de mensajes a todos los clientes. Sin embargo, no puedo hacer que esto se ejecute como quiero. Parece que los hilos se están bloqueando entre sí. Me gustaría que mi progtwig tome entradas de la línea de comandos, al mismo tiempo que el servidor escucha a los clientes conectados, y así sucesivamente.

Soy nuevo en la progtwigción y multihilo de python, y aunque creo que mi idea es buena, no estoy sorprendido de que mi código no funcione. Lo cierto es que no estoy exactamente seguro de cómo voy a implementar el paso del mensaje entre los diferentes subprocesos. Tampoco estoy seguro de cómo implementar correctamente los comandos de locking de recursos correctamente. Voy a publicar el código para mi archivo de servidor y mi archivo de cliente aquí, y espero que alguien pueda ayudarme con esto. Creo que esto debería ser en realidad dos guiones relativamente simples. He tratado de comentar mi código lo mejor posible hasta cierto punto.

import select import socket import sys import threading import client class Server: #initializing server socket def __init__(self, event): self.host = 'localhost' self.port = 50000 self.backlog = 5 self.size = 1024 self.server = None self.server_running = False self.listen_threads = [] self.local_threads = [] self.clients = [] self.serverSocketLock = None self.cmdLock = None #here i have also declared some events for the command line input #and the receive function respectively, not sure if correct self.cmd_event = event self.socket_event = event def openSocket(self): #binding server to port try: self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server.bind((self.host, self.port)) self.server.listen(5) print "Listening to port " + str(self.port) + "..." except socket.error, (value,message): if self.server: self.server.close() print "Could not open socket: " + message sys.exit(1) def run(self): self.openSocket() #making Rlocks for the socket and for the command line input self.serverSocketLock = threading.RLock() self.cmdLock = threading.RLock() #set blocking to non-blocking self.server.setblocking(0) #making two threads always running on the server, #one for the command line input, and one for broadcasting (sending) cmd_thread = threading.Thread(target=self.server_cmd) broadcast_thread = threading.Thread(target=self.broadcast,args=[self.clients]) cmd_thread.daemon = True broadcast_thread.daemon = True #append the threads to thread list self.local_threads.append(cmd_thread) self.local_threads.append(broadcast_thread) cmd_thread.start() broadcast_thread.start() self.server_running = True while self.server_running: #connecting to "knocking" clients try: c = client.Client(self.server.accept()) self.clients.append(c) print "Client " + str(c.address) + " connected" #making a thread for each clientn and appending it to client list listen_thread = threading.Thread(target=self.listenToClient,args=[c]) self.listen_threads.append(listen_thread) listen_thread.daemon = True listen_thread.start() #setting event "client has connected" self.socket_event.set() except socket.error, (value, message): continue #close threads self.server.close() print "Closing client threads" for c in self.listen_threads: c.join() def listenToClient(self, c): while self.server_running: #the idea here is to wait until the thread gets the message "client #has connected" self.socket_event.wait() #then clear the event immidiately... self.socket_event.clear() #and aquire the socket resource self.serverSocketLock.acquire() #the below is the receive thingy try: recvd_data = c.client.recv(self.size) if recvd_data == "" or recvd_data == "close\n": print "Client " + str(c.address) + (" disconnected...") self.socket_event.clear() self.serverSocketLock.release() return print recvd_data #I put these here to avoid locking the resource if no message #has been received self.socket_event.clear() self.serverSocketLock.release() except socket.error, (value, message): continue def server_cmd(self): #this is a simple command line utility while self.server_running: #got to have a smart way to make this work self.cmd_event.wait() self.cmd_event.clear() self.cmdLock.acquire() cmd = sys.stdin.readline() if cmd == "": continue if cmd == "close\n": print "Server shutting down..." self.server_running = False self.cmdLock.release() def broadcast(self, clients): while self.server_running: #this function will broadcast a message received from one #client, to all other clients, but i guess any thread #aspects applied to the above, will work here also try: send_data = sys.stdin.readline() if send_data == "": continue else: for c in clients: c.client.send(send_data) self.serverSocketLock.release() self.cmdLock.release() except socket.error, (value, message): continue if __name__ == "__main__": e = threading.Event() s = Server(e) s.run() 

Y luego el archivo cliente

 import select import socket import sys import server import threading class Client(threading.Thread): #initializing client socket def __init__(self,(client,address)): threading.Thread.__init__(self) self.client = client self.address = address self.size = 1024 self.client_running = False self.running_threads = [] self.ClientSocketLock = None def run(self): #connect to server self.client.connect(('localhost',50000)) #making a lock for the socket resource self.clientSocketLock = threading.Lock() self.client.setblocking(0) self.client_running = True #making two threads, one for receiving messages from server... listen = threading.Thread(target=self.listenToServer) #...and one for sending messages to server speak = threading.Thread(target=self.speakToServer) #not actually sure wat daemon means listen.daemon = True speak.daemon = True #appending the threads to the thread-list self.running_threads.append(listen) self.running_threads.append(speak) listen.start() speak.start() #this while-loop is just for avoiding the script terminating while self.client_running: dummy = 1 #closing the threads if the client goes down print "Client operating on its own" self.client.close() #close threads for t in self.running_threads: t.join() return #defining "listen"-function def listenToServer(self): while self.client_running: #here i acquire the socket to this function, but i realize I also #should have a message passing wait()-function or something #somewhere self.clientSocketLock.acquire() try: data_recvd = self.client.recv(self.size) print data_recvd except socket.error, (value,message): continue #releasing the socket resource self.clientSocketLock.release() #defining "speak"-function, doing much the same as for the above function def speakToServer(self): while self.client_running: self.clientSocketLock.acquire() try: send_data = sys.stdin.readline() if send_data == "close\n": print "Disconnecting..." self.client_running = False else: self.client.send(send_data) except socket.error, (value,message): continue self.clientSocketLock.release() if __name__ == "__main__": c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost')) c.run() 

Me doy cuenta de que este es un buen número de líneas de código para que las lea, pero como dije, creo que el concepto y el script en sí deben ser bastante fáciles de entender. Sería muy apreciado si alguien me ayudara a sincronizar mis hilos de manera adecuada =)

Gracias por adelantado

—Editar—

DE ACUERDO. Así que ahora he simplificado mi código para que solo contenga funciones de envío y recepción tanto en el servidor como en los módulos del cliente. Los clientes que se conectan al servidor obtienen sus propios hilos, y las funciones de envío y recepción en ambos módulos funcionan en sus propios hilos separados. Esto funciona como un encanto, con la función de difusión en el módulo del servidor haciendo eco de las cadenas que recibe de un cliente a todos los clientes. ¡Hasta ahora tan bueno!

Lo siguiente que quiero que haga mi script es tomar comandos específicos, es decir, “cerrar”, en el módulo del cliente para cerrar el cliente y unir todos los subprocesos en ejecución en la lista de subprocesos. Estoy usando un indicador de evento para notificar al listenToServer y al hilo principal que el hilo speakToServer ha leído la entrada “cerrar”. Parece que el subproceso principal salta de su bucle while e inicia el bucle for que se supone que se une a los otros subprocesos. Pero aquí cuelga. Parece que el bucle while en el hilo listenToServer nunca se detiene, aunque server_running debería establecerse en False cuando se establece el indicador de evento.

Estoy publicando solo el módulo de cliente aquí, porque supongo que una respuesta para sincronizar estos dos subprocesos se relacionará con la sincronización de más subprocesos tanto en el cliente como en el módulo del servidor.

 import select import socket import sys import server_bygg0203 import threading from time import sleep class Client(threading.Thread): #initializing client socket def __init__(self,(client,address)): threading.Thread.__init__(self) self.client = client self.address = address self.size = 1024 self.client_running = False self.running_threads = [] self.ClientSocketLock = None self.disconnected = threading.Event() def run(self): #connect to server self.client.connect(('localhost',50000)) #self.client.setblocking(0) self.client_running = True #making two threads, one for receiving messages from server... listen = threading.Thread(target=self.listenToServer) #...and one for sending messages to server speak = threading.Thread(target=self.speakToServer) #not actually sure what daemon means listen.daemon = True speak.daemon = True #appending the threads to the thread-list self.running_threads.append((listen,"listen")) self.running_threads.append((speak, "speak")) listen.start() speak.start() while self.client_running: #check if event is set, and if it is #set while statement to false if self.disconnected.isSet(): self.client_running = False #closing the threads if the client goes down print "Client operating on its own" self.client.shutdown(1) self.client.close() #close threads #the script hangs at the for-loop below, and #refuses to close the listen-thread (and possibly #also the speak thread, but it never gets that far) for t in self.running_threads: print "Waiting for " + t[1] + " to close..." t[0].join() self.disconnected.clear() return #defining "speak"-function def speakToServer(self): #sends strings to server while self.client_running: try: send_data = sys.stdin.readline() self.client.send(send_data) #I want the "close" command #to set an event flag, which is being read by all other threads, #and, at the same time set the while statement to false if send_data == "close\n": print "Disconnecting..." self.disconnected.set() self.client_running = False except socket.error, (value,message): continue return #defining "listen"-function def listenToServer(self): #receives strings from server while self.client_running: #check if event is set, and if it is #set while statement to false if self.disconnected.isSet(): self.client_running = False try: data_recvd = self.client.recv(self.size) print data_recvd except socket.error, (value,message): continue return if __name__ == "__main__": c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost')) c.run() 

Más adelante, cuando ponga en funcionamiento este sistema servidor / cliente, usaré este sistema en algunos modelos de elevadores que tenemos aquí en el laboratorio, y cada cliente recibirá órdenes de piso o llamadas “arriba” y “abajo”. El servidor ejecutará un algoritmo de distribución y actualizará las colas del elevador en los clientes que sean más apropiados para el pedido solicitado. Me doy cuenta de que es un largo camino por recorrer, pero creo que uno solo debe dar un paso a la vez =)

Espero que alguien tenga tiempo para mirar esto. Gracias por adelantado.

El mayor problema que veo con este código es que tienes demasiadas cosas que hacer para corregir fácilmente tu problema. El enhebrado puede ser extremadamente complicado debido a lo no lineal que se vuelve la lógica. Especialmente cuando tienes que preocuparte por la sincronización con los lockings.

La razón por la que los clientes se bloquean entre sí se debe a la forma en que está utilizando su serverSocketLock en su bucle listenToClient () en el servidor. Para ser honesto, este no es exactamente su problema en este momento con su código, pero se convirtió en un problema cuando comencé a depurarlo y convertí los sockets en sockets de locking. Si está poniendo cada conexión en su propio hilo y leyendo de ellas, entonces no hay razón para usar un locking global del servidor aquí. Todos pueden leer desde sus propios sockets al mismo tiempo, que es el propósito del hilo.

Aquí está mi recomendación para usted:

  1. Elimine todos los lockings e hilos adicionales que no necesita, y comience desde el principio
  2. Haga que los clientes se conecten a medida que lo hace, y póngalos en su hilo mientras lo hace. Y simplemente haz que envíen datos cada segundo. Verifique que pueda hacer que más de un cliente se conecte y envíe, y que su servidor esté en bucle y recibiendo. Una vez que tenga esta parte funcionando, puede pasar a la siguiente parte.
  3. Ahora mismo tienes tus sockets configurados para no bloquear. Esto hace que todos giren muy rápido en sus bucles cuando los datos no están listos. Ya que estás enhebrando, deberías configurarlos para bloquear. Luego, los hilos del lector simplemente se sentarán, esperarán los datos y responderán de inmediato.

Los lockings se utilizan cuando los subprocesos accederán a los recursos compartidos. Obviamente, debe hacerlo en cualquier momento en que un hilo intente modificar un atributo de servidor como una lista o un valor. Pero no cuando están trabajando en sus propios sockets privados.

El evento que estás usando para activar a tus lectores no parece necesario aquí. Has recibido el cliente y luego inicias el hilo. Así que está listo para irse.

En pocas palabras … simplifique y pruebe un bit a la vez. Cuando esté funcionando, agregue más. Hay demasiados hilos y lockings en este momento.

Aquí hay un ejemplo simplificado de su método listenToClient :

 def listenToClient(self, c): while self.server_running: try: recvd_data = c.client.recv(self.size) print "received:", c, recvd_data if recvd_data == "" or recvd_data == "close\n": print "Client " + str(c.address) + (" disconnected...") return print recvd_data except socket.error, (value, message): if value == 35: continue else: print "Error:", value, message 

Copia de seguridad de su trabajo, luego tirarlo – parcialmente.

Necesita implementar su progtwig en partes y probar cada pieza a medida que avanza. Primero, aborde la parte de input de su progtwig. No se preocupe por cómo transmitir la entrada que recibió. En su lugar, preocuparse de que pueda recibir información de manera exitosa y repetida a través de su socket. Hasta ahora tan bueno.

Ahora, asumo que le gustaría reactjsr a esta entrada transmitiendo a los otros clientes adjuntos. ¡Qué pena, no puedes hacer eso todavía! Porque, dejé un pequeño detalle fuera del párrafo anterior. Tienes que diseñar un PROTOCOLO .

¿Qué es un protocolo? Es un conjunto de reglas para la comunicación. ¿Cómo sabe su servidor cuando el cliente ha terminado de enviar sus datos? ¿Está terminado por algún personaje especial? O quizás codifique el tamaño del mensaje que se enviará como el primer byte o dos del mensaje.

Esto está resultando ser mucho trabajo, ¿no es así? 🙂

¿Qué es un protocolo simple? Un protocolo line-oriented es simple. Lea 1 carácter a la vez hasta que llegue al final del terminador de registro – ‘\ n’. Entonces, los clientes enviarían registros como este a su servidor –

HELO \ n MSG DAVE ¿Dónde están tus hijos? \ N

Entonces, asumiendo que tiene este protocolo simple diseñado, implementarlo. Por ahora, ¡NO SE PREOCUPE POR LA MATERIA DE MULTITHREADING! Solo preocupate por hacer que funcione.

Su protocolo actual es leer 1024 bytes. Lo que puede no ser malo, solo asegúrese de enviar mensajes de 1024 bytes desde el cliente.

Una vez que haya configurado el protocolo, continúe reactjsndo a la entrada. Pero por ahora necesitas algo que lea entrada. Una vez hecho esto, podemos preocuparnos por hacer algo con él.

jdi tiene razón, tienes demasiado progtwig para trabajar. Las piezas son más fáciles de arreglar.