¿Cómo escribir datos en Redshift que son el resultado de un dataframe creado en Python?

Tengo un dataframe en Python. ¿Puedo escribir estos datos en Redshift como una nueva tabla? He creado con éxito una conexión de db a Redshift y puedo ejecutar consultas de SQL simples. Ahora necesito escribirle un dataframe.

Puede usar to_sql para enviar datos a una base de datos de Redshift. He podido hacer esto usando una conexión a mi base de datos a través de un motor SQLAlchemy. Solo asegúrese de establecer el index = False en su llamada to_sql . La tabla se creará si no existe, y puede especificar si desea llamar para reemplazar la tabla, adjuntarla a la tabla o fallar si la tabla ya existe.

 from sqlalchemy import create_engine import pandas as pd conn = create_engine('postgresql://username:password@yoururl.com:5439/yourdatabase') df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}]) df.to_sql('your_table', conn, index=False, if_exists='replace') 

Tenga en cuenta que es posible que deba pip install psycopg2 para conectarse a Redshift a través de SQLAlchemy.

Documentación to_sql

 import pandas_redshift as pr pr.connect_to_redshift(dbname = , host = , port = , user = , password = ) pr.connect_to_s3(aws_access_key_id = , aws_secret_access_key = , bucket = , subdirectory = ) # Write the DataFrame to S3 and then to redshift pr.pandas_to_redshift(data_frame = data_frame, redshift_table_name = 'gawronski.nba_shots_log') 

Detalles: https://github.com/agawronski/pandas_redshift

Suponiendo que tenga acceso a S3, este enfoque debería funcionar:

Paso 1: escriba el DataFrame como un csv a S3 (para eso uso AWS SDK boto3)
Paso 2: conoce las columnas, los tipos de datos y la clave / índice de su tabla de Redshift desde su DataFrame, por lo que debería poder generar un script de create table y enviarlo a Redshift para crear una tabla vacía
Paso 3: envíe un comando de copy desde su entorno de Python a Redshift para copiar datos de S3 en la tabla vacía creada en el paso 2

Funciona como un encanto cada vez.

Paso 4: antes de que su gente de almacenamiento en la nube comience a gritarle, elimine el csv de S3

Si te ves haciendo esto varias veces, envolver los cuatro pasos en una función lo mantiene ordenado.

Intenté usar pandas df.to_sql() pero fue tremendamente lento. Me tomó más de 10 minutos insertar 50 filas. Ver este número abierto (a partir de la escritura)

Intenté usar odo desde el ecosistema de las llamas (según las recomendaciones de la discusión del tema), pero me enfrenté a un ProgrammingError que no me molesté en investigar.

Finalmente lo que funcionó:

 import psycopg2 # Fill in the blanks for the conn object conn = psycopg2.connect(user = 'user', password = 'password', host = 'host', dbname = 'db', port = 666) cursor = conn.cursor() args_str = b','.join(cursor.mogrify("(%s,%s,...)", x) for x in tuple(map(tuple,np_data))) cursor.execute("insert into table (a,b,...) VALUES "+args_str.decode("utf-8")) cursor.close() flexprobi_conn.commit() flexprobi_conn.close() 

Sí, simple psycopg2 . Esto es para una matriz numpy pero la conversión de un df a un ndarray no debería ser demasiado difícil. Esto me dio alrededor de 3k filas / minuto.

Sin embargo, la solución más rápida según las recomendaciones de otros compañeros de equipo es usar el comando COPY después de volcar el dataframe como TSV / CSV en un clúster de S3 y luego copiarlo. Debería investigar esto si está copiando realmente grandes conjuntos de datos. (Voy a actualizar aquí si y cuando lo pruebe)

Para los fines de esta conversación, Postgres = RedShift tienes dos opciones:

Opción 1:

De Pandas: http://pandas.pydata.org/pandas-docs/stable/io.html#io-sql

El módulo pandas.io.sql proporciona una colección de envoltorios de consulta para facilitar la recuperación de datos y para reducir la dependencia de la API específica de la base de datos. SQLAlchemy proporciona la abstracción de la base de datos si está instalada. Además, necesitará una biblioteca de controladores para su base de datos. Ejemplos de tales controladores son psycopg2 para PostgreSQL o pymysql para MySQL.

Escribiendo DataFrames

Suponiendo que los siguientes datos se encuentren en datos de DataFrame, podemos insertarlos en la base de datos utilizando to_sql ().

 id Date Col_1 Col_2 Col_3 26 2012-10-18 X 25.7 True 42 2012-10-19 Y -12.4 False 63 2012-10-20 Z 5.73 True In [437]: data.to_sql('data', engine) 

Con algunas bases de datos, la escritura de grandes DataFrames puede dar como resultado errores debido a la superación de las limitaciones de tamaño del paquete. Esto se puede evitar configurando el parámetro chunksize al llamar a to_sql. Por ejemplo, lo siguiente escribe datos en la base de datos en lotes de 1000 filas a la vez:

 In [438]: data.to_sql('data_chunked', engine, chunksize=1000) 

opcion 2

O simplemente puede hacerlo usted mismo. Si tiene un dataframe denominado datos, simplemente realice un bucle sobre él utilizando iterrows:

 for row in data.iterrows(): 

a continuación, agregue cada fila a su base de datos. Yo usaría copiar en lugar de insertar para cada fila, ya que será mucho más rápido.

http://initd.org/psycopg/docs/usage.html#using-copy-to-and-copy-from

Solía ​​confiar en la función pandas to_sql() , pero es demasiado lenta. Recientemente he cambiado a hacer lo siguiente:

 import pandas as pd import s3fs # great module which allows you to read/write to s3 easily import sqlalchemy df = pd.DataFrame([{'A': 'foo', 'B': 'green', 'C': 11},{'A':'bar', 'B':'blue', 'C': 20}]) s3 = s3fs.S3FileSystem(anon=False) filename = 'my_s3_bucket_name/file.csv' with s3.open(filename, 'w') as f: df.to_csv(f, index=False, header=False) con = sqlalchemy.create_engine('postgresql://username:password@yoururl.com:5439/yourdatabase') # make sure the schema for mytable exists # if you need to delete the table but not the schema leave DELETE mytable # if you want to only append, I think just removing the DELETE mytable would work con.execute(""" DELETE mytable; COPY mytable from 's3://%s' iam_role 'arn:aws:iam::xxxx:role/role_name' csv;""" % filename) 

el rol debe permitir el acceso al cambio a S3. Vea aquí para más detalles.

Encontré que para un archivo de 300KB (12000×2 dataframe) esto toma 4 segundos en comparación con los 8 minutos que obtuve con la función pandas to_sql()