¿Se pueden usar los eventos SQLAlchemy para actualizar un caché de datos desnormalizados?

Por motivos de rendimiento, tengo una base de datos desnormalizada en la que algunas tablas contienen datos que se han agregado de muchas filas en otras tablas. Me gustaría mantener este caché de datos desnormalizados mediante el uso de eventos SQLAlchemy . Como ejemplo, supongamos que estaba escribiendo software de foro y quería que cada Thread tuviera una columna que siguiera el recuento combinado de palabras de todos los comentarios en el thread para poder mostrar esa información de manera eficiente:

 class Thread(Base): id = Column(UUID, primary_key=True, default=uuid.uuid4) title = Column(UnicodeText(), nullable=False) word_count = Column(Integer, nullable=False, default=0) class Comment(Base): id = Column(UUID, primary_key=True, default=uuid.uuid4) thread_id = Column(UUID, ForeignKey('thread.id', ondelete='CASCADE'), nullable=False) thread = relationship('Thread', backref='comments') message = Column(UnicodeText(), nullable=False) @property def word_count(self): return len(self.message.split()) 

Por lo tanto, cada vez que se inserta un comentario (para simplificar, digamos que los comentarios nunca se editan o eliminan), queremos actualizar el atributo word_count en el objeto Thread asociado. Así que me gustaría hacer algo como

 def after_insert(mapper, connection, target): thread = target.thread thread.word_count = sum(c.word_count for c in thread.comments) print "updated cached word count to", thread.word_count event.listen(Comment, "after_insert", after_insert) 

Por lo tanto, cuando inserto un Comment , puedo ver la activación del evento y ver que se ha calculado correctamente el recuento de palabras, pero ese cambio no se guarda en la fila de Thread de la base de datos. No veo advertencias acerca de otras tablas actualizadas en la documentación de after_insert , aunque sí veo algunas advertencias en algunas de las otras, como after_delete .

Entonces, ¿hay una forma compatible de hacer esto con los eventos de SQLAlchemy? Ya estoy usando eventos SQLAlchemy para muchas otras cosas, así que me gustaría hacer todo de esa manera en lugar de tener que escribir activadores de bases de datos.

El evento after_insert () es una forma de hacer esto, y es posible que observe que se pasa un objeto SQLAlchemy Connection , en lugar de una Session como es el caso con otros eventos relacionados con el vaciado. Los eventos de descarga de nivel de asignador están destinados a ser utilizados normalmente para invocar SQL directamente en la Connection dada:

 @event.listens_for(Comment, "after_insert") def after_insert(mapper, connection, target): thread_table = Thread.__table__ thread = target.thread connection.execute( thread_table.update(). where(thread_table.c.id==thread.id). values(word_count=sum(c.word_count for c in thread.comments)) ) print "updated cached word count to", thread.word_count 

lo que es notable aquí es que invocar una instrucción UPDATE directamente también es mucho más eficaz que ejecutar ese cambio de atributo nuevamente a través de todo el proceso de la unidad de trabajo.

Sin embargo, un evento como after_insert () no es realmente necesario aquí, ya que sabemos el valor de “word_count” antes de que ocurra la descarga. En realidad, lo sabemos, ya que los objetos Comment y Thread están asociados entre sí, y también podríamos mantener Thread.word_count completamente actualizado en la memoria en todo momento utilizando eventos de atributos:

 def _word_count(msg): return len(msg.split()) @event.listens_for(Comment.message, "set") def set(target, value, oldvalue, initiator): if target.thread is not None: target.thread.word_count += (_word_count(value) - _word_count(oldvalue)) @event.listens_for(Comment.thread, "set") def set(target, value, oldvalue, initiator): # the new Thread, if any if value is not None: value.word_count += _word_count(target.message) # the old Thread, if any if oldvalue is not None: oldvalue.word_count -= _word_count(target.message) 

La gran ventaja de este método es que tampoco hay necesidad de iterar a través de thread.comments, que para una colección descargada significa que se emite otro SELECT.

otro método es hacerlo en before_flush (). A continuación se encuentra una versión rápida y sucia, que se puede refinar para analizar más cuidadosamente lo que ha cambiado para determinar si el valor word_count necesita actualizarse o no:

 @event.listens_for(Session, "before_flush") def before_flush(session, flush_context, instances): for obj in session.new | session.dirty: if isinstance(obj, Thread): obj.word_count = sum(c.word_count for c in obj.comments) elif isinstance(obj, Comment): obj.thread.word_count = sum(c.word_count for c in obj.comments) 

Me gustaría utilizar el método de evento de atributo, ya que es el más eficaz y actualizado.

Puede hacer esto con aggregated columnas aggregated SQLAlchemy-Utils: http://sqlalchemy-utils.readthedocs.org/en/latest/aggregates.html