Run a Process Across EMR Nodes


aws emr spark python

When ingesting a large amount of data, we usually use a parallel processing engine like Spark for efficiency. Reading data from a database with Spark is relatively straightforward; in our situation, however, the target database sits behind a private network and can only be reached through a bastion host.

Bootstrap Approach

One way to do it is to set up a bootstrap action when creating the EMR cluster that installs the VPN client and actually runs it (either as a service or inside a terminal multiplexer like tmux). The problem with this approach is that the VPN is always on, so if we need to connect to multiple databases behind different bastion hosts, this approach will not work.
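For reference, this is roughly where such a bootstrap action would be attached at cluster creation time. The sketch below uses boto3's run_job_flow; the S3 script path, cluster name, and instance settings are made-up placeholders, not our actual configuration.

# Minimal sketch of the bootstrap-action route (hypothetical script and settings)
import boto3

emr_client = boto3.client('emr', region_name='ap-southeast-3')

response = emr_client.run_job_flow(
    Name='etl-cluster',
    ReleaseLabel='emr-6.15.0',
    Applications=[{'Name': 'Spark'}],
    Instances={
        'InstanceGroups': [
            {'InstanceRole': 'MASTER', 'InstanceType': 'm5.xlarge', 'InstanceCount': 1},
            {'InstanceRole': 'CORE', 'InstanceType': 'm5.xlarge', 'InstanceCount': 2},
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    BootstrapActions=[
        {
            'Name': 'install-and-start-vpn',
            'ScriptBootstrapAction': {
                # hypothetical script that installs OpenVPN and starts it as a service
                'Path': 's3://my-bucket/bootstrap/install_and_start_vpn.sh',
            },
        }
    ],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)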

Send Process across Nodes

Another approach, the one we use, is to create a program run_cluster.py that loops through all the available nodes, connects to each one, and sends the command that we want to run. With this approach, we can send a command to start the VPN across the cluster, run our ETL, and afterwards send a command to close the VPN.

For our setup, each node shares the same private key, which we store in AWS SSM Parameter Store, and the same username (ubuntu). The first part of our script simply lists all the active nodes for a given cluster_id. For this, we use the boto3 package.

import boto3

def list_emr_nodes(cluster_id):
    """List the private DNS names of the running EMR nodes."""
    emr_client = boto3.client('emr', region_name='ap-southeast-3')
    response = emr_client.list_instances(ClusterId=cluster_id)
    instances = response['Instances']

    private_nodes = []
    for instance in instances:
        # keep only nodes that are running and have a private DNS name
        if instance['Status']['State'] == 'RUNNING' and 'PrivateDnsName' in instance:
            private_nodes.append(instance['PrivateDnsName'])

    return private_nodes
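Note that list_instances is paginated, so on a large cluster the single call above may not return every node. A pagination-safe variant (a sketch, not part of our original script) would use boto3's built-in paginator:

# Sketch of a pagination-safe variant, same region and filtering as above
def list_emr_nodes_paginated(cluster_id):
    """List the private DNS names of the running EMR nodes, across all result pages."""
    emr_client = boto3.client('emr', region_name='ap-southeast-3')
    paginator = emr_client.get_paginator('list_instances')

    private_nodes = []
    for page in paginator.paginate(ClusterId=cluster_id):
        for instance in page['Instances']:
            if instance['Status']['State'] == 'RUNNING' and 'PrivateDnsName' in instance:
                private_nodes.append(instance['PrivateDnsName'])

    return private_nodes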

Then, for each active node, we connect to it over SSH via paramiko and run the intended command.

import io

import boto3
import paramiko

def run_command_on_node(
        private_host: str,
        emr_user: str,
        emr_private_key_ssm_path: str,  # SSM Parameter Store path of the private key
        command: str
    ):
    """Run a command on a specific EMR node."""
    # Create an SSH client
    node = paramiko.SSHClient()
    node.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    # Get the EMR private key from SSM Parameter Store
    ssm_client = boto3.client('ssm', region_name='ap-southeast-3')
    response = ssm_client.get_parameter(Name=emr_private_key_ssm_path, WithDecryption=True)
    private_pkey_str = response['Parameter']['Value']

    # Build the paramiko key object from the key string
    private_key = paramiko.RSAKey.from_private_key(io.StringIO(private_pkey_str))

    # Connect to the node
    node.connect(
        private_host,
        username=emr_user,
        pkey=private_key
    )

    # Run the command and capture its output
    stdin, stdout, stderr = node.exec_command(command)
    stdout_output = stdout.read().decode()
    stderr_output = stderr.read().decode()

    node.close()

    return stdout_output, stderr_output

With these two functions, running something across all nodes is just a matter of listing the hostnames, looping through them, and sending the command. Something like:

command = 'openvpn --config /mnt/vpn/rds1.conf'  # command to connect the VPN
emr_id = 'j-31312'
user_id = 'ubuntu'
emr_private_key_ssm_path = '/emr/node/private_key'

nodes = list_emr_nodes(emr_id)
for node in nodes:
    # send the command to this node
    stdout, stderr = run_command_on_node(node, user_id, emr_private_key_ssm_path, command)

    if stdout: print(stdout)
    if stderr: print(stderr)
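Putting it all together, the start-VPN / run-ETL / stop-VPN sequence described earlier looks roughly like the sketch below. A few caveats: since stdout.read() blocks until the remote command exits, a long-running process like openvpn needs to be daemonized (the --daemon flag here is an assumption, as are the pkill teardown command and the spark-submit invocation with a placeholder etl_job.py; none of these are taken verbatim from our actual job).

# Rough end-to-end sketch; flags, teardown command, and ETL entry point are assumptions
import subprocess

start_vpn = 'openvpn --config /mnt/vpn/rds1.conf --daemon'  # daemonize so exec_command returns
stop_vpn = 'pkill openvpn'

nodes = list_emr_nodes(emr_id)

# 1. bring the VPN up on every node
for node in nodes:
    run_command_on_node(node, user_id, emr_private_key_ssm_path, start_vpn)

# 2. run the ETL as usual, e.g. submitting the Spark job (etl_job.py is a placeholder)
subprocess.run(['spark-submit', 'etl_job.py'], check=True)

# 3. tear the VPN down on every node
for node in nodes:
    run_command_on_node(node, user_id, emr_private_key_ssm_path, stop_vpn)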