¿Cómo usar la fuente JDBC para escribir y leer datos en (Py) Spark?

El objective de esta pregunta es documentar:

Con pequeños cambios, estos métodos deberían funcionar con otros lenguajes compatibles, incluidos Scala y R.

Escribiendo datos

  1. Incluya el controlador JDBC aplicable cuando envíe la aplicación o inicie el shell. Puedes usar por ejemplo --packages :

     bin/pyspark --packages group:name:version 

    o combinando driver-class-path y jars

     bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR 

    Estas propiedades también se pueden establecer utilizando la variable de entorno PYSPARK_SUBMIT_ARGS antes de que se inicie la instancia de JVM o usando conf/spark-defaults.conf para establecer spark.jars.packages o spark.jars / spark.driver.extraClassPath .

  2. Elija el modo deseado. El escritor Spark JDBC soporta los siguientes modos:

    • append : append contenido de esto: clase: DataFrame a los datos existentes.
    • overwrite : sobrescribe los datos existentes.
    • ignore : ignore silenciosamente esta operación si ya existen datos.
    • error (caso predeterminado): lanza una excepción si los datos ya existen.

    Upserts u otras modificaciones de grano fino no son compatibles

     mode = ... 
  3. Prepare JDBC URI, por ejemplo:

     # You can encode credentials in URI or pass # separately using properties argument # of jdbc method or options url = "jdbc:postgresql://localhost/foobar" 
  4. (Opcional) Crea un diccionario de argumentos JDBC.

     properties = { "user": "foo", "password": "bar" } 

    properties / options también se pueden usar para establecer las propiedades de conexión JDBC compatibles .

  5. Utilice DataFrame.write.jdbc

     df.write.jdbc(url=url, table="baz", mode=mode, properties=properties) 

    para guardar los datos (consulte pyspark.sql.DataFrameWriter para obtener detalles).

Problemas conocidos :

  • No se puede encontrar el controlador adecuado cuando se ha incluido el --packages usando --packages ( java.sql.SQLException: No suitable driver found for jdbc: ... )

    Suponiendo que no haya una discrepancia en la versión del controlador para resolver esto, puede agregar una clase de driver a las properties . Por ejemplo:

     properties = { ... "driver": "org.postgresql.Driver" } 
  • el uso de df.write.format("jdbc").options(...).save() puede resultar en:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource no permite crear una tabla como selección.

    Solución desconocida.

  • en Pyspark 1.3 puedes intentar llamar directamente al método Java:

     df._jdf.insertIntoJDBC(url, "baz", True) 

Lectura de datos

  1. Siga los pasos 1-4 de Escribir datos
  2. Utilice sqlContext.read.jdbc :

     sqlContext.read.jdbc(url=url, table="baz", properties=properties) 

    o sqlContext.read.format("jdbc") :

     (sqlContext.read.format("jdbc") .options(url=url, dbtable="baz", **properties) .load()) 

Problemas conocidos y trampas :

  • No se puede encontrar el controlador adecuado – ver: Escritura de datos
  • Spark SQL admite el empuje de predicados con fonts JDBC, aunque no todos los predicados pueden ser rechazados. Tampoco se delegan límites ni agregaciones. Una posible solución es reemplazar el argumento table / table con una subconsulta válida. Ver por ejemplo:

    • ¿El empuje de predicado de chispa funciona con JDBC?
    • Más de una hora para ejecutar pyspark.sql.DataFrame.take (4)
    • ¿Cómo usar la consulta SQL para definir la tabla en la tabla de bases de datos?
  • De forma predeterminada, los orígenes de datos JDBC cargan los datos de forma secuencial utilizando un solo hilo ejecutor. Para asegurar la carga de datos distribuidos puede:

    • Proporcione la column partición (debe ser IntegeType ), lowerBound , upperBound , numPartitions .
    • Proporcione una lista de predicados de predicates mutuamente exclusivos, uno para cada partición deseada.

    Ver:

    • Particionando en chispa mientras se lee desde RDBMS a través de JDBC ,
    • ¿Cómo optimizar la partición cuando se migran datos desde la fuente JDBC? ,
    • ¿Cómo mejorar el rendimiento para trabajos Spark lentos utilizando DataFrame y la conexión JDBC?
    • ¿Cómo particionar Spark RDD al importar Postgres usando JDBC?
  • En un modo distribuido (con columna de partición o predicados), cada ejecutor opera en su propia transacción. Si la base de datos de origen se modifica al mismo tiempo, no hay garantía de que la vista final sea coherente.

Dónde encontrar conductores adecuados:

  • Repositorio de Maven (para obtener las coordenadas requeridas para los --packages seleccione la versión deseada y copie los datos de una pestaña Gradle en un formulario compile-group:name:version sustituye los campos respectivos) o el repository central de Maven :

    • PostgreSQL
    • MySQL

Otras opciones

Dependiendo de la base de datos, puede existir una fuente especializada, y se prefiere en algunos casos:

  • Greenplum – Pivotal Greenplum-Spark Connector
  • Apache Phoenix – Apache Spark Plugin
  • Microsoft SQL Server: conector Spark para bases de datos SQL de Azure y SQL Server
  • Amazon Redshift – Conector de Redshift de Databricks (versiones actuales disponibles solo en un tiempo de ejecución patentado de Databricks. Versión descontinuada de código abierto, disponible en GitHub ).

Descargue el controlador mysql-connector-java y manténgalo en la carpeta spark jar, observe el siguiente código de python escribiendo datos en “acotr1”, tenemos que crear la estructura de tabla acotr1 en la base de datos mysql

  spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate() sc = spark.sparkContext from pyspark.sql import SQLContext sqlContext = SQLContext(sc) df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="Ramyam01").load() mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=Ramyam01" df.write.jdbc(mysql_url,table="actor1",mode="append") 

Consulte este enlace para descargar el archivo jdbc para postgres y siga los pasos para descargar el archivo jar

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html archivo jar se descargará en la ruta de esta manera. “/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar”

Si tu versión de chispa es 2

 from pyspark.sql import SparkSession spark = SparkSession.builder .appName("sparkanalysis") .config("spark.driver.extraClassPath", "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar") .getOrCreate() //for localhost database// pgDF = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql:postgres") \ .option("dbtable", "public.user_emp_tab") \ .option("user", "postgres") \ .option("password", "Jonsnow@100") \ .load() print(pgDF) pgDF.filter(pgDF["user_id"]>5).show() 

guarde el archivo como python y ejecute “python respectivos nombre de archivo.py”