Pruebas de unidad de prueba utilizando Autobahn WebSocket

Estoy tratando de escribir pruebas de unidad para mi aplicación que usa Autobahn.

Quiero probar mis controladores, los cuales reciben datos del protocolo, los analizan y reactjsn a ellos.

Pero cuando mi prueba llega a un punto en que el protocolo debe desconectarse ( self.sendClose ), se genera un error.

 exceptions.AttributeError: 'MyProtocol' object has no attribute 'state'. 

Estaba tratando de makeConnection usando proto_helpers.StringTransport pero también tengo errores

 exceptions.AttributeError: StringTransport instance has no attribute 'setTcpNoDelay'` 

Estoy usando la trial y no quiero ejecutar un servidor / cliente ficticio solo para fines de prueba, porque no se recomienda.

¿Cómo debo escribir mis pruebas para poder probar funciones que envían datos, leen datos, se desconectan, etc. utilizando una conexión falsa y una versión de prueba?

Es difícil decir exactamente lo que está pasando sin echar un vistazo a la clase MyProtocol . Parece que el problema se debe al hecho de que está jugando directamente con funciones de bajo nivel y, por lo tanto, también el atributo de state de la clase WebSocket , que es, bueno, una representación del estado interno de la conexión WebSocket.

De acuerdo con el documento de referencia de la autopista , las API de WebSicketProtocol que puede usar y reemplazar directamente son:

  • onOpen
  • onMessage
  • onCerrar
  • enviar mensaje
  • enviar cerrar

Su enfoque de utilizar el StringTransport para probar su protocolo no es ideal. El problema radica en el hecho de que MyProtocol es una pequeña capa sobre el marco de WebSocketProtocol proporcionado por autobahn que, para bien o para mal, oculta los detalles sobre la administración de la conexión, el transporte y el estado del protocolo interno.

Si lo piensa, desea probar sus cosas, no WebSocketProtocol y, por lo tanto, si no desea incrustar un servidor o cliente ficticio, su mejor MyProtocol es probar directamente los métodos que anula MyProtocol .

Un ejemplo de lo que estoy diciendo es el siguiente

 class MyPublisher(object): cbk=None def publish(self, msg): if self.cbk: self.cbk(msg) class MyProtocol(WebSocketServerProtocol): def __init__(self, publisher): WebSocketServerProtocol.__init__(self) #Defining callback for publisher publisher.cbk = self.sendMessage def onMessage(self, msg, binary) #Stupid echo self.sendMessage(msg) class NotificationTest(unittest.TestCase): class MyProtocolFactory(WebSocketServerFactory): def __init__(self, publisher): WebSocketServerFactory.__init__(self, "ws://127.0.0.1:8081") self.publisher = publisher self.openHandshakeTimeout = None def buildProtocol(self, addr): protocol = MyProtocol(self.listener) protocol.factory = self protocol.websocket_version = 13 #Hybi version 13 is supported by pretty much everyone (apart from IE <8 and android browsers) return protocol def setUp(self): publisher = task.LoopingCall(self.send_stuff, "Hi there") factory = NotificationTest.MyProtocolFactory(listener) protocol = factory.buildProtocol(None) transport = proto_helpers.StringTransport() def play_dumb(*args): pass setattr(transport, "setTcpNoDelay", play_dumb) protocol.makeConnection(transport) self.protocol, self.transport, self.publisher, self.fingerprint_handler = protocol, transport, publisher, fingerprint_handler def test_onMessage(self): #Following 2 lines are the problematic part. Here you are manipulating explicitly a hidden state which your implementation should not be concerned with! self.protocol.state = WebSocketProtocol.STATE_OPEN self.protocol.websocket_version = 13 self.protocol.onMessage("Whatever") self.assertEqual(self.transport.value()[2:], 'Whatever') def test_push(self): #Following 2 lines are the problematic part. Here you are manipulating explicitly a hidden state which your implementation should not be concerned with! self.protocol.state = WebSocketProtocol.STATE_OPEN self.protocol.websocket_version = 13 self.publisher.publish("Hi there") self.assertEqual(self.transport.value()[2:], 'Hi There') 

Como te habrás dado cuenta, usar el StringTransport aquí es muy engorroso. Debe tener conocimiento del marco de referencia y omitir su administración estatal, algo que realmente no quiere hacer. Desafortunadamente, autobahn no proporciona un objeto de prueba listo para usar que permita una fácil manipulación del estado y, por lo tanto, mi sugerencia de utilizar servidores y clientes ficticios sigue siendo válida


Probando su servidor CON la red

La prueba provista muestra cómo puede probar el empuje del servidor, afirmando que lo que está obteniendo es lo que espera, y también utiliza un gancho sobre cómo determinar cuándo terminar.

El protocolo del servidor

 from twisted.trial.unittest import TestCase as TrialTest from autobahn.websocket import WebSocketServerProtocol, WebSocketServerFactory, WebSocketClientProtocol, WebSocketClientFactory, connectWS, listenWS from twisted.internet.defer import Deferred from twisted.internet import task START="START" class TestServerProtocol(WebSocketServerProtocol): def __init__(self): #The publisher task simulates an event that triggers a message push self.publisher = task.LoopingCall(self.send_stuff, "Hi there") def send_stuff(self, msg): #this method sends a message to the client self.sendMessage(msg) def _on_start(self): #here we trigger the task to execute every second self.publisher.start(1.0) def onMessage(self, message, binary): #According to this stupid protocol, the server starts sending stuff when the client sends a "START" message #You can plug other commands in here { START : self._on_start #Put other keys here }[message]() def onClose(self, wasClean, code, reason): #After closing the connection, we tell the task to stop sending messages self.publisher.stop() 

El protocolo del cliente y la fábrica.

La siguiente clase es el protocolo del cliente. Básicamente le dice al servidor que comience a enviar mensajes. Llama a close_condition para ver si es el momento de cerrar la conexión y, como último, llama a la función de assertion en los mensajes que recibió para ver si la prueba fue exitosa o no.

 class TestClientProtocol(WebSocketClientProtocol): def __init__(self, assertion, close_condition, timeout, *args, **kwargs): self.assertion = assertion self.close_condition = close_condition self._received_msgs = [] from twisted.internet import reactor #This is a way to set a timeout for your test #in case you never meet the conditions dictated by close_condition self.damocle_sword = reactor.callLater(timeout, self.sendClose) def onOpen(self): #After the connection has been established, #you can tell the server to send its stuff self.sendMessage(START) def onMessage(self, msg, binary): #Here you get the messages pushed from the server self._received_msgs.append(msg) #If it is time to close the connection if self.close_condition(msg): self.damocle_sword.cancel() self.sendClose() def onClose(self, wasClean, code, reason): #Now it is the right time to check our test assertions self.assertion.callback(self._received_msgs) class TestClientProtocolFactory(WebSocketClientFactory): def __init__(self, assertion, close_condition, timeout, **kwargs): WebSocketClientFactory.__init__(self, **kwargs) self.assertion = assertion self.close_condition = close_condition self.timeout = timeout #This parameter needs to be forced to None to not leave the reactor dirty self.openHandshakeTimeout = None def buildProtocol(self, addr): protocol = TestClientProtocol(self.assertion, self.close_condition, self.timeout) protocol.factory = self return protocol 

La prueba de prueba basada

 class WebSocketTest(TrialTest): def setUp(self): port = 8088 factory = WebSocketServerFactory("ws://localhost:{}".format(port)) factory.protocol = TestServerProtocol self.listening_port = listenWS(factory) self.factory, self.port = factory, port def tearDown(self): #cleaning up stuff otherwise the reactor complains self.listening_port.stopListening() def test_message_reception(self): #This is the test assertion, we are testing that the messages received were 3 def assertion(msgs): self.assertEquals(len(msgs), 3) #This class says when the connection with the server should be finalized. #In this case the condition to close the connectionis for the client to get 3 messages class CommunicationHandler(object): msg_count = 0 def close_condition(self, msg): self.msg_count += 1 return self.msg_count == 3 d = Deferred() d.addCallback(assertion) #Here we create the client... client_factory = TestClientProtocolFactory(d, CommunicationHandler().close_condition, 5, url="ws://localhost:{}".format(self.port)) #...and we connect it to the server connectWS(client_factory) #returning the assertion as a deferred purely for demonstration return d 

Obviamente, esto es solo un ejemplo, pero como puede ver, no tuve que makeConnection con makeConnection ni ningún transport explícitamente.