Lee archivos ZIP de S3 sin descargar el archivo completo

Tenemos archivos ZIP de 5 a 10 GB de tamaño. El archivo ZIP típico tiene entre 5 y 10 archivos internos, cada uno de 1 a 5 GB de tamaño sin comprimir.

Tengo un buen conjunto de herramientas de Python para leer estos archivos. Básicamente, puedo abrir un nombre de archivo y, si hay un archivo ZIP, las herramientas buscan en el archivo ZIP y luego abren el archivo comprimido. Todo es bastante transparente.

Quiero almacenar estos archivos en Amazon S3 como archivos comprimidos. Puedo obtener rangos de archivos S3, por lo que debería ser posible obtener el directorio central ZIP (es el final del archivo, así que solo puedo leer los últimos 64 KB), encontrar el componente que quiero, descargarlo y transmitirlo directamente a El proceso de llamada.

Entonces, mi pregunta es, ¿cómo lo hago a través de la API ZipFile de Python? No se documenta cómo reemplazar el transporte del sistema de archivos con un objeto arbitrario que admita la semántica POSIX. ¿Es esto posible sin reescribir el módulo?

Así que aquí está el código que le permite abrir un archivo en Amazon S3 como si fuera un archivo normal. Tenga en cuenta que uso el comando aws , en lugar del módulo Python de boto3 . (No tengo acceso a boto3). Puedes abrir el archivo y buscarlo. El archivo se almacena en caché localmente. Si abre el archivo con la API de ZipFile de Python y es un ZipFile, puede leer partes individuales. Sin embargo, no se puede escribir porque S3 no admite escrituras parciales.

Por separado, implemento s3open() , que puede abrir un archivo para leer o escribir, pero no implementa la interfaz de búsqueda, que es requerida por ZipFile.

 from urllib.parse import urlparse from subprocess import run,Popen,PIPE import copy import json import os import tempfile # Tools for reading and write files from Amazon S3 without boto or boto3 # http://boto.cloudhackers.com/en/latest/s3_tut.html # but it is easier to use the aws cli, since it's configured to work. def s3open(path, mode="r", encoding=None): """ Open an s3 file for reading or writing. Can handle any size, but cannot seek. We could use boto. http://boto.cloudhackers.com/en/latest/s3_tut.html but it is easier to use the aws cli, since it is present and more likely to work. """ from subprocess import run,PIPE,Popen if "b" in mode: assert encoding == None else: if encoding==None: encoding="utf-8" assert 'a' not in mode assert '+' not in mode if "r" in mode: p = Popen(['aws','s3','cp',path,'-'],stdout=PIPE,encoding=encoding) return p.stdout elif "w" in mode: p = Popen(['aws','s3','cp','-',path],stdin=PIPE,encoding=encoding) return p.stdin else: raise RuntimeError("invalid mode:{}".format(mode)) CACHE_SIZE=4096 # big enough for front and back caches MAX_READ=65536*16 debug=False class S3File: """Open an S3 file that can be seeked. This is done by caching to the local file system.""" def __init__(self,name,mode='rb'): self.name = name self.url = urlparse(name) if self.url.scheme != 's3': raise RuntimeError("url scheme is {}; expecting s3".format(url.scheme)) self.bucket = self.url.netloc self.key = self.url.path[1:] self.fpos = 0 self.tf = tempfile.NamedTemporaryFile() cmd = ['aws','s3api','list-objects','--bucket',self.bucket,'--prefix',self.key,'--output','json'] data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0]) file_info = data['Contents'][0] self.length = file_info['Size'] self.ETag = file_info['ETag'] # Load the caches self.frontcache = self._readrange(0,CACHE_SIZE) # read the first 1024 bytes and get length of the file if self.length > CACHE_SIZE: self.backcache_start = self.length-CACHE_SIZE if debug: print("backcache starts at {}".format(self.backcache_start)) self.backcache = self._readrange(self.backcache_start,CACHE_SIZE) else: self.backcache = None def _readrange(self,start,length): # This is gross; we copy everything to the named temporary file, rather than a pipe # because the pipes weren't showing up in /dev/fd/? # We probably want to cache also... That's coming cmd = ['aws','s3api','get-object','--bucket',self.bucket,'--key',self.key,'--output','json', '--range','bytes={}-{}'.format(start,start+length-1),self.tf.name] if debug:print(cmd) data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0]) if debug:print(data) self.tf.seek(0) # go to the beginning of the data just read return self.tf.read(length) # and read that much def __repr__(self): return "FakeFile".format(self.name,self.url) def read(self,length=-1): # If length==-1, figure out the max we can read to the end of the file if length==-1: length = min(MAX_READ, self.length - self.fpos + 1) if debug: print("read: fpos={} length={}".format(self.fpos,length)) # Can we satisfy from the front cache? if self.fpos < CACHE_SIZE and self.fpos+length < CACHE_SIZE: if debug:print("front cache") buf = self.frontcache[self.fpos:self.fpos+length] self.fpos += len(buf) if debug:print("return 1: buf=",buf) return buf # Can we satisfy from the back cache? if self.backcache and (self.length - CACHE_SIZE < self.fpos): if debug:print("back cache") buf = self.backcache[self.fpos - self.backcache_start:self.fpos - self.backcache_start + length] self.fpos += len(buf) if debug:print("return 2: buf=",buf) return buf buf = self._readrange(self.fpos, length) self.fpos += len(buf) if debug:print("return 3: buf=",buf) return buf def seek(self,offset,whence=0): if debug:print("seek({},{})".format(offset,whence)) if whence==0: self.fpos = offset elif whence==1: self.fpos += offset elif whence==2: self.fpos = self.length + offset else: raise RuntimeError("whence={}".format(whence)) if debug:print(" ={} (self.length={})".format(self.fpos,self.length)) def tell(self): return self.fpos def write(self): raise RuntimeError("Write not supported") def flush(self): raise RuntimeError("Flush not supported") def close(self): return 

Este es un enfoque que no necesita recuperar todo el archivo (la versión completa está disponible aquí ).

Sin embargo, sí requiere boto (o boto3 ) (a menos que pueda imitar los GET rango a través de AWS CLI; lo que creo que también es bastante posible).

 import sys import zlib import zipfile import io import boto from boto.s3.connection import OrdinaryCallingFormat # range-fetches a S3 key def fetch(key, start, len): end = start + len - 1 return key.get_contents_as_string(headers={"Range": "bytes=%d-%d" % (start, end)}) # parses 2 or 4 little-endian bits into their corresponding integer value def parse_int(bytes): val = ord(bytes[0]) + (ord(bytes[1]) << 8) if len(bytes) > 3: val += (ord(bytes[2]) << 16) + (ord(bytes[3]) << 24) return val """ bucket: name of the bucket key: path to zipfile inside bucket entry: pathname of zip entry to be retrieved (path/to/subdir/file.name) """ # OrdinaryCallingFormat prevents certificate errors on bucket names with dots # https://stackoverflow.com/questions/51604689/read-zip-files-from-amazon-s3-using-boto3-and-python#51605244 _bucket = boto.connect_s3(calling_format=OrdinaryCallingFormat()).get_bucket(bucket) _key = _bucket.get_key(key) # fetch the last 22 bytes (end-of-central-directory record; assuming the comment field is empty) size = _key.size eocd = fetch(_key, size - 22, 22) # start offset and size of the central directory cd_start = parse_int(eocd[16:20]) cd_size = parse_int(eocd[12:16]) # fetch central directory, append EOCD, and open as zipfile! cd = fetch(_key, cd_start, cd_size) zip = zipfile.ZipFile(io.BytesIO(cd + eocd)) for zi in zip.filelist: if zi.filename == entry: # local file header starting at file name length + file content # (so we can reliably skip file name and extra fields) # in our "mock" zipfile, `header_offset`s are negative (probably because the leading content is missing) # so we have to add to it the CD start offset (`cd_start`) to get the actual offset file_head = fetch(_key, cd_start + zi.header_offset + 26, 4) name_len = parse_int(file_head[0:2]) extra_len = parse_int(file_head[2:4]) content = fetch(_key, cd_start + zi.header_offset + 30 + name_len + extra_len, zi.compress_size) # now `content` has the file entry you were looking for! # you should probably decompress it in context before passing it to some other program if zi.compress_type == zipfile.ZIP_DEFLATED: print zlib.decompressobj(-15).decompress(content) else: print content break 

En su caso, es posible que deba escribir el contenido recuperado en un archivo local (debido al gran tamaño), a menos que el uso de la memoria no sea una preocupación.