piping postgres COPY en python con psycopg2

Estoy escribiendo un script para hacer una copia de algunos datos entre dos máquinas en la misma red usando psycopg2. Estoy reemplazando un viejo y feo bash que hace la copia con

psql -c -h remote.host "COPY table TO STDOUT" | psql -c "COPY table FROM STDIN" 

Esto parece ser la forma más simple y eficiente de hacer la copia. Es fácil de replicar en Python con un stringIO o un archivo temporal, así:

 buf = StringIO() from_curs = from_conn.cursor() to_curs = to_conn.cursor() from_curs.copy_expert("COPY table TO STDOUT", buf) buf.seek(0, os.SEEK_SET) to_curs.copy_expert("COPY table FROM STDIN", buf) 

… pero eso implica guardar todos los datos en el disco / en la memoria.

¿Alguien ha descubierto una manera de imitar el comportamiento de una canalización Unix en una copia como esta? Parece que no puedo encontrar un objeto de unix-pipe que no involucre POpen. Tal vez la mejor solución sea usar POpen y subprocess, después de todo.

Tendrá que poner una de sus llamadas en un hilo separado. Me di cuenta de que puedes usar os.pipe () , lo que hace que el rest sea bastante sencillo:

 #!/usr/bin/python import psycopg2 import os import threading fromdb = psycopg2.connect("dbname=from_db") todb = psycopg2.connect("dbname=to_db") r_fd, w_fd = os.pipe() def copy_from(): cur = todb.cursor() cur.copy_from(os.fdopen(r_fd), 'table') cur.close() todb.commit() to_thread = threading.Thread(target=copy_from) to_thread.start() cur = fromdb.cursor() write_f = os.fdopen(w_fd, 'w') cur.copy_to(write_f, 'table') write_f.close() # or deadlock... to_thread.join() 

Podría usar un deque que haya subclasificado para admitir la lectura y la escritura:

 from collections import deque from Exceptions import IndexError class DequeBuffer(deque): def write(self, data): self.append(data) def read(self): try: return self.popleft() except IndexError: return '' buf = DequeBuffer() 

Si el lector es mucho más rápido que el escritor y la tabla es grande, el deque seguirá siendo grande, pero será más pequeño que almacenar todo el contenido.

Además, no estoy seguro de que el return '' cuando el deque está vacío es seguro, en lugar de volver a intentarlo hasta que no esté vacío, pero supongo que lo es. Déjame saber si funciona.

Recuerde que debe estar en del buf cuando esté seguro de que la copia está hecha, especialmente si el script no está saliendo en ese momento.