Tubos múltiples en subproceso.

Estoy tratando de usar Sailfish, que toma múltiples archivos fastq como argumentos, en una tubería de ruffus. Ejecuto Sailfish usando el módulo de subproceso en python, pero <() en la llamada de subproceso no funciona incluso cuando configuro shell=True .

Este es el comando que quiero ejecutar usando python:

 sailfish quant [options] -1 <(cat sample1a.fastq sample1b.fastq) -2 <(cat sample2a.fastq sample2b.fastq) -o [output_file] 

o (preferiblemente):

 sailfish quant [options] -1 <(gunzip sample1a.fastq.gz sample1b.fastq.gz) -2 <(gunzip sample2a.fastq.gz sample2b.fastq.gz) -o [output_file] 

Una generalización:

 someprogram <(someprocess) <(someprocess) 

¿Cómo voy a hacer esto en python? ¿Es el subproceso el enfoque correcto?

Para emular la sustitución del proceso de bash :

 #!/usr/bin/env python from subprocess import check_call check_call('someprogram <(someprocess) <(anotherprocess)', shell=True, executable='/bin/bash') 

En Python, podrías usar tuberías con nombre:

 #!/usr/bin/env python from subprocess import Popen with named_pipes(n=2) as paths: someprogram = Popen(['someprogram'] + paths) processes = [] for path, command in zip(paths, ['someprocess', 'anotherprocess']): with open(path, 'wb', 0) as pipe: processes.append(Popen(command, stdout=pipe, close_fds=True)) for p in [someprogram] + processes: p.wait() 

donde named_pipes(n) es:

 import os import shutil import tempfile from contextlib import contextmanager @contextmanager def named_pipes(n=1): dirname = tempfile.mkdtemp() try: paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)] for path in paths: os.mkfifo(path) yield paths finally: shutil.rmtree(dirname) 

Otra forma más preferible (sin necesidad de crear una entrada con nombre en el disco) para implementar la sustitución del proceso bash es usar los nombres de archivo /dev/fd/N (si están disponibles) como lo sugiere @Dunes . En FreeBSD, fdescfs(5) ( /dev/fd/# ) crea entradas para todos los descriptores de archivos abiertos por el proceso . Para probar la disponibilidad, ejecute:

 $ test -r /dev/fd/3 3 

Si falla; intente vincular /dev/fd a proc(5) como se hace en algunos Linux:

 $ ln -s /proc/self/fd /dev/fd 

Aquí está la implementación basada en /dev/fd de someprogram <(someprocess) <(anotherprocess) comando bash:

 #!/usr/bin/env python3 from contextlib import ExitStack from subprocess import CalledProcessError, Popen, PIPE def kill(process): if process.poll() is None: # still running process.kill() with ExitStack() as stack: # for proper cleanup processes = [] for command in [['someprocess'], ['anotherprocess']]: # start child processes processes.append(stack.enter_context(Popen(command, stdout=PIPE))) stack.callback(kill, processes[-1]) # kill on someprogram exit fds = [p.stdout.fileno() for p in processes] someprogram = stack.enter_context( Popen(['someprogram'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds)) for p in processes: # close pipes in the parent p.stdout.close() # exit stack: wait for processes if someprogram.returncode != 0: # errors shouldn't go unnoticed raise CalledProcessError(someprogram.returncode, someprogram.args) 

Nota: en mi máquina Ubuntu, el código de subprocess solo funciona en Python 3.4+, a pesar de que pass_fds está disponible desde Python 3.2.

Si bien JF Sebastian ha proporcionado una respuesta utilizando tuberías con nombre, es posible hacerlo con tuberías anónimas.

 import shlex from subprocess import Popen, PIPE inputcmd0 = "zcat hello.gz" # gzipped file containing "hello" inputcmd1 = "zcat world.gz" # gzipped file containing "world" def get_filename(file_): return "/dev/fd/{}".format(file_.fileno()) def get_stdout_fds(*processes): return tuple(p.stdout.fileno() for p in processes) # setup producer processes inputproc0 = Popen(shlex.split(inputcmd0), stdout=PIPE) inputproc1 = Popen(shlex.split(inputcmd1), stdout=PIPE) # setup consumer process # pass input processes pipes by "filename" eg. /dev/fd/5 cmd = "cat {file0} {file1}".format(file0=get_filename(inputproc0.stdout), file1=get_filename(inputproc1.stdout)) print("command is:", cmd) # pass_fds argument tells Popen to let the child process inherit the pipe's fds someprogram = Popen(shlex.split(cmd), stdout=PIPE, pass_fds=get_stdout_fds(inputproc0, inputproc1)) output, error = someprogram.communicate() for p in [inputproc0, inputproc1, someprogram]: p.wait() assert output == b"hello\nworld\n"