Cómo iniciar y configurar un cluster EMR usando boto

Estoy tratando de iniciar un clúster y ejecutar un trabajo utilizando todo boto. Encuentro muchos ejemplos de creación de flujos de trabajo. Pero no puedo por mi vida, encontrar un ejemplo que muestre:

  1. Cómo definir el cluster a usar (por clusted_id)
  2. Cómo configurar un lanzamiento de un clúster (por ejemplo, si quiero usar instancias puntuales para algunos nodos de tareas)

¿Me estoy perdiendo de algo?

Boto y la API de EMR subyacente actualmente combinan los términos clúster y flujo de trabajo , y el flujo de trabajo está en desuso . Los considero sinónimos.

Usted crea un nuevo clúster llamando a la función boto.emr.connection.run_jobflow() . Devolverá el ID de clúster que EMR genera para usted.

Primero todas las cosas obligatorias:

 #!/usr/bin/env python import boto import boto.emr from boto.emr.instance_group import InstanceGroup conn = boto.emr.connect_to_region('us-east-1') 

Luego especificamos los grupos de instancias, incluido el precio al contado que queremos pagar por los nodos TASK:

 instance_groups = [] instance_groups.append(InstanceGroup( num_instances=1, role="MASTER", type="m1.small", market="ON_DEMAND", name="Main node")) instance_groups.append(InstanceGroup( num_instances=2, role="CORE", type="m1.small", market="ON_DEMAND", name="Worker nodes")) instance_groups.append(InstanceGroup( num_instances=2, role="TASK", type="m1.small", market="SPOT", name="My cheap spot nodes", bidprice="0.002")) 

Finalmente comenzamos un nuevo cluster:

 cluster_id = conn.run_jobflow( "Name for my cluster", instance_groups=instance_groups, action_on_failure='TERMINATE_JOB_FLOW', keep_alive=True, enable_debugging=True, log_uri="s3://mybucket/logs/", hadoop_version=None, ami_version="2.4.9", steps=[], bootstrap_actions=[], ec2_keyname="my-ec2-key", visible_to_all_users=True, job_flow_role="EMR_EC2_DefaultRole", service_role="EMR_DefaultRole") 

También podemos imprimir el ID de clúster si nos importa eso:

 print "Starting cluster", cluster_id 

Creo que la cantidad mínima de Python que lanzará un clúster EMR con boto3 es:

 import boto3 client = boto3.client('emr', region_name='us-east-1') response = client.run_job_flow( Name="Boto3 test cluster", ReleaseLabel='emr-5.12.0', Instances={ 'MasterInstanceType': 'm4.xlarge', 'SlaveInstanceType': 'm4.xlarge', 'InstanceCount': 3, 'KeepJobFlowAliveWhenNoSteps': True, 'TerminationProtected': False, 'Ec2SubnetId': 'my-subnet-id', 'Ec2KeyName': 'my-key', }, VisibleToAllUsers=True, JobFlowRole='EMR_EC2_DefaultRole', ServiceRole='EMR_DefaultRole' ) 

Notas: tendrás que crear EMR_EC2_DefaultRole y EMR_DefaultRole . La documentación de Amazon afirma que JobFlowRole y ServiceRole son opcionales, pero omitirlos no funcionó para mí. Esto podría deberse a que mi subred es una subred VPC, pero no estoy seguro.

Uso el siguiente código para crear EMR con flink instalado e incluye 3 grupos de instancias. Documento de referencia: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow

 import boto3 masterInstanceType = 'm4.large' coreInstanceType = 'c3.xlarge' taskInstanceType = 'm4.large' coreInstanceNum = 2 taskInstanceNum = 2 clusterName = 'my-emr-name' emrClient = boto3.client('emr') logUri = 's3://bucket/xxxxxx/' releaseLabel = 'emr-5.17.0' #emr version instances = { 'Ec2KeyName': 'my_keyxxxxxx', 'Ec2SubnetId': 'subnet-xxxxxx', 'ServiceAccessSecurityGroup': 'sg-xxxxxx', 'EmrManagedMasterSecurityGroup': 'sg-xxxxxx', 'EmrManagedSlaveSecurityGroup': 'sg-xxxxxx', 'KeepJobFlowAliveWhenNoSteps': True, 'TerminationProtected': False, 'InstanceGroups': [{ 'InstanceRole': 'MASTER', "InstanceCount": 1, "InstanceType": masterInstanceType, "Market": "SPOT", "Name": "Master" }, { 'InstanceRole': 'CORE', "InstanceCount": coreInstanceNum, "InstanceType": coreInstanceType, "Market": "SPOT", "Name": "Core", }, { 'InstanceRole': 'TASK', "InstanceCount": taskInstanceNum, "InstanceType": taskInstanceType, "Market": "SPOT", "Name": "Core", } ] } bootstrapActions = [{ 'Name': 'Log to Cloudwatch Logs', 'ScriptBootstrapAction': { 'Path': 's3://mybucket/bootstrap_cwl.sh' } }, { 'Name': 'Custom action', 'ScriptBootstrapAction': { 'Path': 's3://mybucket/install.sh' } }] applications = [{'Name': 'Flink'}] serviceRole = 'EMR_DefaultRole' jobFlowRole = 'EMR_EC2_DefaultRole' tags = [{'Key': 'keyxxxxxx', 'Value': 'valuexxxxxx'}, {'Key': 'key2xxxxxx', 'Value': 'value2xxxxxx'} ] steps = [ { 'Name': 'Run Flink', 'ActionOnFailure': 'TERMINATE_JOB_FLOW', 'HadoopJarStep': { 'Jar': 'command-runner.jar', 'Args': ['flink', 'run', '-m', 'yarn-cluster', '-p', str(taskInstanceNum), '-yjm', '1024', '-ytm', '1024', '/home/hadoop/test-1.0-SNAPSHOT.jar' ] } }, ] response = emrClient.run_job_flow( Name=clusterName, LogUri=logUri, ReleaseLabel=releaseLabel, Instances=instances, Steps=steps, Configurations=configurations, BootstrapActions=bootstrapActions, Applications=applications, ServiceRole=serviceRole, JobFlowRole=jobFlowRole, Tags=tags )