Skip to content

[SPARK-3398] [EC2] Have spark-ec2 intelligently wait for specific cluster states #2339

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 86 additions & 25 deletions ec2/spark_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import tempfile
import time
import urllib2
import warnings
from optparse import OptionParser
from sys import stderr
import boto
Expand Down Expand Up @@ -61,8 +62,8 @@ def parse_args():
"-s", "--slaves", type="int", default=1,
help="Number of slaves to launch (default: %default)")
parser.add_option(
"-w", "--wait", type="int", default=120,
help="Seconds to wait for nodes to start (default: %default)")
"-w", "--wait", type="int",
help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
parser.add_option(
"-k", "--key-pair",
help="Key pair to use on instances")
Expand Down Expand Up @@ -192,18 +193,6 @@ def get_or_make_group(conn, name):
return conn.create_security_group(name, "Spark EC2 group")


# Poll a set of launched instances until none of them is still "pending"
# (each one has either started running or failed and been terminated).
def wait_for_instances(conn, instances):
    """
    Block until no instance in `instances` reports the 'pending' state.

    conn: unused by this function; kept for interface compatibility
    instances: iterable of objects exposing update() and a `state` attribute

    Every pass refreshes all instances via update(); if any is still
    'pending' the function sleeps 5 seconds and polls again.
    """
    while True:
        for inst in instances:
            inst.update()
        # Done as soon as nothing is left in the 'pending' state.
        if not any(inst.state == 'pending' for inst in instances):
            return
        time.sleep(5)


# Check whether a given EC2 instance object is in a state we consider active,
# i.e. not terminating or terminated. We count both stopping and stopped as
# active since we can restart stopped clusters.
Expand Down Expand Up @@ -608,14 +597,64 @@ def setup_spark_cluster(master, opts):
print "Ganglia started at http://%s:5080/ganglia" % master


# Block until the master and slave instances of a cluster have left the
# "pending" state, then pause for an extra, caller-chosen grace period.
def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
    """
    Wait for the master and slave nodes to start up.

    conn: passed through to wait_for_instances
    wait_secs: additional seconds to sleep once the instances are up
    master_nodes, slave_nodes: instance collections handed, in that order,
        to wait_for_instances
    """
    # A parenthesised single-argument print emits the same text as the
    # print statement used elsewhere in this script.
    print("Waiting for instances to start up...")
    time.sleep(5)
    for node_group in (master_nodes, slave_nodes):
        wait_for_instances(conn, node_group)
    print("Waiting %d more seconds..." % wait_secs)
    time.sleep(wait_secs)
def is_ssh_available(host, opts):
    """
    Checks if SSH is available on the host.

    Runs the command produced by stringify_command('true') on
    `opts.user`@`host` through ssh (with ConnectTimeout=3), discarding the
    child's stdout and stderr.

    Returns True when the ssh invocation exits with status 0 and False when
    subprocess.check_call raises CalledProcessError. Any other exception
    propagates to the caller.
    """
    try:
        with open(os.devnull, 'w') as sink:
            probe_cmd = ssh_command(opts) + [
                '-t', '-t', '-o', 'ConnectTimeout=3',
                '%s@%s' % (opts.user, host), stringify_command('true'),
            ]
            exit_status = subprocess.check_call(probe_cmd, stdout=sink, stderr=sink)
    except subprocess.CalledProcessError:
        return False
    return exit_status == 0


def is_cluster_ssh_available(cluster_instances, opts):
    """
    Return True only if is_ssh_available succeeds for every instance.

    cluster_instances: iterable of objects exposing an `ip_address` attribute
    opts: passed through unchanged to is_ssh_available

    Instances are probed in order and probing stops at the first failure.
    An empty cluster yields True.
    """
    return all(
        is_ssh_available(host=node.ip_address, opts=opts)
        for node in cluster_instances
    )


def wait_for_cluster_state(cluster_instances, cluster_state, opts):
"""
Block until every instance in the cluster reaches the requested state.

Writes a progress message to stdout, then polls in a loop: each pass sleeps
for 3 * (number of attempts so far) seconds, refreshes every instance via
update() and checks the states. A '.' is written after each unsuccessful check
and a newline once the wait is over. There is no timeout: the loop only ends
when the requested state has been reached.

cluster_instances: a list of boto.ec2.instance.Instance
cluster_state: a string representing the desired state of all the instances in the cluster
value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
'running', 'terminated', etc.
'ssh-ready' means every instance is 'running' and is_cluster_ssh_available succeeds.
(would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
opts: passed through to is_cluster_ssh_available (only used for 'ssh-ready')
"""
sys.stdout.write(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

purely subjective, but I prefer this for readability:

waiting_msg = "Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)
sys.stdout.write(waiting_msg)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can make that change, but scanning through the rest of the script it doesn't look consistent with the style that's been followed. Some examples:

Is your recommendation specific to sys.stdout.write()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, I just prefer using intermediate variables rather than braces with line breaks. Somewhat of a hold over from scala. Fine either way.

"Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)
)
# Flush so the message appears immediately; it has no trailing newline.
sys.stdout.flush()

num_attempts = 0

while True:
# Linear backoff: no delay on the first pass, then 3s, 6s, 9s, ...
time.sleep(3 * num_attempts)

# Refresh the state of every instance before evaluating the exit condition.
for i in cluster_instances:
s = i.update() # capture output to suppress print to screen in newer versions of boto

if cluster_state == 'ssh-ready':
# The SSH probe is only attempted once every instance reports 'running'.
if all(i.state == 'running' for i in cluster_instances) and \
is_cluster_ssh_available(cluster_instances, opts):
break
else:
if all(i.state == cluster_state for i in cluster_instances):
break

num_attempts += 1

# One dot per unsuccessful poll, as a progress indicator.
sys.stdout.write(".")
sys.stdout.flush()

# Terminate the progress line.
sys.stdout.write("\n")


# Get number of local disks available for a given EC2 instance type.
Expand Down Expand Up @@ -850,6 +889,16 @@ def real_main():
(opts, action, cluster_name) = parse_args()

# Input parameter validation
if opts.wait is not None:
# NOTE: DeprecationWarnings are silent in 2.7+ by default.
# To show them, run Python with the -Wdefault switch.
# See: https://docs.python.org/3.5/whatsnew/2.7.html
warnings.warn(
"This option is deprecated and has no effect. "
"spark-ec2 automatically waits as long as necessary for clusters to startup.",
DeprecationWarning
)

if opts.ebs_vol_num > 8:
print >> stderr, "ebs-vol-num cannot be greater than 8"
sys.exit(1)
Expand All @@ -872,7 +921,11 @@ def real_main():
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
else:
(master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
wait_for_cluster_state(
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready',
opts=opts
)
setup_cluster(conn, master_nodes, slave_nodes, opts, True)

elif action == "destroy":
Expand Down Expand Up @@ -901,7 +954,11 @@ def real_main():
else:
group_names = [opts.security_group_prefix + "-master",
opts.security_group_prefix + "-slaves"]

wait_for_cluster_state(
cluster_instances=(master_nodes + slave_nodes),
cluster_state='terminated',
opts=opts
)
attempt = 1
while attempt <= 3:
print "Attempt %d" % attempt
Expand Down Expand Up @@ -987,7 +1044,11 @@ def real_main():
for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
wait_for_cluster_state(
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready',
opts=opts
)
setup_cluster(conn, master_nodes, slave_nodes, opts, False)

else:
Expand Down