
Commit aa6783f

[SPARK-478] Make driver failover_timeout configurable, to allow for temporary disconnection between driver and Mesos master. (apache#161)
Parent: d694c6e
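With this change, a driver that tolerates a temporary loss of the Mesos master can opt into a longer failover window via the spark.mesos.driver.failoverTimeout conf (the same key the new test below passes on submit). A minimal sketch, assuming a Spark build that honors this setting; the app name and the 1800-second value are illustrative, the latter mirroring the new test:

from pyspark.sql import SparkSession

# Sketch: keep the framework registered for up to 30 minutes after the driver
# disconnects from the Mesos master, instead of tearing it down immediately.
spark = SparkSession.builder \
    .appName("Failover-Tolerant Job") \
    .config("spark.mesos.driver.failoverTimeout", "1800") \
    .getOrCreate()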

4 files changed: +156 -19 lines

tests/jobs/python/long_running.py

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
from __future__ import print_function


import sys
import time
from pyspark.sql import SparkSession


if __name__ == "__main__":
    """
    Usage: long_running [partitions] [run_time_sec]
    """
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 5
    run_time_sec = int(sys.argv[2]) if len(sys.argv) > 2 else 600

    spark = SparkSession \
        .builder \
        .appName("Long-Running Spark Job") \
        .getOrCreate()

    n = 100000 * partitions
    data = spark.sparkContext.parallelize(range(1, n + 1), partitions)

    def processPartition(partition):
        """Sleep for run_time_sec"""
        print('Start processing partition')
        time.sleep(run_time_sec)
        print('Done processing partition')

    data.foreachPartition(processPartition)
    print('Job completed successfully')

    spark.stop()

tests/test_recovery.py

Lines changed: 89 additions & 0 deletions

@@ -0,0 +1,89 @@
import logging
import os
import pytest
import re
import shakedown
import time

import utils
from utils import SPARK_PACKAGE_NAME


LOGGER = logging.getLogger(__name__)
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
LONG_RUNNING_FW_NAME = "Long-Running Spark Job"
LONG_RUNNING_FW_NUM_TASKS = 1
MASTER_CONNECTION_TIMEOUT_SEC = 15 * 60
LONG_RUNNING_RUN_TIME_SEC = MASTER_CONNECTION_TIMEOUT_SEC + (15 * 60)


def setup_module(module):
    utils.require_spark()


def teardown_module(module):
    shakedown.uninstall_package_and_wait(SPARK_PACKAGE_NAME)


@pytest.mark.skip(reason="Waiting for upstream change, https://issues.apache.org/jira/browse/SPARK-21419")
@pytest.mark.recovery
def test_disconnect_from_master():
    python_script_path = os.path.join(THIS_DIR, 'jobs', 'python', 'long_running.py')
    python_script_url = utils.upload_file(python_script_path)
    task_id = utils.submit_job(python_script_url,
                               "{} {}".format(LONG_RUNNING_FW_NUM_TASKS, LONG_RUNNING_RUN_TIME_SEC),
                               ["--conf", "spark.mesos.driver.failoverTimeout=1800",
                                "--conf", "spark.cores.max=1"])

    # Wait until executor is running
    LOGGER.info("Waiting for executor task to be RUNNING...")
    shakedown.wait_for(lambda: utils.is_service_ready(LONG_RUNNING_FW_NAME, LONG_RUNNING_FW_NUM_TASKS),
                       ignore_exceptions=False,
                       timeout_seconds=600)

    # Block the driver's connection to Mesos master
    framework_info = shakedown.get_service(LONG_RUNNING_FW_NAME)
    (driver_host, port) = _parse_fw_pid_host_port(framework_info["pid"])
    _block_master_connection(driver_host, port)

    # The connection will timeout after 15 minutes of inactivity.
    # Add 5 minutes to make sure the master has detected the disconnection.
    # The framework will be considered disconnected => failover_timeout kicks in.
    LOGGER.info("Waiting {} seconds for connection with master to timeout...".format(MASTER_CONNECTION_TIMEOUT_SEC))
    time.sleep(MASTER_CONNECTION_TIMEOUT_SEC + 5 * 60)

    # Restore the connection. The driver should reconnect.
    _unblock_master_connection(driver_host)

    # The executor and driver should finish.
    utils.check_job_output(task_id, "Job completed successfully")

    # Due to https://issues.apache.org/jira/browse/MESOS-5180, the driver does not re-register, so
    # teardown won't occur until the failover_timeout period ends. The framework remains "Inactive".
    # Uncomment when the bug is fixed:
    #_wait_for_completed_framework(LONG_RUNNING_FW_NAME, 60)


def _parse_fw_pid_host_port(pid):
    # Framework pid looks like: "scheduler-cd28f2eb-3aec-4060-a731-f5be1f5186c4@10.0.1.7:37085"
    regex = r"([^@]+)@([^:]+):(\d+)"
    match = re.search(regex, pid)
    return match.group(2), int(match.group(3))


def _block_master_connection(host, port):
    LOGGER.info("Blocking connection with master")
    shakedown.network.save_iptables(host)
    # Reject incoming packets from master
    shakedown.network.run_iptables(host, '-I INPUT -p tcp --dport {} -j REJECT'.format(port))


def _unblock_master_connection(host):
    LOGGER.info("Unblocking connection with master")
    shakedown.network.restore_iptables(host)


def _wait_for_completed_framework(fw_name, timeout_seconds):
    shakedown.wait_for(lambda: utils.is_framework_completed(fw_name),
                       ignore_exceptions=False,
                       timeout_seconds=timeout_seconds)

tests/test_spark.py

Lines changed: 5 additions & 16 deletions

@@ -26,7 +26,7 @@ def setup_module(module):
     if utils.hdfs_enabled():
         utils.require_hdfs()
     utils.require_spark()
-    _upload_file(os.environ["SCALA_TEST_JAR_PATH"])
+    utils.upload_file(os.environ["SCALA_TEST_JAR_PATH"])


 def teardown_module(module):

@@ -42,7 +42,7 @@ def test_jar():
     spark_job_runner_args = '{} dcos \\"*\\" spark:only 2 --auth-token={}'.format(
         master_url,
         shakedown.dcos_acs_token())
-    jar_url = _upload_file(os.getenv('TEST_JAR_PATH'))
+    jar_url = utils.upload_file(os.getenv('TEST_JAR_PATH'))
     utils.run_tests(jar_url,
                     spark_job_runner_args,
                     "All tests passed",

@@ -62,9 +62,9 @@ def test_teragen():
 @pytest.mark.sanity
 def test_python():
     python_script_path = os.path.join(THIS_DIR, 'jobs', 'python', 'pi_with_include.py')
-    python_script_url = _upload_file(python_script_path)
+    python_script_url = utils.upload_file(python_script_path)
     py_file_path = os.path.join(THIS_DIR, 'jobs', 'python', 'PySparkTestInclude.py')
-    py_file_url = _upload_file(py_file_path)
+    py_file_url = utils.upload_file(py_file_path)
     utils.run_tests(python_script_url,
                     "30",
                     "Pi is roughly 3",

@@ -97,7 +97,7 @@ def test_kerberos():
 @pytest.mark.sanity
 def test_r():
     r_script_path = os.path.join(THIS_DIR, 'jobs', 'R', 'dataframe.R')
-    r_script_url = _upload_file(r_script_path)
+    r_script_url = utils.upload_file(r_script_path)
     utils.run_tests(r_script_url,
                     '',
                     "Justin")

@@ -186,16 +186,5 @@ def _run_janitor(service_name):
         auth=shakedown.dcos_acs_token()))


-def _upload_file(file_path):
-    LOGGER.info("Uploading {} to s3://{}/{}".format(
-        file_path,
-        os.environ['S3_BUCKET'],
-        os.environ['S3_PREFIX']))
-
-    s3.upload_file(file_path)
-
-    basename = os.path.basename(file_path)
-    return s3.http_url(basename)
-
 def _scala_test_jar_url():
     return s3.http_url(os.path.basename(os.environ["SCALA_TEST_JAR_PATH"]))

tests/utils.py

Lines changed: 29 additions & 3 deletions

@@ -7,6 +7,7 @@
 import os
 import re
 import requests
+import s3
 import shakedown
 import subprocess
 import urllib

@@ -111,7 +112,11 @@ def _wait_for_hdfs():


 def _is_hdfs_ready(expected_tasks = DEFAULT_HDFS_TASK_COUNT):
-    running_tasks = [t for t in shakedown.get_service_tasks(HDFS_SERVICE_NAME) \
+    return is_service_ready(HDFS_SERVICE_NAME, expected_tasks)
+
+
+def is_service_ready(service_name, expected_tasks):
+    running_tasks = [t for t in shakedown.get_service_tasks(service_name) \
                      if t['state'] == 'TASK_RUNNING']
     return len(running_tasks) >= expected_tasks


@@ -138,7 +143,11 @@ def _get_spark_options(options = None):


 def run_tests(app_url, app_args, expected_output, args=[]):
-    task_id = _submit_job(app_url, app_args, args)
+    task_id = submit_job(app_url, app_args, args)
+    check_job_output(task_id, expected_output)
+
+
+def check_job_output(task_id, expected_output):
     LOGGER.info('Waiting for task id={} to complete'.format(task_id))
     shakedown.wait_for_task_completion(task_id)
     stdout = _task_log(task_id)

@@ -167,7 +176,19 @@ def create_secret(name, value):
     dcos.http.put(url, data=json.dumps(data))


-def _submit_job(app_url, app_args, args=[]):
+def upload_file(file_path):
+    LOGGER.info("Uploading {} to s3://{}/{}".format(
+        file_path,
+        os.environ['S3_BUCKET'],
+        os.environ['S3_PREFIX']))
+
+    s3.upload_file(file_path)
+
+    basename = os.path.basename(file_path)
+    return s3.http_url(basename)
+
+
+def submit_job(app_url, app_args, args=[]):
     if is_strict():
         args += ["--conf", 'spark.mesos.driverEnv.MESOS_MODULES=file:///opt/mesosphere/etc/mesos-scheduler-modules/dcos_authenticatee_module.json']
         args += ["--conf", 'spark.mesos.driverEnv.MESOS_AUTHENTICATEE=com_mesosphere_dcos_ClassicRPCAuthenticatee']

@@ -193,3 +214,8 @@ def _task_log(task_id, filename=None):
     LOGGER.info("Running {}".format(cmd))
     stdout = subprocess.check_output(cmd, shell=True).decode('utf-8')
     return stdout
+
+
+def is_framework_completed(fw_name):
+    # The framework is not Active or Inactive
+    return shakedown.get_service(fw_name, True) is None
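For context, the now-public helpers compose roughly as in the sketch below, which mirrors the flow of test_recovery.py above; the script path, job arguments, and conf value are illustrative, and the S3_BUCKET/S3_PREFIX environment variables are assumed to be set as in the test environment.

import utils

# Upload the job script to the configured S3 bucket, submit it to the dispatcher,
# then wait for the driver task to finish and verify its stdout.
script_url = utils.upload_file("tests/jobs/python/long_running.py")
task_id = utils.submit_job(script_url,
                           "1 1800",
                           ["--conf", "spark.mesos.driver.failoverTimeout=1800"])
utils.check_job_output(task_id, "Job completed successfully")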
