Skip to content

Commit

Permalink
[core] Fix log monitor printing the wrong IP address when --node-ip-a…
Browse files Browse the repository at this point in the history
…ddress is given (ray-project#39382)

The log monitor uses `get_node_ip_address`, which doesn't return the correct IP when Ray is not initialized (it returns the IP of the main network interface rather than the one given via `--node-ip-address`). This PR fixes the issue by reading the cached node IP address instead.
  • Loading branch information
rkooo567 authored Oct 6, 2023
1 parent b6d34fd commit e2796c5
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 116 deletions.
17 changes: 13 additions & 4 deletions python/ray/_private/log_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,15 @@ class LogMonitor:

def __init__(
self,
logs_dir,
gcs_publisher,
node_ip_address: str,
logs_dir: str,
gcs_publisher: ray._raylet.GcsPublisher,
is_proc_alive_fn: Callable[[int], bool],
max_files_open: int = ray_constants.LOG_MONITOR_MAX_OPEN_FILES,
gcs_address: Optional[str] = None,
):
"""Initialize the log monitor object."""
self.ip: str = services.get_node_ip_address()
self.ip: str = node_ip_address
self.logs_dir: str = logs_dir
self.publisher = gcs_publisher
self.log_filenames: Set[str] = set()
Expand Down Expand Up @@ -520,11 +521,17 @@ def is_proc_alive(pid):
"log to stdout if set empty, default is "
f'"{ray_constants.LOG_MONITOR_LOG_FILE_NAME}"',
)
parser.add_argument(
"--session-dir",
required=True,
type=str,
help="Specify the path of the session directory used by Ray processes.",
)
parser.add_argument(
"--logs-dir",
required=True,
type=str,
help="Specify the path of the temporary directory used by Ray processes.",
help="Specify the path of the log directory used by Ray processes.",
)
parser.add_argument(
"--logging-rotate-bytes",
Expand Down Expand Up @@ -553,7 +560,9 @@ def is_proc_alive(pid):
backup_count=args.logging_rotate_backup_count,
)

node_ip = services.get_cached_node_ip_address(args.session_dir)
log_monitor = LogMonitor(
node_ip,
args.logs_dir,
ray._raylet.GcsPublisher(address=args.gcs_address),
is_proc_alive,
Expand Down
117 changes: 11 additions & 106 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
from typing import Dict, Optional, Tuple, IO, AnyStr

from filelock import FileLock
from pathlib import Path

import ray
import ray._private.ray_constants as ray_constants
import ray._private.services
import ray._private.utils
from ray._private.ray_constants import RAY_NODE_IP_FILENAME
from ray._private import storage
from ray._raylet import GcsClient, get_session_key_from_storage
from ray._private.resource_spec import ResourceSpec
Expand Down Expand Up @@ -208,7 +206,9 @@ def __init__(
)
self._node_ip_address = node_ip_address
if not connect_only:
self._write_node_ip_address(node_ip_address)
ray._private.services.write_node_ip_address(
self.get_session_dir_path(), node_ip_address
)

if ray_params.raylet_ip_address:
raylet_ip_address = ray_params.raylet_ip_address
Expand Down Expand Up @@ -988,126 +988,30 @@ def _wait_and_get_for_node_address(self, timeout_s: int = 60) -> str:
within timeout_s.
"""
for i in range(timeout_s):
node_ip_address = self._get_cached_node_ip_address()
node_ip_address = ray._private.services.get_cached_node_ip_address(
self.get_session_dir_path()
)

if node_ip_address is not None:
return node_ip_address

time.sleep(1)
if i % 10 == 0:
logger.info(
f"Can't find a `{RAY_NODE_IP_FILENAME}` file from "
f"{self.get_session_dir_path()}. "
f"Can't find a `{ray_constants.RAY_NODE_IP_FILENAME}` "
f"file from {self.get_session_dir_path()}. "
"Have you started Ray instsance using "
"`ray start` or `ray.init`?"
)

raise ValueError(
f"Can't find a `{RAY_NODE_IP_FILENAME}` file from "
f"{self.get_session_dir_path()}. "
f"Can't find a `{ray_constants.RAY_NODE_IP_FILENAME}` "
f"file from {self.get_session_dir_path()}. "
f"for {timeout_s} seconds. "
"A ray instance hasn't started. "
"Did you do `ray start` or `ray.init` on this host?"
)

def _get_cached_node_ip_address(self) -> str:
"""Get a node address cached on this session.
If a ray instance is started by `ray start --node-ip-address`,
the node ip address is cached to a file RAY_NODE_IP_FILENAME.
Otherwise, the file exists, but it is empty.
This API is process-safe, meaning the file access is protected by
a file lock.
Returns:
None if the file doesn't exist, meaning a Ray instance hasn't
been started on the current node. Otherwise the node_ip_address
cached on the current node; if no node_ip_address was written
to the file, --node-ip-address was not given, and in this
case, we find the IP address ourselves.
"""
assert hasattr(self, "_session_dir")
file_path = Path(
os.path.join(self.get_session_dir_path(), RAY_NODE_IP_FILENAME)
)
cached_node_ip_address = {}

with FileLock(str(file_path.absolute()) + ".lock"):
if not file_path.exists():
return None

with file_path.open() as f:
cached_node_ip_address.update(json.load(f))

if "node_ip_address" in cached_node_ip_address:
return cached_node_ip_address["node_ip_address"]
else:
return ray.util.get_node_ip_address()

def _write_node_ip_address(self, node_ip_address: Optional[str]) -> None:
"""Write a node ip address of the current session to
RAY_NODE_IP_FILENAME.
If a ray instance is started by `ray start --node-ip-address`,
the node ip address is cached to a file RAY_NODE_IP_FILENAME.
This API is process-safe, meaning the file access is protected by
a file lock.
The file contains a single string node_ip_address. If nothing
is written, it means --node-ip-address was not given, and Ray
resolves the IP address on its own. It assumes in a single node,
you can have only 1 IP address (which is the assumption ray
has in general).
node_ip_address is the ip address of the current node.
Args:
node_ip_address: The node IP address of the current node.
If None, it means the node ip address is not given
by --node-ip-address. In this case, we don't write
anything to a file.
"""
assert hasattr(self, "_session_dir")

file_path = Path(
os.path.join(self.get_session_dir_path(), RAY_NODE_IP_FILENAME)
)
cached_node_ip_address = {}

with FileLock(str(file_path.absolute()) + ".lock"):
if not file_path.exists():
with file_path.open(mode="w") as f:
json.dump({}, f)

with file_path.open() as f:
cached_node_ip_address.update(json.load(f))

cached_node_ip = cached_node_ip_address.get("node_ip_address")

if node_ip_address is not None:
if cached_node_ip:
if cached_node_ip == node_ip_address:
# Nothing to do.
return
else:
logger.warning(
"The node IP address of the current host recorded "
f"in {RAY_NODE_IP_FILENAME} ({cached_node_ip}) "
"is different from the current IP address: "
f"{node_ip_address}. Ray will use {node_ip_address} "
"as the current node's IP address. "
"Creating 2 instances in the same host with different "
"IP address is not supported. "
"Please create an enhnacement request to"
"https://github.com/ray-project/ray/issues."
)

cached_node_ip_address["node_ip_address"] = node_ip_address
with file_path.open(mode="w") as f:
json.dump(cached_node_ip_address, f)

def start_reaper_process(self):
"""
Start the reaper process.
Expand All @@ -1133,6 +1037,7 @@ def start_log_monitor(self):
"log_monitor", unique=True, create_out=False
)
process_info = ray._private.services.start_log_monitor(
self.get_session_dir_path(),
self._logs_dir,
self.gcs_address,
fate_share=self.kernel_fate_share,
Expand Down
107 changes: 107 additions & 0 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

# Import psutil after ray so the packaged version is used.
import psutil
from filelock import FileLock

# Ray modules
import ray
import ray._private.ray_constants as ray_constants
from ray._raylet import GcsClient, GcsClientOptions
from ray.core.generated.common_pb2 import Language
from ray._private.ray_constants import RAY_NODE_IP_FILENAME

resource = None
if sys.platform != "win32":
Expand Down Expand Up @@ -649,6 +651,11 @@ def node_ip_address_from_perspective(address: str):
return node_ip_address


# NOTE: Do not use this API to obtain the node IP address when
# ray.init has not been called: in that case it cannot find an
# IP address that was specified by `ray start --node-ip-address`.
# Use get_cached_node_ip_address instead.
def get_node_ip_address(address="8.8.8.8:53"):
if ray._private.worker._global_node is not None:
return ray._private.worker._global_node.node_ip_address
Expand All @@ -660,6 +667,103 @@ def get_node_ip_address(address="8.8.8.8:53"):
return node_ip_address_from_perspective(address)


def get_cached_node_ip_address(session_dir: str) -> Optional[str]:
    """Get the node IP address cached for this session.

    If a Ray instance is started by `ray start --node-ip-address`,
    the node IP address is cached in the file RAY_NODE_IP_FILENAME.
    Otherwise, the file exists, but it holds no address.

    This API is process-safe, meaning the file access is protected by
    a file lock.

    Args:
        session_dir: Path to the Ray session directory.

    Returns:
        None if the file doesn't exist, meaning a Ray instance hasn't
        been started on the current node. Otherwise the cached
        node_ip_address; if no node_ip_address was written to the file,
        --node-ip-address was not given, and in this case we find the
        IP address ourselves.
    """
    file_path = Path(os.path.join(session_dir, RAY_NODE_IP_FILENAME))
    cached_node_ip_address = {}

    # Hold the lock only while touching the file.
    with FileLock(str(file_path.absolute()) + ".lock"):
        if not file_path.exists():
            return None

        with file_path.open() as f:
            cached_node_ip_address.update(json.load(f))

    if "node_ip_address" in cached_node_ip_address:
        return cached_node_ip_address["node_ip_address"]

    # No explicit address was recorded. Resolve it ourselves, after the
    # lock is released so other processes are not blocked on the lookup.
    return ray.util.get_node_ip_address()


def write_node_ip_address(session_dir: str, node_ip_address: Optional[str]) -> None:
    """Write the node IP address of the current session to RAY_NODE_IP_FILENAME.

    If a Ray instance is started by `ray start --node-ip-address`,
    the node IP address is cached in the file RAY_NODE_IP_FILENAME.

    This API is process-safe, meaning the file access is protected by
    a file lock.

    The file is a JSON object with at most one key, "node_ip_address".
    If the key is absent, --node-ip-address was not given, and Ray
    resolves the IP address on its own. It assumes that a single node
    can have only 1 IP address (which is the assumption Ray has in
    general).

    Args:
        session_dir: The path to the Ray session directory.
        node_ip_address: The node IP address of the current node.
            If None, the node IP address was not given by
            --node-ip-address. In this case the file is created if it
            is missing, but no address is written to it.
    """
    file_path = Path(os.path.join(session_dir, RAY_NODE_IP_FILENAME))
    cached_node_ip_address = {}

    with FileLock(str(file_path.absolute()) + ".lock"):
        # Create the file even when there is no address to record, so it
        # exists (with an empty object) once this function has run.
        if not file_path.exists():
            with file_path.open(mode="w") as f:
                json.dump({}, f)

        with file_path.open() as f:
            cached_node_ip_address.update(json.load(f))

        if node_ip_address is None:
            # Nothing to record.
            return

        cached_node_ip = cached_node_ip_address.get("node_ip_address")
        if cached_node_ip:
            if cached_node_ip == node_ip_address:
                # Nothing to do.
                return

            # The new address wins; warn because two instances with
            # different IPs on one host are not supported.
            logger.warning(
                "The node IP address of the current host recorded "
                f"in {RAY_NODE_IP_FILENAME} ({cached_node_ip}) "
                "is different from the current IP address: "
                f"{node_ip_address}. Ray will use {node_ip_address} "
                "as the current node's IP address. "
                "Creating 2 instances in the same host with different "
                "IP address is not supported. "
                "Please create an enhancement request to "
                "https://github.com/ray-project/ray/issues."
            )

        cached_node_ip_address["node_ip_address"] = node_ip_address
        with file_path.open(mode="w") as f:
            json.dump(cached_node_ip_address, f)


def create_redis_client(redis_address, password=None):
"""Create a Redis client.
Expand Down Expand Up @@ -970,6 +1074,7 @@ def start_reaper(fate_share=None):


def start_log_monitor(
session_dir: str,
logs_dir: str,
gcs_address: str,
fate_share: Optional[bool] = None,
Expand All @@ -982,6 +1087,7 @@ def start_log_monitor(
"""Start a log monitor process.
Args:
session_dir: The session directory.
logs_dir: The directory of logging files.
gcs_address: GCS address for pubsub.
fate_share: Whether to share fate between log_monitor
Expand All @@ -1006,6 +1112,7 @@ def start_log_monitor(
sys.executable,
"-u",
log_monitor_filepath,
f"--session-dir={session_dir}",
f"--logs-dir={logs_dir}",
f"--gcs-address={gcs_address}",
f"--logging-rotate-bytes={max_bytes}",
Expand Down
Loading

0 comments on commit e2796c5

Please sign in to comment.