Skip to content

[SPARK-6193] [EC2] Push group filter up to EC2 #4922

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 41 additions & 37 deletions ec2/spark_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from __future__ import with_statement

import hashlib
import itertools
import logging
import os
import os.path
Expand Down Expand Up @@ -290,13 +291,6 @@ def get_validate_spark_version(version, repo):
return version


# Check whether a given EC2 instance object is in a state we consider active,
# i.e. not terminating or terminated. We count both stopping and stopped as
# active since we can restart stopped clusters.
def is_active(instance):
return (instance.state in ['pending', 'running', 'stopping', 'stopped'])


# Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
# Last Updated: 2014-06-20
# For easy maintainability, please keep this manually-inputted dictionary sorted by key.
Expand Down Expand Up @@ -564,8 +558,11 @@ def launch_cluster(conn, opts, cluster_name):
placement_group=opts.placement_group,
user_data=user_data_content)
slave_nodes += slave_res.instances
print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
zone, slave_res.id)
print "Launched {s} slave{plural_s} in {z}, regid = {r}".format(
s=num_slaves_this_zone,
plural_s=('' if num_slaves_this_zone == 1 else 's'),
z=zone,
r=slave_res.id)
i += 1

# Launch or resume masters
Expand Down Expand Up @@ -612,40 +609,47 @@ def launch_cluster(conn, opts, cluster_name):
return (master_nodes, slave_nodes)


# Get the EC2 instances in an existing cluster if available.
# Returns a tuple of lists of EC2 instance objects for the masters and slaves
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
print "Searching for existing cluster " + cluster_name + " in region " \
+ opts.region + "..."
reservations = conn.get_all_reservations()
master_nodes = []
slave_nodes = []
for res in reservations:
active = [i for i in res.instances if is_active(i)]
for inst in active:
group_names = [g.name for g in inst.groups]
if (cluster_name + "-master") in group_names:
master_nodes.append(inst)
elif (cluster_name + "-slaves") in group_names:
slave_nodes.append(inst)
if any((master_nodes, slave_nodes)):
print "Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes))
if master_nodes != [] or not die_on_error:
return (master_nodes, slave_nodes)
else:
if master_nodes == [] and slave_nodes != []:
print >> sys.stderr, "ERROR: Could not find master in group " + cluster_name \
+ "-master" + " in region " + opts.region
else:
print >> sys.stderr, "ERROR: Could not find any existing cluster" \
+ " in region " + opts.region
"""
Get the EC2 instances in an existing cluster if available.
Returns a tuple of lists of EC2 instance objects for the masters and slaves.
"""
print "Searching for existing cluster {c} in region {r}...".format(
c=cluster_name, r=opts.region)

def get_instances(group_names):
"""
Get all non-terminated instances that belong to any of the provided security groups.

EC2 reservation filters and instance states are documented here:
http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options
"""
reservations = conn.get_all_reservations(
filters={"instance.group-name": group_names})
instances = itertools.chain.from_iterable(r.instances for r in reservations)
return [i for i in instances if i.state not in ["shutting-down", "terminated"]]

master_instances = get_instances([cluster_name + "-master"])
slave_instances = get_instances([cluster_name + "-slaves"])

if any((master_instances, slave_instances)):
print "Found {m} master{plural_m}, {s} slave{plural_s}.".format(
m=len(master_instances),
plural_m=('' if len(master_instances) == 1 else 's'),
s=len(slave_instances),
plural_s=('' if len(slave_instances) == 1 else 's'))

if not master_instances and die_on_error:
print >> sys.stderr, \
"ERROR: Could not find a master for cluster {c} in region {r}.".format(
c=cluster_name, r=opts.region)
sys.exit(1)

return (master_instances, slave_instances)


# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.


def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = master_nodes[0].public_dns_name
if deploy_ssh_key:
Expand Down