Cuelgue en el script Python utilizando SQLAlchemy y multiprocesamiento

Considere el siguiente script de Python, que utiliza SQLAlchemy y el módulo de multiprocesamiento de Python. Esto es con Python 2.6.6-8 + b1 (predeterminado) y SQLAlchemy 0.6.3-3 (predeterminado) en Debian squeeze. Esta es una versión simplificada de algún código real.

import multiprocessing from sqlalchemy import * from sqlalchemy.orm import * dbuser = ... password = ... dbname = ... dbstring = "postgresql://%s:%s@localhost:5432/%s"%(dbuser, password, dbname) db = create_engine(dbstring) m = MetaData(db) def make_foo(i): t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True)) conn = db.connect() for i in range(10): conn.execute("DROP TABLE IF EXISTS foo%s"%i) conn.close() db.dispose() for i in range(10): make_foo(i) m.create_all() def do(kwargs): i, dbstring = kwargs['i'], kwargs['dbstring'] db = create_engine(dbstring) Session = scoped_session(sessionmaker()) Session.configure(bind=db) Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;") Session.commit() db.dispose() pool = multiprocessing.Pool(processes=5) # start 4 worker processes results = [] arglist = [] for i in range(10): arglist.append({'i':i, 'dbstring':dbstring}) r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously r.get() r.wait() pool.close() pool.join() 

Este script se cuelga con el siguiente mensaje de error.

 Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner self.run() File "/usr/lib/python2.6/threading.py", line 484, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.6/multiprocessing/pool.py", line 259, in _handle_results task = get() TypeError: ('__init__() takes at least 4 arguments (2 given)', , ('(ProgrammingError) syntax error at or near "%"\nLINE 1: COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;\n ^\n',)) 

Por supuesto, el error de syntax aquí es TRUNCATE foo%s; . Mi pregunta es, ¿por qué el proceso se cuelga y puedo persuadirlo para que salga con un error, sin realizar una cirugía mayor a mi código? Este comportamiento es muy similar al de mi código real.

Tenga en cuenta que el locking no se produce si la statement se reemplaza por algo como print foobarbaz . Además, el locking todavía sucede si reemplazamos

 Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;") Session.commit() db.dispose() 

por solo Session.execute("TRUNCATE foo%s;")

Estoy usando la versión anterior porque está más cerca de lo que está haciendo mi código real.

Además, la eliminación del multiprocessing de la imagen y el bucle sobre las tablas en serie hace que el locking desaparezca, y simplemente se cierra con un error.

También estoy un poco desconcertado por la forma del error, en particular el TypeError: ('__init__() takes at least 4 arguments (2 given)' bit. ¿De dónde viene este error? Parece probable que sea de algún lugar en El código multiprocessing .

Los registros de PostgreSQL no son útiles. Veo muchas líneas como

 2012-01-09 14:16:34.174 IST [7810] 4f0aa96a.1e82/1 12/583 0 ERROR: syntax error at or near "%" at character 28 2012-01-09 14:16:34.175 IST [7810] 4f0aa96a.1e82/2 12/583 0 STATEMENT: COMMIT; BEGIN; TRUNCATE foo%s; COMMIT; 

Pero nada más que parezca relevante.

ACTUALIZACIÓN 1: Gracias a lbolla y su perspicaz análisis , pude presentar un informe de error de Python sobre esto. Vea el análisis de sbt en ese informe, y también aquí . Ver también el informe de errores de Python. Entonces, siguiendo la explicación de sbt, podemos reproducir el error original con

 import sqlalchemy.exc e = sqlalchemy.exc.ProgrammingError("", {}, None) type(e)(*e.args) 

lo que da

 Traceback (most recent call last): File "", line 9, in  TypeError: __init__() takes at least 4 arguments (2 given) 

ACTUALIZACIÓN 2: Esto se ha corregido, al menos para SQLAlchemy, por Mike Bayer, vea el informe de errores StatementError Exceptions no seleccionable. . Según la sugerencia de Mike, también informé de un error similar a psycopg2, aunque no tuve (y no tengo) un ejemplo real de rotura. En cualquier caso, aparentemente lo arreglaron, aunque no dieron detalles de la solución. Ver las excepciones de psycopg no se pueden decapar . Por si acaso , también informé de que las excepciones de ConfigParser de errores de Python no son seleccionables correspondientes a la pregunta SO que se menciona en lbolla . Parece que quieren una prueba para esto.

De todos modos, parece que seguirá siendo un problema en el futuro inmediato, ya que, en general, los desarrolladores de Python no parecen estar al tanto de este problema y, por lo tanto, no deben protegerse. Sorprendentemente, parece que no hay suficientes personas que utilicen el multiprocesamiento para que esto sea un problema bien conocido, o tal vez simplemente lo soporten. Espero que los desarrolladores de Python lo solucionen al menos para Python 3, porque es molesto.

Acepté la respuesta de Lbolla, ya que sin su explicación de cómo el problema estaba relacionado con el manejo de excepciones, probablemente no habría llegado a ninguna parte para entender esto. También quiero dar las gracias a sbt, quien explicó que Python no pudo descifrar las excepciones fue el problema. Estoy muy agradecido con ambos, y por favor voten sus respuestas. Gracias.

ACTUALIZACIÓN 3: Publiqué una pregunta de seguimiento: Detectando excepciones imparables y re-subiendo .

Creo que el TypeError viene de get multiprocessing .

He eliminado todo el código DB de su script. Mira esto:

 import multiprocessing import sqlalchemy.exc def do(kwargs): i = kwargs['i'] print i raise sqlalchemy.exc.ProgrammingError("", {}, None) return i pool = multiprocessing.Pool(processes=5) # start 4 worker processes results = [] arglist = [] for i in range(10): arglist.append({'i':i}) r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously # Use get or wait? # r.get() r.wait() pool.close() pool.join() print results 

El uso de r.wait devuelve el resultado esperado, pero el uso de r.get genera TypeError . Como se describe en los documentos de python , use r.wait después de map_async .

Edición : tengo que modificar mi respuesta anterior. Ahora creo que el TypeError viene de SQLAlchemy. He modificado mi script para reproducir el error.

Edit 2 : Parece que el problema es que multiprocessing.pool no funciona bien si algún trabajador genera una excepción cuyo constructor requiere un parámetro (consulte también aquí ).

He modificado mi guión para resaltar esto.

 import multiprocessing class BadExc(Exception): def __init__(self, a): '''Non-optional param in the constructor.''' self.a = a class GoodExc(Exception): def __init__(self, a=None): '''Optional param in the constructor.''' self.a = a def do(kwargs): i = kwargs['i'] print i raise BadExc('a') # raise GoodExc('a') return i pool = multiprocessing.Pool(processes=5) results = [] arglist = [] for i in range(10): arglist.append({'i':i}) r = pool.map_async(do, arglist, callback=results.append) try: # set a timeout in order to be able to catch Cc r.get(1e100) except KeyboardInterrupt: pass print results 

En su caso, dado que su código genera una excepción SQLAlchemy, la única solución que se me ocurre es detectar todas las excepciones en la función do y volver a generar una Exception normal. Algo como esto:

 import multiprocessing class BadExc(Exception): def __init__(self, a): '''Non-optional param in the constructor.''' self.a = a def do(kwargs): try: i = kwargs['i'] print i raise BadExc('a') return i except Exception as e: raise Exception(repr(e)) pool = multiprocessing.Pool(processes=5) results = [] arglist = [] for i in range(10): arglist.append({'i':i}) r = pool.map_async(do, arglist, callback=results.append) try: # set a timeout in order to be able to catch Cc r.get(1e100) except KeyboardInterrupt: pass print results 

Edición 3 : entonces, parece ser un error con Python , pero las excepciones adecuadas en SQLAlchemy podrían solucionarlo: por lo tanto, también he planteado el problema con SQLAlchemy .

Como solución alternativa al problema, creo que la solución al final de la Edición 2 sería (envolver las devoluciones de llamada en try-except y re-raise).

El TypeError: ('__init__() takes at least 4 arguments (2 given) no está relacionado con el sql que está intentando ejecutar, tiene que ver con cómo está usando la API de SqlAlchemy.

El problema es que está intentando execute la execute de la clase de sesión en lugar de una instancia de esa sesión.

Prueba esto:

 session = Session() session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;") session.commit() 

De los documentos :

Se pretende que la función sessionmaker () sea llamada dentro del scope global de una aplicación, y que la clase devuelta esté disponible para el rest de la aplicación como la única clase utilizada para instanciar sesiones.

De modo que Session = sessionmaker() devuelve una nueva clase de sesión y session = Session() devuelve una instancia de esa clase a la que luego puede execute .

No sé sobre la causa de la excepción original. Sin embargo, los problemas de multiprocesamiento con las excepciones “malas” se deben realmente a cómo funciona el decapado. Creo que la clase de excepción sqlachemy está rota.

Si una clase de excepción tiene un __init__() que no llama a BaseException.__init__() (directa o indirectamente), entonces self.args probablemente no se configurará correctamente. BaseException.__reduce__() (que es utilizado por el protocolo pickle) asume que una copia de una excepción e puede ser recreada simplemente haciendo

 type(e)(*e.args) 

Por ejemplo

 >>> e = ValueError("bad value") >>> e ValueError('bad value',) >>> type(e)(*e.args) ValueError('bad value',) 

Si este invariante no se mantiene, el decapado / despegado fallará. Así que instancias de

 class BadExc(Exception): def __init__(self, a): '''Non-optional param in the constructor.''' self.a = a 

puede ser decapado, pero el resultado no puede ser descifrado:

 >>> from cPickle import loads, dumps >>> class BadExc(Exception): ... def __init__(self, a): ... '''Non-optional param in the constructor.''' ... self.a = a ... >>> loads(dumps(BadExc(1))) Traceback (most recent call last): File "", line 1, in  TypeError: ('__init__() takes exactly 2 arguments (1 given)', , ()) 

Pero instancias de

 class GoodExc1(Exception): def __init__(self, a): '''Non-optional param in the constructor.''' Exception.__init__(self, a) self.a = a 

o

 class GoodExc2(Exception): def __init__(self, a): '''Non-optional param in the constructor.''' self.args = (a,) self.a = a 

puede ser decapado / despegado con éxito.

Así que deberías pedir a los desarrolladores de sqlalchemy que corrijan sus clases de excepción. Mientras tanto, es probable que pueda usar copy_reg.pickle() para anular BaseException.__reduce__() para las clases problemáticas.

(Esto responde a la pregunta de Faheem Mitha en un comentario sobre cómo usar copy_reg para evitar las clases de excepciones rotas).

Los __init__() de las clases de excepción de SQLAlchemy parecen llamar a los métodos __init__() de su clase base, pero con argumentos diferentes. Esto engulle el escabeche.

Para personalizar el decapado de las clases de excepción de sqlalchemy, puede usar copy_reg para registrar sus propias funciones de reducción para esas clases.

Una función de reducción toma un argumento obj y devuelve un par (callable_obj, args) manera que se puede crear una copia de obj haciendo callable_obj(*args) . Por ejemplo

 class StatementError(SQLAlchemyError): def __init__(self, message, statement, params, orig): SQLAlchemyError.__init__(self, message) self.statement = statement self.params = params self.orig = orig ... 

se puede “arreglar” haciendo

 import copy_reg, sqlalchemy.exc def reduce_StatementError(e): message = e.args[0] args = (message, e.statement, e.params, e.orig) return (type(e), args) copy_reg.pickle(sqlalchemy.exc.StatementError, reduce_StatementError) 

Hay varias otras clases en sqlalchemy.exc que necesitan ser arregladas de manera similar. Pero espero que tengas la idea.


__reduce__() bien, en lugar de arreglar cada clase individualmente, es probable que solo pueda __reduce__() el __reduce__() de la clase de excepción base:

 import sqlalchemy.exc def rebuild_exc(cls, args, dic): e = Exception.__new__(cls) e.args = args e.__dict__.update(dic) return e def __reduce__(e): return (rebuild_exc, (type(e), e.args, e.__dict__)) sqlalchemy.exc.SQLAlchemyError.__reduce__ = __reduce__