-
Notifications
You must be signed in to change notification settings - Fork 6.6k
[ci][release][core] rewrite RuntimeEnvAgentClient with reusable TCP connection, also test_many_runtime_envs.py with env vars #38772
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1dc07c7
3e78be4
594c7bc
24e6d58
8fad62c
d0faa40
6b62c9c
3a0a7ec
7257691
be125a5
b5b5588
64e5737
9138469
c9ed025
66ca2d6
e243fb1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,6 +58,21 @@ class CreatedEnvResult: | |
UriType = str | ||
|
||
|
||
def truncate_dict_str(dict_str: str) -> str:
    """
    Shorten an over-long serialized dict so it does not overload the logs.

    Strings of at most 120 characters are returned unchanged. Longer strings
    are cut to their first 116 characters followed by "...}", so the result is
    exactly 120 characters long and still ends with a closing brace.
    """
    # TODO(review): maybe add an env var to enable logging the full string.
    if len(dict_str) <= 120:
        return dict_str
    head = dict_str[:116]
    return f"{head}...}}"
|
||
|
||
def log_debug_runtime_env_if_too_long(logger, serialized_runtime_env):
    """
    Log the full serialized runtime env at debug level when it is long.

    Nothing is logged for strings of 120 characters or fewer; this is the same
    threshold above which `truncate_dict_str` shortens a string.

    Args:
        logger: Object exposing a logging-style `debug(msg, *args)` method.
        serialized_runtime_env: The serialized runtime env string.
    """
    if len(serialized_runtime_env) > 120:
        # Pass the value as a lazy %-argument so the (potentially very large)
        # message is only built when debug logging is actually enabled.
        logger.debug(
            "serialized runtime env full length: %s", serialized_runtime_env
        )
|
||
|
||
class ReferenceTable: | ||
""" | ||
The URI reference table which is used for GC. | ||
|
@@ -124,7 +139,10 @@ def _decrease_reference_for_runtime_env(self, serialized_env: str): | |
else: | ||
default_logger.warn(f"Runtime env {serialized_env} does not exist.") | ||
if unused: | ||
default_logger.info(f"Unused runtime env {serialized_env}.") | ||
default_logger.info( | ||
f"Unused runtime env {truncate_dict_str(serialized_env)}." | ||
) | ||
log_debug_runtime_env_if_too_long(default_logger, serialized_env) | ||
self._unused_runtime_env_callback(serialized_env) | ||
return unused | ||
|
||
|
@@ -253,8 +271,10 @@ def unused_runtime_env_processor(self, unused_runtime_env: str) -> None: | |
def delete_runtime_env(): | ||
del self._env_cache[unused_runtime_env] | ||
self._logger.info( | ||
"Runtime env %s removed from env-level cache.", unused_runtime_env | ||
"Runtime env %s removed from env-level cache.", | ||
truncate_dict_str(unused_runtime_env), | ||
) | ||
log_debug_runtime_env_if_too_long(self._logger, unused_runtime_env) | ||
|
||
if unused_runtime_env in self._env_cache: | ||
if not self._env_cache[unused_runtime_env].success: | ||
|
@@ -279,6 +299,9 @@ def get_or_create_logger(self, job_id: bytes): | |
return self._per_job_logger_cache[job_id] | ||
|
||
async def GetOrCreateRuntimeEnv(self, request): | ||
|
||
runtime_env_truncated = truncate_dict_str(request.serialized_runtime_env) | ||
|
||
self._logger.debug( | ||
f"Got request from {request.source_process} to increase " | ||
"reference for runtime env: " | ||
|
@@ -326,8 +349,6 @@ async def _setup_runtime_env( | |
|
||
async def _create_runtime_env_with_retry( | ||
runtime_env, | ||
serialized_runtime_env, | ||
serialized_allocated_resource_instances, | ||
setup_timeout_seconds, | ||
) -> Tuple[bool, str, str]: | ||
""" | ||
|
@@ -346,9 +367,10 @@ async def _create_runtime_env_with_retry( | |
|
||
""" | ||
self._logger.info( | ||
f"Creating runtime env: {serialized_env} with timeout " | ||
f"Creating runtime env: {runtime_env_truncated} with timeout " | ||
f"{setup_timeout_seconds} seconds." | ||
) | ||
log_debug_runtime_env_if_too_long(self._logger, serialized_env) | ||
serialized_context = None | ||
error_message = None | ||
for _ in range(runtime_env_consts.RUNTIME_ENV_RETRY_TIMES): | ||
|
@@ -393,9 +415,10 @@ async def _create_runtime_env_with_retry( | |
else: | ||
self._logger.info( | ||
"Successfully created runtime env: %s, the context: %s", | ||
serialized_env, | ||
serialized_context, | ||
runtime_env_truncated, | ||
truncate_dict_str(serialized_context), | ||
) | ||
log_debug_runtime_env_if_too_long(self._logger, serialized_env) | ||
return True, serialized_context, None | ||
|
||
try: | ||
|
@@ -429,9 +452,10 @@ async def _create_runtime_env_with_retry( | |
context = result.result | ||
self._logger.info( | ||
"Runtime env already created " | ||
f"successfully. Env: {serialized_env}, " | ||
f"successfully. Env: {runtime_env_truncated}, " | ||
f"context: {context}" | ||
) | ||
log_debug_runtime_env_if_too_long(self._logger, serialized_env) | ||
return runtime_env_agent_pb2.GetOrCreateRuntimeEnvReply( | ||
status=agent_manager_pb2.AGENT_RPC_STATUS_OK, | ||
serialized_runtime_env_context=context, | ||
|
@@ -440,9 +464,10 @@ async def _create_runtime_env_with_retry( | |
error_message = result.result | ||
self._logger.info( | ||
"Runtime env already failed. " | ||
f"Env: {serialized_env}, " | ||
f"Env: {runtime_env_truncated}, " | ||
f"err: {error_message}" | ||
) | ||
log_debug_runtime_env_if_too_long(self._logger, serialized_env) | ||
# Recover the reference. | ||
self._reference_table.decrease_reference( | ||
runtime_env, serialized_env, request.source_process | ||
|
@@ -472,8 +497,6 @@ async def _create_runtime_env_with_retry( | |
error_message, | ||
) = await _create_runtime_env_with_retry( | ||
runtime_env, | ||
serialized_env, | ||
request.serialized_allocated_resource_instances, | ||
setup_timeout_seconds, | ||
) | ||
creation_time_ms = int(round((time.perf_counter() - start) * 1000, 0)) | ||
|
@@ -498,11 +521,15 @@ async def _create_runtime_env_with_retry( | |
) | ||
|
||
async def DeleteRuntimeEnvIfPossible(self, request): | ||
|
||
runtime_env_truncated = truncate_dict_str(request.serialized_runtime_env) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmm, actually I wonder if it is possible to make it a part of |
||
|
||
self._logger.info( | ||
f"Got request from {request.source_process} to decrease " | ||
"reference for runtime env: " | ||
f"{request.serialized_runtime_env}." | ||
f"{runtime_env_truncated}." | ||
) | ||
log_debug_runtime_env_if_too_long(self._logger, request.serialized_runtime_env) | ||
|
||
try: | ||
runtime_env = RuntimeEnv.deserialize(request.serialized_runtime_env) | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, actually I am confused by this comment. If keepalive_timeout = None, does that mean the keepalive will never time out, meaning it can never actually detect client-side failures?