CaaS - E2E fixes #2212

Merged · 2 commits · Jun 2, 2021
4 changes: 2 additions & 2 deletions .circleci/config.yml
@@ -159,8 +159,8 @@ jobs:
       node_groups:
         - name: spot
           instance_type: t3.medium
-          min_instances: 10
-          max_instances: 10
+          min_instances: 16
+          max_instances: 16
           spot: true
         - name: cpu
           instance_type: c5.xlarge
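Note: with min_instances equal to max_instances, the spot node group is pinned at a fixed 16 nodes; presumably the bump from 10 gives the load tests their full capacity up front rather than waiting on the autoscaler mid-test.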
20 changes: 19 additions & 1 deletion test/e2e/e2e/tests.py
@@ -540,6 +540,7 @@ def test_load_realtime(
     # controls the flow of requests
     request_stopper = td.Event()
     latencies: List[float] = []
+    failed = False
     try:
         printer(f"getting {desired_replicas} replicas ready")
         assert apis_ready(
@@ -621,6 +622,7 @@ def test_load_realtime(

     except:
         # best effort
+        failed = True
         try:
             api_info = client.get_api(api_name)
             printer(json.dumps(api_info, indent=2))
@@ -630,6 +632,8 @@
     finally:
         request_stopper.set()
         delete_apis(client, [api_name])
+        if failed:
+            time.sleep(30)


 def test_load_async(
@@ -663,6 +667,7 @@ def test_load_async(
     request_stopper = td.Event()
     map_stopper = td.Event()
     responses: List[Dict[str, Any]] = []
+    failed = False
     try:
         printer(f"getting {desired_replicas} replicas ready")
         assert apis_ready(
@@ -736,6 +741,7 @@ def test_load_async(

     except:
         # best effort
+        failed = True
         try:
             api_info = client.get_api(api_name)
             printer(json.dumps(api_info, indent=2))
@@ -747,6 +753,8 @@
         printer(f"{len(results)}/{total_requests} have been successfully retrieved")
         map_stopper.set()
         delete_apis(client, [api_name])
+        if failed:
+            time.sleep(30)


 def test_load_batch(
@@ -784,7 +792,7 @@ def test_load_batch(
     api_name = api_specs[0]["name"]
     client.deploy(api_spec=api_specs[0])
     api_endpoint = client.get_api(api_name)["endpoint"]
-
+    failed = False
     try:
         assert endpoint_ready(
             client=client, api_name=api_name, timeout=deploy_timeout
@@ -838,6 +846,7 @@ def test_load_batch(

     except:
         # best effort
+        failed = True
         try:
             api_info = client.get_api(api_name)

@@ -851,6 +860,8 @@

     finally:
         delete_apis(client, [api_name])
+        if failed:
+            time.sleep(30)


 def test_load_task(
@@ -879,6 +890,7 @@ def test_load_task(

     request_stopper = td.Event()
     map_stopper = td.Event()
+    failed = False
     try:
         assert endpoint_ready(
             client=client, api_name=api_name, timeout=deploy_timeout
@@ -900,6 +912,9 @@ def test_load_task(
         check_futures_healthy(threads_futures)
         wait_on_futures(threads_futures)

+        # give it a bit of a delay to avoid overloading
+        time.sleep(1)
+
         printer("waiting on the jobs")
         job_ids = [job_spec.json()["job_id"] for job_spec in job_specs]
         retrieve_results_concurrently(
@@ -914,6 +929,7 @@

     except:
         # best effort
+        failed = True
         try:
             api_info = client.get_api(api_name)

@@ -928,6 +944,8 @@
     finally:
         map_stopper.set()
         delete_apis(client, [api_name])
+        if failed:
+            time.sleep(30)


 def test_long_running_realtime(
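All four load tests now follow the same pattern: a failed flag is raised in the bare except, and the finally block sleeps for 30 seconds after cleanup when the test failed. A minimal sketch of the pattern, with run_workload and cleanup as hypothetical stand-ins for the real test bodies and fixtures:

import time
import threading as td


def run_load_test(run_workload, cleanup, printer=print):
    request_stopper = td.Event()
    failed = False
    try:
        run_workload(request_stopper)
    except:
        # best effort: remember the failure so cleanup can react to it
        failed = True
        printer("workload failed; the real tests dump the API info here")
    finally:
        request_stopper.set()  # stop any request threads still running
        cleanup()
        if failed:
            # extra settling time after a failure, mirroring the tests above
            time.sleep(30)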
26 changes: 18 additions & 8 deletions test/e2e/e2e/utils.py
@@ -374,19 +374,29 @@ def _retriever(request_id: str):
                 continue

             result_response_json = result_response.json()
-            if (
-                async_kind
-                and "status" in result_response_json
-                and result_response_json["status"] == "completed"
-            ):
-                break
+            if async_kind and "status" in result_response_json:
+                if result_response_json["status"] == "completed":
+                    break
+                if result_response_json["status"] not in ["in_progress", "in_queue"]:
+                    raise RuntimeError(
+                        f"status for request ID {request_id} got set to {result_response_json['status']}"
+                    )

             if (
                 task_kind
                 and "job_status" in result_response_json
                 and "status" in result_response_json["job_status"]
-                and result_response_json["job_status"]["status"] == "status_succeeded"
             ):
-                break
+                if result_response_json["job_status"]["status"] == "succeeded":
+                    break
+
+                if result_response_json["job_status"]["status"] not in [
+                    "pending",
+                    "enqueuing",
+                    "running",
+                ]:
+                    raise RuntimeError(
+                        f"status for job ID {request_id} got set to {result_response_json['job_status']['status']}"
+                    )

             if event_stopper.is_set():
                 return
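Previously the retriever only broke out of the loop on the terminal success status, so a workload that entered a failure state kept polling until the outer timeout expired; it also compared the job status against "status_succeeded" instead of "succeeded". The new code fails fast on any status outside the known transient set. A minimal sketch of that polling pattern, with hypothetical status names and a plain callable in place of the HTTP request:

import time


def poll_until_succeeded(get_status, interval=1.0):
    # hypothetical transient states; the real code uses
    # ["in_progress", "in_queue"] for async workloads and
    # ["pending", "enqueuing", "running"] for task jobs
    transient = {"pending", "running"}
    while True:
        status = get_status()
        if status == "succeeded":
            return
        if status not in transient:
            # fail fast instead of polling until the outer timeout fires
            raise RuntimeError(f"status got set to {status}")
        time.sleep(interval)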
16 changes: 8 additions & 8 deletions test/e2e/tests/conftest.py
@@ -86,13 +86,13 @@ def pytest_configure(config):
         "realtime_deploy_timeout": int(
             os.environ.get("CORTEX_TEST_REALTIME_DEPLOY_TIMEOUT", 200)
         ),
-        "batch_deploy_timeout": int(os.environ.get("CORTEX_TEST_BATCH_DEPLOY_TIMEOUT", 30)),
+        "batch_deploy_timeout": int(os.environ.get("CORTEX_TEST_BATCH_DEPLOY_TIMEOUT", 150)),
         "batch_job_timeout": int(os.environ.get("CORTEX_TEST_BATCH_JOB_TIMEOUT", 200)),
-        "async_deploy_timeout": int(os.environ.get("CORTEX_TEST_ASYNC_DEPLOY_TIMEOUT", 120)),
+        "async_deploy_timeout": int(os.environ.get("CORTEX_TEST_ASYNC_DEPLOY_TIMEOUT", 150)),
         "async_workload_timeout": int(
             os.environ.get("CORTEX_TEST_ASYNC_WORKLOAD_TIMEOUT", 200)
         ),
-        "task_deploy_timeout": int(os.environ.get("CORTEX_TEST_TASK_DEPLOY_TIMEOUT", 30)),
+        "task_deploy_timeout": int(os.environ.get("CORTEX_TEST_TASK_DEPLOY_TIMEOUT", 75)),
         "task_job_timeout": int(os.environ.get("CORTEX_TEST_TASK_JOB_TIMEOUT", 200)),
         "skip_gpus": config.getoption("--skip-gpus"),
         "skip_infs": config.getoption("--skip-infs"),
@@ -104,7 +104,7 @@
     },
     "load_test_config": {
         "realtime": {
-            "total_requests": 10 ** 6,
+            "total_requests": 10 ** 5,
             "desired_replicas": 50,
             "concurrency": 50,
             "min_rtt": 0.004,  # measured in seconds
@@ -115,7 +115,7 @@
         },
         "async": {
             "total_requests": 10 ** 3,
-            "desired_replicas": 50,
+            "desired_replicas": 20,
             "concurrency": 10,
             "submit_timeout": 120,  # measured in seconds
             "workload_timeout": 120,  # measured in seconds
@@ -125,13 +125,13 @@
             "workers_per_job": 10,
             "items_per_job": 10 ** 5,
             "batch_size": 10 * 2,
-            "workload_timeout": 210,  # measured in seconds
+            "workload_timeout": 200,  # measured in seconds
         },
         "task": {
             "jobs": 10 ** 2,
             "concurrency": 4,
-            "submit_timeout": 240,  # measured in seconds
-            "workload_timeout": 180,  # measured in seconds
+            "submit_timeout": 200,  # measured in seconds
+            "workload_timeout": 400,  # measured in seconds
         },
     },
     "long_running_test_config": {
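Each of these knobs reads its value from an environment variable and falls back to the default shown here only when the variable is unset, so a run can be tuned without touching conftest.py. A small illustration of the lookup pattern (the override value is arbitrary):

import os

# same pattern as conftest.py: the env var wins, the literal is the fallback
os.environ["CORTEX_TEST_BATCH_DEPLOY_TIMEOUT"] = "300"  # hypothetical override
timeout = int(os.environ.get("CORTEX_TEST_BATCH_DEPLOY_TIMEOUT", 150))
assert timeout == 300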