Skip to content

[ci][release][core] rewrite RuntimeEnvAgentClient with reusable TCP connection, also test_many_runtime_envs.py with env vars #38772

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/ray/_private/runtime_env/agent/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def parent_dead_callback():
host=args.node_ip_address,
port=args.runtime_env_agent_port,
loop=loop,
keepalive_timeout=None, # we want to always keep alive.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm actually I am confused with this comment. If keepalive_timeout = None, does that mean keepalive will never timeout, meaning it can actually never detect the client side failures?

)
except SystemExit as e:
agent._logger.info(f"SystemExit! {e}")
Expand Down
51 changes: 39 additions & 12 deletions python/ray/_private/runtime_env/agent/runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ class CreatedEnvResult:
UriType = str


def truncate_dict_str(dict_str: str) -> str:
"""
If `dict_str` is too long, returns a truncated string to avoid overloading the logs.
Omits the last chars except for the last char which should be "}".
"""
if len(dict_str) > 120:
return dict_str[:116] + "...}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add an env var to enable full string?

return dict_str


def log_debug_runtime_env_if_too_long(logger, serialized_runtime_env):
if len(serialized_runtime_env) > 120:
logger.debug(f"serialized runtime env full length: {serialized_runtime_env}")


class ReferenceTable:
"""
The URI reference table which is used for GC.
Expand Down Expand Up @@ -124,7 +139,10 @@ def _decrease_reference_for_runtime_env(self, serialized_env: str):
else:
default_logger.warn(f"Runtime env {serialized_env} does not exist.")
if unused:
default_logger.info(f"Unused runtime env {serialized_env}.")
default_logger.info(
f"Unused runtime env {truncate_dict_str(serialized_env)}."
)
log_debug_runtime_env_if_too_long(default_logger, serialized_env)
self._unused_runtime_env_callback(serialized_env)
return unused

Expand Down Expand Up @@ -253,8 +271,10 @@ def unused_runtime_env_processor(self, unused_runtime_env: str) -> None:
def delete_runtime_env():
del self._env_cache[unused_runtime_env]
self._logger.info(
"Runtime env %s removed from env-level cache.", unused_runtime_env
"Runtime env %s removed from env-level cache.",
truncate_dict_str(unused_runtime_env),
)
log_debug_runtime_env_if_too_long(self._logger, unused_runtime_env)

if unused_runtime_env in self._env_cache:
if not self._env_cache[unused_runtime_env].success:
Expand All @@ -279,6 +299,9 @@ def get_or_create_logger(self, job_id: bytes):
return self._per_job_logger_cache[job_id]

async def GetOrCreateRuntimeEnv(self, request):

runtime_env_truncated = truncate_dict_str(request.serialized_runtime_env)

self._logger.debug(
f"Got request from {request.source_process} to increase "
"reference for runtime env: "
Expand Down Expand Up @@ -326,8 +349,6 @@ async def _setup_runtime_env(

async def _create_runtime_env_with_retry(
runtime_env,
serialized_runtime_env,
serialized_allocated_resource_instances,
setup_timeout_seconds,
) -> Tuple[bool, str, str]:
"""
Expand All @@ -346,9 +367,10 @@ async def _create_runtime_env_with_retry(

"""
self._logger.info(
f"Creating runtime env: {serialized_env} with timeout "
f"Creating runtime env: {runtime_env_truncated} with timeout "
f"{setup_timeout_seconds} seconds."
)
log_debug_runtime_env_if_too_long(self._logger, serialized_env)
serialized_context = None
error_message = None
for _ in range(runtime_env_consts.RUNTIME_ENV_RETRY_TIMES):
Expand Down Expand Up @@ -393,9 +415,10 @@ async def _create_runtime_env_with_retry(
else:
self._logger.info(
"Successfully created runtime env: %s, the context: %s",
serialized_env,
serialized_context,
runtime_env_truncated,
truncate_dict_str(serialized_context),
)
log_debug_runtime_env_if_too_long(self._logger, serialized_env)
return True, serialized_context, None

try:
Expand Down Expand Up @@ -429,9 +452,10 @@ async def _create_runtime_env_with_retry(
context = result.result
self._logger.info(
"Runtime env already created "
f"successfully. Env: {serialized_env}, "
f"successfully. Env: {runtime_env_truncated}, "
f"context: {context}"
)
log_debug_runtime_env_if_too_long(self._logger, serialized_env)
return runtime_env_agent_pb2.GetOrCreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
serialized_runtime_env_context=context,
Expand All @@ -440,9 +464,10 @@ async def _create_runtime_env_with_retry(
error_message = result.result
self._logger.info(
"Runtime env already failed. "
f"Env: {serialized_env}, "
f"Env: {runtime_env_truncated}, "
f"err: {error_message}"
)
log_debug_runtime_env_if_too_long(self._logger, serialized_env)
# Recover the reference.
self._reference_table.decrease_reference(
runtime_env, serialized_env, request.source_process
Expand Down Expand Up @@ -472,8 +497,6 @@ async def _create_runtime_env_with_retry(
error_message,
) = await _create_runtime_env_with_retry(
runtime_env,
serialized_env,
request.serialized_allocated_resource_instances,
setup_timeout_seconds,
)
creation_time_ms = int(round((time.perf_counter() - start) * 1000, 0))
Expand All @@ -498,11 +521,15 @@ async def _create_runtime_env_with_retry(
)

async def DeleteRuntimeEnvIfPossible(self, request):

runtime_env_truncated = truncate_dict_str(request.serialized_runtime_env)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm actuallly I wonder if it is possible to make it a part of __str__ implementation instead? Seems a bit fragile (i.e., if you use this dict by mistake for functional purpose, that will be a huge issue & if you add new logs, it is easy to miss)


self._logger.info(
f"Got request from {request.source_process} to decrease "
"reference for runtime env: "
f"{request.serialized_runtime_env}."
f"{runtime_env_truncated}."
)
log_debug_runtime_env_if_too_long(self._logger, request.serialized_runtime_env)

try:
runtime_env = RuntimeEnv.deserialize(request.serialized_runtime_env)
Expand Down
55 changes: 21 additions & 34 deletions release/nightly_tests/stress_tests/test_many_runtime_envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import os

import ray
import ray.util.scheduling_strategies

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand All @@ -14,45 +13,33 @@


@ray.remote
def assert_file_content(file_name, expected_content):
with open(file_name, "r") as file:
actual_content = file.read()
if actual_content != expected_content:
raise ValueError(f"expected {expected_content}, got {actual_content}")


@ray.remote
def assert_n_files_in_working_dir(n):
files = os.listdir()
num_files = len(files)
if num_files != n:
raise ValueError(
f"Expected {n} files in working dir, but found {num_files} files."
)
def assert_env_var(prefix, expected_count, expected_value):
count = 0
for k, v in os.environ.items():
if k.startswith(prefix):
assert v == expected_value
count += 1
assert count == expected_count


def parse_script_args():
parser = argparse.ArgumentParser()
parser.add_argument("--file-name", type=str)
parser.add_argument("--expected-content", type=str)
parser.add_argument("--expected-file-count-in-working-dir", type=int)
parser.add_argument("--node_id", type=str)
parser.add_argument("--num_runtime_envs", type=int)
parser.add_argument("--num_tasks", type=int)
return parser.parse_known_args()


if __name__ == "__main__":
args, unknown = parse_script_args()
strategy = ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=args.node_id,
soft=True,
)
ray.get(
[
assert_file_content.options(strategy=strategy).remote(
args.file_name, args.expected_content
),
assert_n_files_in_working_dir.options(strategy=strategy).remote(
args.expected_file_count_in_working_dir
),
]
)
tasks = []
for i in range(args.num_tasks):
val = f"task{i}"
task = assert_env_var.options(
runtime_env={
"env_vars": {
f"STRESS_TEST_{j}": val for j in range(args.num_runtime_envs)
}
},
).remote("STRESS_TEST_", args.num_runtime_envs, val)
tasks.append(task)
ray.get(tasks)

This file was deleted.

2 changes: 1 addition & 1 deletion release/release_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5159,7 +5159,7 @@
timeout: 14400
wait_for_nodes:
num_nodes: 5
script: python stress_tests/test_many_runtime_envs_runner.py --num_runtime_envs=10000
script: python stress_tests/test_many_runtime_envs.py --num_runtime_envs=100 --num_tasks=10000

variations:
- __suffix__: aws
Expand Down
Loading