Skip to content

Commit

Permalink
Neptune logger (mosaicml#2447)
Browse files Browse the repository at this point in the history
* draft neptune logger

* correct setup file

* add upload_file method

* implement download_file method

* misc fixes

* add basic test cases

* add test for uploading and deleting file

* docstring update

* Apply suggestions from code review

Co-authored-by: Siddhant Sadangi <siddhant.sadangi@gmail.com>

* name improvement + minor refactor

* code review

* fix logging big ints

* update neptune reqs

* no neptune imports outside of logger methods

* Update composer/loggers/neptune_logger.py

Co-authored-by: Mihir Patel <mihir.v.patel7@gmail.com>

* update tests

* Update composer/loggers/neptune_logger.py

Co-authored-by: Siddhant Sadangi <siddhant.sadangi@gmail.com>

* Docstrings

Co-authored-by: Sabine <sabine.nyholm@neptune.ai>

* code review 1

* code review 2

* set neptune run to None after stopping

* Update composer/loggers/neptune_logger.py

Co-authored-by: Sabine <sabine.nyholm@neptune.ai>

* validate images and channels

* code review + tests

* remove example

* docs augmented

* Update docs/source/trainer/file_uploading.rst

Co-authored-by: Sabine <sabine.nyholm@neptune.ai>

* code review 1

* more fixes

* try to fix pre-commit

* small refactor

* fix doctest

* docstrings adjustment

* remove step from metrics logging

* try to resolve step duplication

* call original epoch_end method

* don't import in vain

* handle duplicated step numbers by adding epoch number

* code review 1 (to be continued...)

* code review 2 (to be continued...)

* walrus operator (composer min py version is 3.8)

* code review 3

* docs and readme update

* extract part of sanitization logic to seperate method

* image visualizer docstring

* Update composer/loggers/neptune_logger.py

Co-authored-by: Sabine <sabine.nyholm@neptune.ai>

* fix

* fix typo ind ocs

* remove example, creds fail

* fix

* patch key

* remove doctest

* restore

* maybe a fix

* fix

* remove doctest

* fix

* debug mode

* fix

---------

Co-authored-by: Siddhant Sadangi <siddhant.sadangi@gmail.com>
Co-authored-by: Mihir Patel <mihir.v.patel7@gmail.com>
Co-authored-by: Sabine <sabine.nyholm@neptune.ai>
  • Loading branch information
4 people authored Feb 7, 2024
1 parent 7e6b775 commit a48f9fa
Show file tree
Hide file tree
Showing 12 changed files with 498 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ venv/
# WandB
wandb/

# Neptune
.neptune/

# Spacemacs
._#*
.#*
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ Composer is built to automate away low-level pain points and headaches so you ca
Integrate with the tools you know and love for experiment tracking and data streaming.

- **Cloud integrations**: Our Checkpointing and logging features have first-class support for remote storage and loading from Cloud bucket (OCI, GCP, AWS S3).
- **********Experiment tracking:********** Weights and Biases, MLFlow, and CometML — the choice is yours, easily log your data to your favorite platform.
- **********Experiment tracking:********** Weights and Biases, MLFlow, CometML, and neptune.ai — the choice is yours, easily log your data to your favorite platform.

# **🚀 Getting Started**

Expand Down
2 changes: 1 addition & 1 deletion composer/callbacks/image_visualizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ImageVisualizer(Callback):
+---------------------------------------------+---------------------------------------+
.. note::
This callback only works with wandb logging for now.
This callback only works with wandb and Neptune logging for now.
Args:
interval (int | str | Time, optional): Time string specifying how often to log train images. For example, ``interval='1ep'``
Expand Down
2 changes: 2 additions & 0 deletions composer/loggers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from composer.loggers.logger_destination import LoggerDestination
from composer.loggers.mlflow_logger import MLFlowLogger
from composer.loggers.mosaicml_logger import MosaicMLLogger
from composer.loggers.neptune_logger import NeptuneLogger
from composer.loggers.progress_bar_logger import ProgressBarLogger
from composer.loggers.remote_uploader_downloader import RemoteUploaderDownloader
from composer.loggers.slack_logger import SlackLogger
Expand All @@ -32,6 +33,7 @@
'LoggerDestination',
'FileLogger',
'InMemoryLogger',
'NeptuneLogger',
'ProgressBarLogger',
'WandBLogger',
'RemoteUploaderDownloader',
Expand Down
307 changes: 307 additions & 0 deletions composer/loggers/neptune_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
# Copyright 2022 MosaicML Composer authors
# SPDX-License-Identifier: Apache-2.0

"""Log training metadata to [neptune.ai](https://neptune.ai/)."""

__all__ = ['NeptuneLogger']

import os
import pathlib
import warnings
from functools import partial
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Set, Union

import numpy as np
import torch

from composer._version import __version__
from composer.loggers import LoggerDestination
from composer.utils import MissingConditionalImportError, dist

if TYPE_CHECKING:
from composer import Logger
from composer.core import State


class NeptuneLogger(LoggerDestination):
"""Log to `neptune.ai <https://neptune.ai/>`_.
For more, see the [Neptune-Composer integration guide](https://docs.neptune.ai/integrations/composer/).
Args:
project (str, optional): The name of your Neptune project,
in the form "workspace-name/project-name". If you leave it empty, the
``NEPTUNE_PROJECT`` environment variable will be used.
api_token (str, optional): Your Neptune API token.
You can leave out this argument if you save your token to the
``NEPTUNE_API_TOKEN`` environment variable (recommended).
You can find your API token in the user menu of the Neptune web app.
rank_zero_only (bool, optional): Whether to log only on the rank-zero process.
(default: ``True``).
upload_artifacts (bool, optional): Whether the logger should upload artifacts to Neptune.
(default: ``False``).
base_namespace (str, optional): The name of the base namespace to log the metadata to.
(default: "training").
neptune_kwargs (Dict[str, Any], optional): Any additional keyword arguments to the
``neptune.init_run()`` function. For options, see the
`Run API reference <https://docs.neptune.ai/api/neptune/#init_run>`_ in the
Neptune docs.
"""
metric_namespace = 'metrics'
hyperparam_namespace = 'hyperparameters'
trace_namespace = 'traces'
integration_version_key = 'source_code/integrations/neptune-MosaicML'

def __init__(
self,
*,
project: Optional[str] = None,
api_token: Optional[str] = None,
rank_zero_only: bool = True,
upload_artifacts: bool = False,
base_namespace: str = 'training',
**neptune_kwargs,
) -> None:
try:
from neptune.internal.utils import verify_type
except ImportError as e:
raise MissingConditionalImportError(extra_deps_group='neptune',
conda_package='neptune',
conda_channel='conda-forge') from e

verify_type('project', project, (str, type(None)))
verify_type('api_token', api_token, (str, type(None)))
verify_type('rank_zero_only', rank_zero_only, bool)
verify_type('upload_artifacts', upload_artifacts, bool)
verify_type('base_namespace', base_namespace, str)

if not base_namespace:
raise ValueError("Argument 'base_namespace' cannot be an empty string.")

self._project = project
self._api_token = api_token
self._rank_zero_only = rank_zero_only
self._upload_artifacts = upload_artifacts
self._base_namespace = base_namespace
self._neptune_kwargs = neptune_kwargs

mode = self._neptune_kwargs.pop('mode', 'async')

self._enabled = (not rank_zero_only) or dist.get_global_rank() == 0

self._mode = mode if self._enabled else 'debug'

self._neptune_run = None
self._base_handler = None

self._metrics_dict: Dict[str, int] = {} # used to prevent duplicate step logging

super().__init__()

@property
def neptune_run(self):
"""Gets the Neptune run object from a NeptuneLogger instance.
You can log additional metadata to the run by accessing a path inside the run and assigning metadata to it
with "=" or [Neptune logging methods](https://docs.neptune.ai/logging/methods/).
Example:
from composer import Trainer
from composer.loggers import NeptuneLogger
neptune_logger = NeptuneLogger()
trainer = Trainer(loggers=neptune_logger, ...)
trainer.fit()
neptune_logger.neptune_run["some_metric"] = 1
trainer.close()
"""
from neptune import Run

if not self._neptune_run:
self._neptune_run = Run(
project=self._project,
api_token=self._api_token,
mode=self._mode,
**self._neptune_kwargs,
)
return self._neptune_run

@property
def base_handler(self):
"""Gets a handler for the base logging namespace.
Use the handler to log extra metadata to the run and organize it under the base namespace (default: "training").
You can operate on it like a run object: Access a path inside the handler and assign metadata to it with "=" or
other [Neptune logging methods](https://docs.neptune.ai/logging/methods/).
Example:
from composer import Trainer
from composer.loggers import NeptuneLogger
neptune_logger = NeptuneLogger()
trainer = Trainer(loggers=neptune_logger, ...)
trainer.fit()
neptune_logger.base_handler["some_metric"] = 1
trainer.close()
Result: The value `1` is organized under "training/some_metric" inside the run.
"""
return self.neptune_run[self._base_namespace]

def init(self, state: 'State', logger: 'Logger') -> None:
del logger # unused

self.base_handler['rank'] = dist.get_global_rank()

if self._enabled:
self.neptune_run['sys/name'] = state.run_name
self.neptune_run[self.integration_version_key] = __version__

def _sanitize_metrics(self, metrics: Dict[str, float], step: Optional[int]) -> Dict[str, float]:
"""Sanitize metrics to prevent duplicate step logging.
Args:
metrics (Dict[str, float]): Metrics to log.
step (Optional[int]): Step to log metrics at.
Returns:
Dict[str, float]: Sanitized metrics.
"""
keys_to_delete: Set[str] = set()

for k in metrics:
self._process_single_metric(k, step, keys_to_delete)

return dict(filter(lambda x: x[0] not in keys_to_delete, metrics.items()))

def _process_single_metric(self, metric_key: str, step: Optional[int], keys_to_delete: Set[str]) -> None:
if metric_key not in self._metrics_dict:
self._metrics_dict[metric_key] = step if step is not None else 0
else:
if step is not None:
if step <= self._metrics_dict[metric_key]:
# we cannot insert metrics earlier than or in place of an existing metric point
keys_to_delete.add(metric_key)
else:
self._metrics_dict[metric_key] = step
else:
self._metrics_dict[metric_key] += 1

def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> None:
if not self._enabled:
return

from neptune.utils import stringify_unsupported

if metrics_to_log := self._sanitize_metrics(metrics, step):
self.base_handler[NeptuneLogger.metric_namespace].append(stringify_unsupported(metrics_to_log), step=step)

def log_hyperparameters(self, hyperparameters: Dict[str, Any]) -> None:
if not self._enabled:
return

from neptune.utils import stringify_unsupported

self.base_handler[NeptuneLogger.hyperparam_namespace] = stringify_unsupported(hyperparameters)

def log_traces(self, traces: Dict[str, Any]):
if not self._enabled:
return

from neptune.utils import stringify_unsupported

self.base_handler[NeptuneLogger.trace_namespace] = stringify_unsupported(traces)

def can_upload_files(self) -> bool:
"""Whether the logger supports uploading files."""
return self._enabled and self._upload_artifacts

def upload_file(
self,
state: 'State',
remote_file_name: str,
file_path: pathlib.Path,
*,
overwrite: bool = False,
):
if not self.can_upload_files():
return

neptune_path = f'{self._base_namespace}/{remote_file_name}'
if self.neptune_run.exists(neptune_path) and not overwrite:

warnings.warn(f"The file '{neptune_path}' already exists and overwrite is set to False."
'No action will be taken.')
return

del state # unused
self.base_handler[remote_file_name].upload(str(file_path))

def download_file(
self,
remote_file_name: str,
destination: str,
overwrite: bool = False,
progress_bar: bool = True,
):
del progress_bar # not supported

if not self._enabled:
return

if os.path.exists(os.path.join(
destination,
remote_file_name,
)) and not overwrite:
warnings.warn(f"Destination '{destination}' already exists and overwrite is set to False."
'No action will be taken.')
return

file_path = f'{self._base_namespace}/{remote_file_name}'
if not self.neptune_run.exists(file_path):
raise FileNotFoundError(f'File {file_path} not found')

self.base_handler[remote_file_name].download(destination=destination)

def log_images(
self,
images: Union[np.ndarray, torch.Tensor, Sequence[Union[np.ndarray, torch.Tensor]]],
name: str = 'Images',
channels_last: bool = False,
step: Optional[int] = None,
masks: Optional[Dict[str, Union[np.ndarray, torch.Tensor, Sequence[Union[np.ndarray, torch.Tensor]]]]] = None,
mask_class_labels: Optional[Dict[int, str]] = None,
use_table: bool = True,
):
if not self._enabled:
return

from neptune.types import File

if not isinstance(images, Sequence) and images.ndim <= 3:
images = _validate_image(images, channels_last=channels_last)
self.base_handler[name].append(File.as_image(images), step=step)

else:
images = list(map(partial(_validate_image, channels_last=channels_last), images))
self.base_handler[name].extend([File.as_image(img) for img in images])

def post_close(self) -> None:
if not self._enabled:
return

if self._neptune_run:
self._neptune_run.stop()
self._neptune_run = None


def _validate_image(img: Union[np.ndarray, torch.Tensor], channels_last: bool) -> np.ndarray:
img_numpy = img.data.cpu().numpy() if isinstance(img, torch.Tensor) else img

assert isinstance(img_numpy, np.ndarray)

# Error out for empty arrays or weird arrays of dimension 0.
if np.any(np.equal(img_numpy.shape, 0)):
raise ValueError(f'Got an image (shape {img_numpy.shape}) with at least one dimension being 0! ')

if not channels_last:
img_numpy = np.moveaxis(img_numpy, 0, -1)

return img_numpy
7 changes: 7 additions & 0 deletions docs/source/doctest_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@
except ImportError:
_COMETML_INSTALLED = False

try:
import neptune
_NEPTUNE_INSTALLED = True
del neptune # unused
except ImportError:
_NEPTUNE_INSTALLED = False

try:
import libcloud
_LIBCLOUD_INSTALLED = True
Expand Down
1 change: 1 addition & 0 deletions docs/source/getting_started/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ the following installation targets are available:
* ``pip install 'mosaicml[nlp]'``: Installs Composer with support for NLP models and algorithms.
* ``pip install 'mosaicml[wandb]'``: Installs Composer with support for :mod:`wandb`.
* ``pip install 'mosaicml[comet_ml]'``: Installs Composer with support for :mod:`comet_ml`.
* ``pip install 'mosaicml[neptune]'``: Installs Composer with support for :mod:`neptune`.
* ``pip install 'mosaicml[tensorboard]'``: Installs Composer with support for :mod:`tensorboard`.
* ``pip install 'mosaicml[streaming]'``: Installs Composer with support for `streaming <https://github.com/mosaicml/streaming>`_.
* ``pip install 'mosaicml[mlflow]'``: Installs Composer with support for :mod:`mlflow`.
Expand Down
Loading

0 comments on commit a48f9fa

Please sign in to comment.