diff --git a/python/ray/_private/log_monitor.py b/python/ray/_private/log_monitor.py index 9c02ba3ae0acf..8802117b24bca 100644 --- a/python/ray/_private/log_monitor.py +++ b/python/ray/_private/log_monitor.py @@ -133,14 +133,15 @@ class LogMonitor: def __init__( self, - logs_dir, - gcs_publisher, + node_ip_address: str, + logs_dir: str, + gcs_publisher: ray._raylet.GcsPublisher, is_proc_alive_fn: Callable[[int], bool], max_files_open: int = ray_constants.LOG_MONITOR_MAX_OPEN_FILES, gcs_address: Optional[str] = None, ): """Initialize the log monitor object.""" - self.ip: str = services.get_node_ip_address() + self.ip: str = node_ip_address self.logs_dir: str = logs_dir self.publisher = gcs_publisher self.log_filenames: Set[str] = set() @@ -520,11 +521,17 @@ def is_proc_alive(pid): "log to stdout if set empty, default is " f'"{ray_constants.LOG_MONITOR_LOG_FILE_NAME}"', ) + parser.add_argument( + "--session-dir", + required=True, + type=str, + help="Specify the path of the session directory used by Ray processes.", + ) parser.add_argument( "--logs-dir", required=True, type=str, - help="Specify the path of the temporary directory used by Ray processes.", + help="Specify the path of the log directory used by Ray processes.", ) parser.add_argument( "--logging-rotate-bytes", @@ -553,7 +560,9 @@ def is_proc_alive(pid): backup_count=args.logging_rotate_backup_count, ) + node_ip = services.get_cached_node_ip_address(args.session_dir) log_monitor = LogMonitor( + node_ip, args.logs_dir, ray._raylet.GcsPublisher(address=args.gcs_address), is_proc_alive, diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index d984811847377..9a5d9b4381091 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -18,13 +18,11 @@ from typing import Dict, Optional, Tuple, IO, AnyStr from filelock import FileLock -from pathlib import Path import ray import ray._private.ray_constants as ray_constants import ray._private.services import ray._private.utils -from ray._private.ray_constants import RAY_NODE_IP_FILENAME from ray._private import storage from ray._raylet import GcsClient, get_session_key_from_storage from ray._private.resource_spec import ResourceSpec @@ -208,7 +206,9 @@ def __init__( ) self._node_ip_address = node_ip_address if not connect_only: - self._write_node_ip_address(node_ip_address) + ray._private.services.write_node_ip_address( + self.get_session_dir_path(), node_ip_address + ) if ray_params.raylet_ip_address: raylet_ip_address = ray_params.raylet_ip_address @@ -988,7 +988,9 @@ def _wait_and_get_for_node_address(self, timeout_s: int = 60) -> str: within timeout_s. """ for i in range(timeout_s): - node_ip_address = self._get_cached_node_ip_address() + node_ip_address = ray._private.services.get_cached_node_ip_address( + self.get_session_dir_path() + ) if node_ip_address is not None: return node_ip_address @@ -996,118 +998,20 @@ def _wait_and_get_for_node_address(self, timeout_s: int = 60) -> str: time.sleep(1) if i % 10 == 0: logger.info( - f"Can't find a `{RAY_NODE_IP_FILENAME}` file from " - f"{self.get_session_dir_path()}. " + f"Can't find a `{ray_constants.RAY_NODE_IP_FILENAME}` " + f"file from {self.get_session_dir_path()}. " "Have you started Ray instsance using " "`ray start` or `ray.init`?" ) raise ValueError( - f"Can't find a `{RAY_NODE_IP_FILENAME}` file from " - f"{self.get_session_dir_path()}. " + f"Can't find a `{ray_constants.RAY_NODE_IP_FILENAME}` " + f"file from {self.get_session_dir_path()}. " f"for {timeout_s} seconds. " "A ray instance hasn't started. " "Did you do `ray start` or `ray.init` on this host?" ) - def _get_cached_node_ip_address(self) -> str: - """Get a node address cached on this session. - - If a ray instance is started by `ray start --node-ip-address`, - the node ip address is cached to a file RAY_NODE_IP_FILENAME. - Otherwise, the file exists, but it is emptyl. - - This API is process-safe, meaning the file access is protected by - a file lock. - - Returns: - node_ip_address cached on the current node. None if the node - the file doesn't exist, meaning ray instance hasn't been - started on a current node. If node_ip_address is not written - to a file, it means --node-ip-address is not given, and in this - case, we find the IP address ourselves. - """ - assert hasattr(self, "_session_dir") - file_path = Path( - os.path.join(self.get_session_dir_path(), RAY_NODE_IP_FILENAME) - ) - cached_node_ip_address = {} - - with FileLock(str(file_path.absolute()) + ".lock"): - if not file_path.exists(): - return None - - with file_path.open() as f: - cached_node_ip_address.update(json.load(f)) - - if "node_ip_address" in cached_node_ip_address: - return cached_node_ip_address["node_ip_address"] - else: - return ray.util.get_node_ip_address() - - def _write_node_ip_address(self, node_ip_address: Optional[str]) -> None: - """Write a node ip address of the current session to - RAY_NODE_IP_FILENAME. - - If a ray instance is started by `ray start --node-ip-address`, - the node ip address is cached to a file RAY_NODE_IP_FILENAME. - - This API is process-safe, meaning the file access is protected by - a file lock. - - The file contains a single string node_ip_address. If nothing - is written, it means --node-ip-address was not given, and Ray - resolves the IP address on its own. It assumes in a single node, - you can have only 1 IP address (which is the assumption ray - has in general). - - node_ip_address is the ip address of the current node. - - Args: - node_ip_address: The node IP address of the current node. - If None, it means the node ip address is not given - by --node-ip-address. In this case, we don't write - anything to a file. - """ - assert hasattr(self, "_session_dir") - - file_path = Path( - os.path.join(self.get_session_dir_path(), RAY_NODE_IP_FILENAME) - ) - cached_node_ip_address = {} - - with FileLock(str(file_path.absolute()) + ".lock"): - if not file_path.exists(): - with file_path.open(mode="w") as f: - json.dump({}, f) - - with file_path.open() as f: - cached_node_ip_address.update(json.load(f)) - - cached_node_ip = cached_node_ip_address.get("node_ip_address") - - if node_ip_address is not None: - if cached_node_ip: - if cached_node_ip == node_ip_address: - # Nothing to do. - return - else: - logger.warning( - "The node IP address of the current host recorded " - f"in {RAY_NODE_IP_FILENAME} ({cached_node_ip}) " - "is different from the current IP address: " - f"{node_ip_address}. Ray will use {node_ip_address} " - "as the current node's IP address. " - "Creating 2 instances in the same host with different " - "IP address is not supported. " - "Please create an enhnacement request to" - "https://github.com/ray-project/ray/issues." - ) - - cached_node_ip_address["node_ip_address"] = node_ip_address - with file_path.open(mode="w") as f: - json.dump(cached_node_ip_address, f) - def start_reaper_process(self): """ Start the reaper process. @@ -1133,6 +1037,7 @@ def start_log_monitor(self): "log_monitor", unique=True, create_out=False ) process_info = ray._private.services.start_log_monitor( + self.get_session_dir_path(), self._logs_dir, self.gcs_address, fate_share=self.kernel_fate_share, diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index a886341ef7573..867b748a0e8d4 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -19,12 +19,14 @@ # Import psutil after ray so the packaged version is used. import psutil +from filelock import FileLock # Ray modules import ray import ray._private.ray_constants as ray_constants from ray._raylet import GcsClient, GcsClientOptions from ray.core.generated.common_pb2 import Language +from ray._private.ray_constants import RAY_NODE_IP_FILENAME resource = None if sys.platform != "win32": @@ -649,6 +651,11 @@ def node_ip_address_from_perspective(address: str): return node_ip_address +# NOTE: This API should not be used when you obtain the +# IP address when ray.init is not called because +# it cannot find the IP address if it is specified by +# ray start --node-ip-address. You should instead use +# get_cached_node_ip_address. def get_node_ip_address(address="8.8.8.8:53"): if ray._private.worker._global_node is not None: return ray._private.worker._global_node.node_ip_address @@ -660,6 +667,103 @@ def get_node_ip_address(address="8.8.8.8:53"): return node_ip_address_from_perspective(address) +def get_cached_node_ip_address(session_dir: str) -> str: + """Get a node address cached on this session. + + If a ray instance is started by `ray start --node-ip-address`, + the node ip address is cached to a file RAY_NODE_IP_FILENAME. + Otherwise, the file exists, but it is emptyl. + + This API is process-safe, meaning the file access is protected by + a file lock. + + Args: + session_dir: Path to the Ray session directory. + + Returns: + node_ip_address cached on the current node. None if the node + the file doesn't exist, meaning ray instance hasn't been + started on a current node. If node_ip_address is not written + to a file, it means --node-ip-address is not given, and in this + case, we find the IP address ourselves. + """ + file_path = Path(os.path.join(session_dir, RAY_NODE_IP_FILENAME)) + cached_node_ip_address = {} + + with FileLock(str(file_path.absolute()) + ".lock"): + if not file_path.exists(): + return None + + with file_path.open() as f: + cached_node_ip_address.update(json.load(f)) + + if "node_ip_address" in cached_node_ip_address: + return cached_node_ip_address["node_ip_address"] + else: + return ray.util.get_node_ip_address() + + +def write_node_ip_address(session_dir: str, node_ip_address: Optional[str]) -> None: + """Write a node ip address of the current session to + RAY_NODE_IP_FILENAME. + + If a ray instance is started by `ray start --node-ip-address`, + the node ip address is cached to a file RAY_NODE_IP_FILENAME. + + This API is process-safe, meaning the file access is protected by + a file lock. + + The file contains a single string node_ip_address. If nothing + is written, it means --node-ip-address was not given, and Ray + resolves the IP address on its own. It assumes in a single node, + you can have only 1 IP address (which is the assumption ray + has in general). + + node_ip_address is the ip address of the current node. + + Args: + session_dir: The path to Ray session directory. + node_ip_address: The node IP address of the current node. + If None, it means the node ip address is not given + by --node-ip-address. In this case, we don't write + anything to a file. + """ + file_path = Path(os.path.join(session_dir, RAY_NODE_IP_FILENAME)) + cached_node_ip_address = {} + + with FileLock(str(file_path.absolute()) + ".lock"): + if not file_path.exists(): + with file_path.open(mode="w") as f: + json.dump({}, f) + + with file_path.open() as f: + cached_node_ip_address.update(json.load(f)) + + cached_node_ip = cached_node_ip_address.get("node_ip_address") + + if node_ip_address is not None: + if cached_node_ip: + if cached_node_ip == node_ip_address: + # Nothing to do. + return + else: + logger.warning( + "The node IP address of the current host recorded " + f"in {RAY_NODE_IP_FILENAME} ({cached_node_ip}) " + "is different from the current IP address: " + f"{node_ip_address}. Ray will use {node_ip_address} " + "as the current node's IP address. " + "Creating 2 instances in the same host with different " + "IP address is not supported. " + "Please create an enhnacement request to" + "https://github.com/ray-project/ray/issues." + ) + + cached_node_ip_address["node_ip_address"] = node_ip_address + with file_path.open(mode="w") as f: + json.dump(cached_node_ip_address, f) + + def create_redis_client(redis_address, password=None): """Create a Redis client. @@ -970,6 +1074,7 @@ def start_reaper(fate_share=None): def start_log_monitor( + session_dir: str, logs_dir: str, gcs_address: str, fate_share: Optional[bool] = None, @@ -982,6 +1087,7 @@ def start_log_monitor( """Start a log monitor process. Args: + session_dir: The session directory. logs_dir: The directory of logging files. gcs_address: GCS address for pubsub. fate_share: Whether to share fate between log_monitor @@ -1006,6 +1112,7 @@ def start_log_monitor( sys.executable, "-u", log_monitor_filepath, + f"--session-dir={session_dir}", f"--logs-dir={logs_dir}", f"--gcs-address={gcs_address}", f"--logging-rotate-bytes={max_bytes}", diff --git a/python/ray/tests/test_logging.py b/python/ray/tests/test_logging.py index 2d51a8520a849..00f54968b1da4 100644 --- a/python/ray/tests/test_logging.py +++ b/python/ray/tests/test_logging.py @@ -25,6 +25,7 @@ from ray._private.test_utils import ( get_log_batch, get_log_message, + get_log_data, init_log_pubsub, run_string_as_driver, wait_for_condition, @@ -551,7 +552,7 @@ def test_log_monitor(tmp_path, live_dead_pids): mock_publisher = MagicMock() log_monitor = LogMonitor( - str(log_dir), mock_publisher, is_proc_alive, max_files_open=5 + "127.0.0.1", str(log_dir), mock_publisher, is_proc_alive, max_files_open=5 ) # files @@ -706,7 +707,7 @@ def test_log_monitor_actor_task_name_and_job_id(tmp_path): mock_publisher = MagicMock() log_monitor = LogMonitor( - str(log_dir), mock_publisher, lambda _: True, max_files_open=5 + "127.0.0.1", str(log_dir), mock_publisher, lambda _: True, max_files_open=5 ) worker_out_log_file = f"worker-{worker_id}-{job_id}-{pid}.out" first_line = "First line\n" @@ -792,7 +793,7 @@ def test_log_monitor_update_backpressure(tmp_path, mock_timer): log_dir.mkdir() mock_publisher = MagicMock() log_monitor = LogMonitor( - str(log_dir), mock_publisher, lambda _: True, max_files_open=5 + "127.0.0.1", str(log_dir), mock_publisher, lambda _: True, max_files_open=5 ) current = 0 @@ -816,7 +817,7 @@ def test_log_monitor_update_backpressure(tmp_path, mock_timer): assert log_monitor.should_update_filenames(current) -def test_repr_inheritance(): +def test_repr_inheritance(shutdown_only): """Tests that a subclass's repr is used in logging.""" logger = logging.getLogger(__name__) @@ -952,6 +953,28 @@ def test_log_with_import(): assert not logger.disabled +@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") +def test_log_monitor_ip_correct(ray_start_cluster): + cluster = ray_start_cluster + # add first node + cluster.add_node(node_ip_address="127.0.0.2") + address = cluster.address + ray.init(address) + # add second node + cluster.add_node(node_ip_address="127.0.0.3") + + @ray.remote + def print_msg(): + print("abc") + + p = init_log_pubsub() + print_msg.remote() + data = get_log_data( + p, num=6, timeout=10, job_id=ray.get_runtime_context().get_job_id() + ) + assert data[0]["ip"] == "127.0.0.2" + + if __name__ == "__main__": import sys diff --git a/python/ray/tests/test_ray_init_2.py b/python/ray/tests/test_ray_init_2.py index 2dcc823b8deb6..8ed64ecb4d08e 100644 --- a/python/ray/tests/test_ray_init_2.py +++ b/python/ray/tests/test_ray_init_2.py @@ -285,9 +285,11 @@ def verify(): def test_get_and_write_node_ip_address(shutdown_only): ray.init() - node = ray._private.worker.global_worker.node node_ip = ray.util.get_node_ip_address() - cached_node_ip_address = node._get_cached_node_ip_address() + session_dir = ray._private.worker._global_node.get_session_dir_path() + cached_node_ip_address = ray._private.services.get_cached_node_ip_address( + session_dir + ) assert cached_node_ip_address == node_ip