-
Notifications
You must be signed in to change notification settings - Fork 365
Add Goodput & Badput recording and monitoring support. #783
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bcd8618
d8474f7
5c62244
1dff92c
1417133
f5d6a37
0221426
8d0c58d
31eb0e1
6212098
380dcac
0e9f4dc
7bd0fc8
62bf113
eeef352
878a26e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,24 +5,39 @@ | |
import jax | ||
from absl import flags, logging | ||
from ml_goodput_measurement import goodput | ||
from ml_goodput_measurement import monitoring as goodput_monitoring | ||
|
||
from axlearn.cloud.common.utils import parse_kv_flags | ||
from axlearn.common import measurement | ||
from axlearn.common.config import maybe_set_config | ||
from axlearn.common.config import REQUIRED, Required, config_class, maybe_set_config | ||
|
||
|
||
@measurement.register_recorder("goodput") | ||
class GoodputRecorder(measurement.Recorder): | ||
"""Records overall training goodput.""" | ||
|
||
Config = measurement.Recorder.Config | ||
@config_class
class Config(measurement.Recorder.Config):
    """Configuration for GoodputRecorder.

    Extends the base recorder config with the settings consumed when the
    Goodput monitor is created in `start_monitoring`.

    Attributes:
        upload_dir: Directory the monitor uploads to; it is passed to the
            monitor as its TensorBoard directory.
        upload_interval: Number of seconds between monitoring uploads.
    """

    # Both fields are required and have no default value.
    upload_dir: Required[str] = REQUIRED
    upload_interval: Required[int] = REQUIRED
|
||
@classmethod | ||
def from_flags(cls, fv: flags.FlagValues) -> "GoodputRecorder": | ||
"""Converts flags to a recorder. | ||
|
||
`fv.recorder_spec` will be interpreted as a list of `key=value` pairs; config names | ||
corresponding to keys will be set to the corresponding values. | ||
corresponding to keys will be set to the corresponding values. A GoodputRecorder can | ||
additionally take in following Tensorboard configs in the recorder_spec: | ||
- upload_dir: The directory to write Tensorboard data to. | ||
- upload_interval: The time interval in seconds at which to query and upload data | ||
to Tensorboard. | ||
""" | ||
cfg: measurement.Recorder.Config = cls.default_config() | ||
cfg = maybe_set_config(cfg, **parse_kv_flags(fv.recorder_spec, delimiter="=")) | ||
|
@@ -32,6 +47,7 @@ def __init__(self, cfg): | |
super().__init__(cfg) | ||
cfg: GoodputRecorder.Config = self.config | ||
self._recorder = None | ||
self._monitor = None | ||
|
||
def record(self, event: measurement.Event, *args, **kwargs): | ||
# Lazily instantiate the recorder. This avoids invoking jax before setup is complete. | ||
|
@@ -49,10 +65,47 @@ def record(self, event: measurement.Event, *args, **kwargs): | |
self._recorder.record_job_end_time(*args, **kwargs) | ||
elif event == measurement.Event.START_STEP: | ||
self._recorder.record_step_start_time(*args, **kwargs) | ||
elif event == measurement.Event.START_ACCELERATOR_INIT: | ||
self._recorder.record_tpu_init_start_time(*args, **kwargs) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Out of interest, is there anything here specific to TPUs, or can we use the same API for GPUs on GCP? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's not specific to TPUs, since this badput bucket is computed from the recorded markers. The API name needs to be updated to reflect this, and whenever that is done, this piece of code will need to be refactored as well. |
||
elif event == measurement.Event.END_ACCELERATOR_INIT: | ||
self._recorder.record_tpu_init_end_time(*args, **kwargs) | ||
elif event == measurement.Event.START_TRAINING_PREPARATION: | ||
self._recorder.record_training_preparation_start_time(*args, **kwargs) | ||
elif event == measurement.Event.END_TRAINING_PREPARATION: | ||
self._recorder.record_training_preparation_end_time(*args, **kwargs) | ||
elif event == measurement.Event.START_DATA_LOADING: | ||
self._recorder.record_data_loading_start_time(*args, **kwargs) | ||
elif event == measurement.Event.END_DATA_LOADING: | ||
self._recorder.record_data_loading_end_time(*args, **kwargs) | ||
else: | ||
logging.log_first_n( | ||
logging.WARNING, | ||
"Ignoring unknown event %s", | ||
1, | ||
event, | ||
) | ||
|
||
def start_monitoring(self, *args, **kwargs):
    """Starts background monitoring of Goodput.

    On the first call this creates ml-goodput-measurement's GoodputMonitor from
    the recorder config and caches it; later calls reuse the cached monitor.
    Every call then starts the monitor's Goodput uploader, which asynchronously
    calculates Goodput and Badput at `upload_interval` and uploads them to the
    TensorBoard directory given by `upload_dir`.

    Note: This function requires initialization of distributed JAX before it is called.
    If there are internal GCP errors from querying and uploading data, these will be
    logged without affecting the workload. GoodputMonitor logs will provide further
    information if data is not being uploaded correctly.

    Args:
        *args: Positional arguments forwarded to `start_goodput_uploader`.
        **kwargs: Keyword arguments forwarded to `start_goodput_uploader`.
    """
    monitor = self._monitor
    if monitor is None:
        cfg = self.config
        monitor = goodput_monitoring.GoodputMonitor(
            job_name=cfg.name,
            logger_name=f"goodput_logger_{cfg.name}",
            tensorboard_dir=cfg.upload_dir,
            # The configured interval is coerced to int before being handed to the monitor.
            upload_interval=int(cfg.upload_interval),
            # Monitoring is enabled on JAX process 0 only.
            monitoring_enabled=(jax.process_index() == 0),
            include_badput_breakdown=True,
        )
        # Cache only after successful construction so a failed attempt can be retried.
        self._monitor = monitor

    monitor.start_goodput_uploader(*args, **kwargs)
    logging.info("Started Goodput upload to Tensorboard in the background!")
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,11 +18,23 @@ class Event(enum.Enum): | |
START_JOB: Start of job. | ||
END_JOB: End of job. | ||
START_STEP: Start of a training step. Should be recorded with `step` as a positional arg. | ||
START_ACCELERATOR_INIT: Start of accelerator mesh initialization. | ||
END_ACCELERATOR_INIT: End of accelerator mesh initialization. | ||
START_TRAINING_PREPARATION: Start of training preparation. | ||
END_TRAINING_PREPARATION: End of training preparation. | ||
START_DATA_LOADING: Start of data loading. | ||
END_DATA_LOADING: End of data loading. | ||
""" | ||
|
||
START_JOB = "START_JOB" | ||
END_JOB = "END_JOB" | ||
START_STEP = "START_STEP" | ||
START_ACCELERATOR_INIT = "START_ACCELERATOR_INIT" | ||
END_ACCELERATOR_INIT = "END_ACCELERATOR_INIT" | ||
START_TRAINING_PREPARATION = "START_TRAINING_PREPARATION" | ||
END_TRAINING_PREPARATION = "END_TRAINING_PREPARATION" | ||
START_DATA_LOADING = "START_DATA_LOADING" | ||
END_DATA_LOADING = "END_DATA_LOADING" | ||
|
||
|
||
class Recorder(Configurable): | ||
|
@@ -47,6 +59,10 @@ def record(self, event: Event, *args, **kwargs): | |
"""Records an event with the given name.""" | ||
raise NotImplementedError(type(self)) | ||
|
||
def start_monitoring(self, *args, **kwargs):
    """Starts computing and uploading metrics at some configured interval in the background.

    The base implementation always raises; recorders that support monitoring
    override this method.

    Args:
        *args: Positional arguments for the overriding implementation. Accepted
            here so the base signature agrees with overrides that take
            `*args, **kwargs`, mirroring `record`.
        **kwargs: Keyword arguments for the overriding implementation.

    Raises:
        NotImplementedError: Always, with the concrete recorder type as its argument.
    """
    raise NotImplementedError(type(self))
dipannita08 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
_recorders: dict[str, type] = {} | ||
_T = TypeVar("_T") | ||
|
@@ -120,3 +136,16 @@ def record_event(event: Event): | |
logging.log_first_n(logging.INFO, "No recorder configured, ignoring events.", 1) | ||
else: | ||
global_recorder.record(event) | ||
|
||
|
||
def start_monitoring(): | ||
"""Begins monitoring events as per global monitor functionality.""" | ||
if global_recorder is None: | ||
logging.log_first_n( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit -- since There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since this is a no-op, I'll leave this for now and address it in the next PR. I expect to complete the integration with package v5 in that PR as well. Here is the tracking bug. |
||
logging.INFO, "Since recorder is not set up, monitoring cannot be started.", 1 | ||
) | ||
else: | ||
global_recorder.start_monitoring() | ||
logging.info( | ||
"Starting monitoring of events using global recorder's monitor: %s", global_recorder | ||
) |
Uh oh!
There was an error while loading. Please reload this page.