When ingesting a large amount of data, we usually use a parallel processing engine such as Spark for efficiency. Pulling data from a database with Spark is relatively straightforward. In our situation, however, the target database sits behind a private network and can only be reached via a bastion box.
Bootstrap Approach
One way to do this is to add a bootstrap action when creating the EMR cluster that installs the VPN client and actually runs it (either as a service or inside a terminal multiplexer such as tmux). The problem with this approach is that the VPN is always on, with the single configuration chosen at cluster creation. So if we have to connect to multiple databases behind different bastion boxes, this approach will not work.
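For comparison, this is roughly what such a bootstrap action looks like when the cluster is created with boto3's `run_job_flow`, which accepts a `BootstrapActions` list. This is a minimal sketch: the helper name `vpn_bootstrap_action` and the script path `s3://my-bucket/bootstrap/start_vpn.sh` are hypothetical, and the script is assumed to install OpenVPN and start it with the config file passed as its argument.

```python
def vpn_bootstrap_action(script_s3_path: str, vpn_config_path: str) -> dict:
    """Build one entry for the BootstrapActions list of emr run_job_flow.

    The script at script_s3_path is hypothetical: it is assumed to install
    OpenVPN and start it with the config file given as its first argument.
    """
    return {
        'Name': 'Install and start OpenVPN',
        'ScriptBootstrapAction': {
            'Path': script_s3_path,
            'Args': [vpn_config_path],
        },
    }


# Would be passed as run_job_flow(..., BootstrapActions=[action])
action = vpn_bootstrap_action('s3://my-bucket/bootstrap/start_vpn.sh', '/mnt/vpn/rds1.conf')
print(action['ScriptBootstrapAction']['Args'])  # ['/mnt/vpn/rds1.conf']
```

Because the VPN config is fixed when the cluster starts, switching to another bastion box would mean creating a new cluster, which is the limitation described above.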
Sending Commands across Nodes
Another approach, and the one we use, is a program, run_cluster.py, that loops through all the available nodes, connects to each one, and sends the command we want to run.
With this approach, we can send a command to start the VPN across the cluster, run our ETL, and then send a command to close the VPN.
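The start, run, close sequence can be sketched as a small wrapper. This is an illustration under assumptions, not our exact run_cluster.py: `run_etl_behind_vpn` and the `run_on_node(node, command)` callable are hypothetical names, and `sudo pkill openvpn` is just one possible way to close the VPN. The `try`/`finally` makes sure the VPN is closed even when the ETL fails.

```python
from typing import Callable, List


def run_etl_behind_vpn(
    nodes: List[str],
    run_on_node: Callable[[str, str], None],
    vpn_config_path: str,
    etl: Callable[[], None],
) -> None:
    """Start the VPN on every node, run the ETL, then close the VPN.

    run_on_node(node, command) is a hypothetical callable that sends one shell
    command to one node (e.g. a thin wrapper around an SSH helper).
    """
    # --daemon makes openvpn detach so the remote command returns
    start_cmd = f'sudo openvpn --config {vpn_config_path} --daemon'
    stop_cmd = 'sudo pkill openvpn'  # assumption: one way to close the VPN
    started = []
    try:
        for node in nodes:
            run_on_node(node, start_cmd)
            started.append(node)
        etl()
    finally:
        # Close the VPN on every node where it was started, even if the ETL failed
        for node in started:
            run_on_node(node, stop_cmd)
```

Because the VPN config is just a parameter, a job for a second database behind another bastion box could pass a different file, for example a hypothetical `/mnt/vpn/rds2.conf`.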
In our setup, every node accepts the same private key, which we store in AWS Systems Manager (SSM) Parameter Store, and uses the same username (ubuntu).
The first part of our script simply lists all the active nodes for a given cluster_id, using the boto3 package.
```python
import boto3

def list_emr_nodes(cluster_id):
    """List the private DNS names of all running EMR nodes."""
    emr_client = boto3.client('emr', region_name='ap-southeast-3')
    private_nodes = []
    # list_instances returns results in pages, so walk every page
    paginator = emr_client.get_paginator('list_instances')
    for page in paginator.paginate(ClusterId=cluster_id):
        for instance in page['Instances']:
            if instance['Status']['State'] == 'RUNNING' and 'PrivateDnsName' in instance:
                private_nodes.append(instance['PrivateDnsName'])
    return private_nodes
```
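Each record returned by `list_instances` carries a `Status.State` and, once provisioned, a `PrivateDnsName`. To check the filtering without touching AWS, it can be pulled out into a pure function over the response pages. A sketch, where `running_private_dns` is a hypothetical helper and the sample values are made up:

```python
from typing import Iterable, List


def running_private_dns(pages: Iterable[dict]) -> List[str]:
    """Collect PrivateDnsName of RUNNING instances from ListInstances response pages."""
    names = []
    for page in pages:
        for instance in page.get('Instances', []):
            if instance['Status']['State'] == 'RUNNING' and 'PrivateDnsName' in instance:
                names.append(instance['PrivateDnsName'])
    return names


# Hand-written pages shaped like ListInstances output (values are made up)
sample_pages = [
    {'Instances': [
        {'Status': {'State': 'RUNNING'}, 'PrivateDnsName': 'ip-10-0-0-1.internal'},
        {'Status': {'State': 'TERMINATED'}, 'PrivateDnsName': 'ip-10-0-0-2.internal'},
    ]},
    {'Instances': [
        {'Status': {'State': 'RUNNING'}, 'PrivateDnsName': 'ip-10-0-0-3.internal'},
    ]},
]
print(running_private_dns(sample_pages))  # ['ip-10-0-0-1.internal', 'ip-10-0-0-3.internal']
```

Against a real cluster, it could be fed `emr_client.get_paginator('list_instances').paginate(ClusterId=cluster_id)`.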
Then, for each active node, we connect to it over SSH using paramiko and run the intended command.
```python
import io

import boto3
import paramiko

def run_command_on_node(
    private_host: str,
    emr_user: str,
    emr_private_key_ssm_path: str,  # SSM Parameter Store path of the private key
    command: str,
):
    """Run a command on a specific EMR node and return (stdout, stderr)."""
    # Fetch the EMR private key from SSM Parameter Store
    ssm_client = boto3.client('ssm', region_name='ap-southeast-3')
    response = ssm_client.get_parameter(Name=emr_private_key_ssm_path, WithDecryption=True)
    private_pkey_str = response['Parameter']['Value']
    private_key = paramiko.RSAKey.from_private_key(io.StringIO(private_pkey_str))

    # Create an SSH client that accepts the node's host key automatically
    node = paramiko.SSHClient()
    node.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        node.connect(private_host, username=emr_user, pkey=private_key)
        # read() blocks until the remote command exits and closes its output
        stdin, stdout, stderr = node.exec_command(command)
        stdout_output = stdout.read().decode()
        stderr_output = stderr.read().decode()
    finally:
        node.close()
    return stdout_output, stderr_output
```
With these two functions, running something across all nodes is just a matter of getting the node addresses, looping through them, and sending the command. Something like:
```python
# Start the VPN in the background (--daemon) so exec_command returns instead of
# blocking on a foreground openvpn; assumes the SSH user has passwordless sudo
command = 'sudo openvpn --config /mnt/vpn/rds1.conf --daemon'
emr_id = 'j-31312'
user_id = 'ubuntu'
emr_private_key_ssm_path = '/emr/node/private_key'

nodes = list_emr_nodes(emr_id)
for node in nodes:
    # Send the command to this node
    stdout, stderr = run_command_on_node(node, user_id, emr_private_key_ssm_path, command)
    if stdout:
        print(stdout)
    if stderr:
        print(stderr)
```
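This loop visits nodes one at a time, so on a large cluster the SSH round-trips add up. Since each call mostly waits on the network, a thread pool is one reasonable way to fan the command out concurrently. A sketch, assuming a hypothetical helper `run_on_all_nodes` that takes any `run_one(node)` callable returning `(stdout, stderr)`; the demo uses a fake runner so it runs without a cluster.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple


def run_on_all_nodes(
    nodes: List[str],
    run_one: Callable[[str], Tuple[str, str]],
    max_workers: int = 8,
) -> Dict[str, Tuple[str, str]]:
    """Run run_one(node) on every node concurrently; map node -> (stdout, stderr)."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the same order as nodes and re-raises
        # any exception from a worker while we iterate
        results = pool.map(run_one, nodes)
        return dict(zip(nodes, results))


def fake_run(node):
    # Stand-in for a real SSH call
    return f'connected on {node}', ''


print(run_on_all_nodes(['node-a', 'node-b'], fake_run))
# {'node-a': ('connected on node-a', ''), 'node-b': ('connected on node-b', '')}
```

With the real functions, `run_one` could be `lambda node: run_command_on_node(node, user_id, emr_private_key_ssm_path, command)`. Each call creates its own `SSHClient`, so no connection is shared between threads. One caveat: `run_command_on_node` also creates a boto3 client per call from boto3's shared default session, which is not documented as thread-safe, so fetching the key once before fanning out may be the safer variant.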