Errores de base de datos en Django al usar subprocesos

Estoy trabajando en una aplicación web de Django que necesita consultar una base de datos PostgreSQL. Al implementar la concurrencia mediante la interfaz de subprocesos de Python, DoesNotExist errores DoesNotExist para los elementos consultados. Por supuesto, estos errores no se producen al realizar las consultas de forma secuencial.

Déjame mostrar una prueba de unidad que escribí para demostrar el comportamiento inesperado:

 class ThreadingTest(TestCase): fixtures = ['demo_city',] def test_sequential_requests(self): """ A very simple request to database, made sequentially. A fixture for the cities has been loaded above. It is supposed to be six cities in the testing database now. We will made a request for each one of the cities sequentially. """ for number in range(1, 7): c = City.objects.get(pk=number) self.assertEqual(c.pk, number) def test_threaded_requests(self): """ Now, to test the threaded behavior, we will spawn a thread for retrieving each city from the database. """ threads = [] cities = [] def do_requests(number): cities.append(City.objects.get(pk=number)) [threads.append(threading.Thread(target=do_requests, args=(n,))) for n in range(1, 7)] [t.start() for t in threads] [t.join() for t in threads] self.assertNotEqual(cities, []) 

Como puede ver, la primera prueba realiza algunas solicitudes de base de datos de forma secuencial, que de hecho funcionan sin problemas. La segunda prueba, sin embargo, realiza exactamente las mismas solicitudes, pero cada solicitud se genera en un hilo. Esto realmente está fallando, devolviendo una excepción DoesNotExist .

La salida de la ejecución de las pruebas de esta unidad es así:

 test_sequential_requests (cesta.core.tests.threadbase.ThreadingTest) ... ok test_threaded_requests (cesta.core.tests.threadbase.ThreadingTest) ... Exception in thread Thread-1: Traceback (most recent call last): File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner self.run() File "/usr/lib/python2.6/threading.py", line 484, in run self.__target(*self.__args, **self.__kwargs) File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 45, in do_requests cities.append(City.objects.get(pk=number)) File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/manager.py", line 132, in get return self.get_query_set().get(*args, **kwargs) File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/query.py", line 349, in get % self.model._meta.object_name) DoesNotExist: City matching query does not exist. 

… otros hilos devuelve una salida similar …

 Exception in thread Thread-6: Traceback (most recent call last): File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner self.run() File "/usr/lib/python2.6/threading.py", line 484, in run self.__target(*self.__args, **self.__kwargs) File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 45, in do_requests cities.append(City.objects.get(pk=number)) File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/manager.py", line 132, in get return self.get_query_set().get(*args, **kwargs) File "/home/jose/Work/cesta/trunk/parts/django/django/db/models/query.py", line 349, in get % self.model._meta.object_name) DoesNotExist: City matching query does not exist. FAIL ====================================================================== FAIL: test_threaded_requests (cesta.core.tests.threadbase.ThreadingTest) ---------------------------------------------------------------------- Traceback (most recent call last): File "/home/jose/Work/cesta/trunk/src/cesta/core/tests/threadbase.py", line 52, in test_threaded_requests self.assertNotEqual(cities, []) AssertionError: [] == [] ---------------------------------------------------------------------- Ran 2 tests in 0.278s FAILED (failures=1) Destroying test database for alias 'default' ('test_cesta')... 

Recuerde que todo esto está sucediendo en una base de datos PostgreSQL, que se supone que es segura para subprocesos, no con SQLite o similares. La prueba se ejecutó utilizando PostgreSQL también.

En este punto, estoy totalmente perdido sobre lo que puede estar fallando. ¿Alguna idea o sugerencia?

¡Gracias!

EDITAR: escribí una pequeña vista sólo para comprobar si funciona fuera de las pruebas. Aquí está el código de la vista:

 def get_cities(request): queue = Queue.Queue() def get_async_cities(q, n): city = City.objects.get(pk=n) q.put(city) threads = [threading.Thread(target=get_async_cities, args=(queue, number)) for number in range(1, 5)] [t.start() for t in threads] [t.join() for t in threads] cities = list() while not queue.empty(): cities.append(queue.get()) return render_to_response('async/cities.html', {'cities': cities}, context_instance=RequestContext(request)) 

( Por favor, no tome en cuenta la locura de escribir la lógica de la aplicación dentro del código de vista. Recuerde que esto es solo una prueba de concepto y no estaría nunca en la aplicación real ) .

El resultado es que el código funciona bien, las solicitudes se realizan correctamente en subprocesos y la vista finalmente muestra las ciudades después de llamar a su URL.

Por lo tanto, creo que realizar consultas utilizando subprocesos solo será un problema cuando necesite probar el código. En producción, funcionará sin ningún problema.

¿Alguna sugerencia útil para probar este tipo de código con éxito?

Trate de usar TransactionTestCase:

 class ThreadingTest(TransactionTestCase): 

TestCase mantiene los datos en la memoria y no emite un COMPROMISO con la base de datos. Probablemente los hilos están intentando conectarse directamente a la base de datos, mientras que los datos aún no están comprometidos allí. Se describe aquí: https://docs.djangoproject.com/en/dev/topics/testing/?from=olddocs#django.test.TransactionTestCase

TransactionTestCase y TestCase son idénticos, excepto por la manera en que la base de datos se restablece a un estado conocido y la capacidad del código de prueba para probar los efectos de la confirmación y la reversión. Un TransactionTestCase restablece la base de datos antes de que la prueba se ejecute truncando todas las tablas y recargando los datos iniciales. Un TransactionTestCase puede llamar a commit y rollback y observar los efectos de estas llamadas en la base de datos.

Esto suena como si fuera un problema con las transacciones. Si está creando elementos dentro de la solicitud (o prueba) actual, es casi seguro que están en una transacción no confirmada que no es accesible desde la conexión separada en el otro hilo. Probablemente necesite administrar sus transcciones manualmente para que esto funcione.

Queda más claro a partir de esta parte de la documentación.

 class LiveServerTestCase(TransactionTestCase): """ ... Note that it inherits from TransactionTestCase instead of TestCase because the threads do not share the same transactions (unless if using in-memory sqlite) and each thread needs to commit all their transactions so that the other thread can see the changes. """ 

Ahora, la transacción no se ha confirmado dentro de un TestCase, por lo tanto, los cambios no son visibles para el otro hilo.