
Evaluate logging input data #446

Open
Galileo-Galilei opened this issue Aug 18, 2023 · 1 comment
Labels
enhancement New feature or request need-design-decision Several ways of implementation are possible and one must be chosen

Comments

@Galileo-Galilei
Owner

Description

MLflow introduced an API to log input datasets (`mlflow.log_input` and the `mlflow.data` module). I should evaluate the opportunity to integrate it.

Context

Logging input data is useful for reproducibility.

Possible implementation

Create a MLflowDatasetDataset?

@lvijnck

lvijnck commented Aug 28, 2024

Hi @Galileo-Galilei

```yaml
# catalog
integration.prm.unified_edges:
  <<: *_layer_prm
  type: matrix.datasets.mlflow.MlFlowInputDataDataSet
  name: edges
  context: integration
  dataset:
    <<: *_spark_parquet
    filepath: ${globals:paths.prm}/unified/edges
```
"""Custom Mlflow datasets."""
import mlflow

import pandas as pd
from copy import deepcopy
from typing import Any, Dict, Union

from mlflow.tracking import MlflowClient

from kedro_mlflow.io.metrics.mlflow_abstract_metric_dataset import (
    MlflowAbstractMetricDataset,
)

from kedro_datasets.pandas import ParquetDataset, CSVDataset
from kedro_datasets.spark import SparkDataset
from kedro_datasets.spark.spark_dataset import _strip_dbfs_prefix

from kedro.io.core import PROTOCOL_DELIMITER, AbstractDataset

from refit.v1.core.inject import _parse_for_objects


class MlFlowInputDataDataSet(AbstractDataset):
    """Kedro dataset to represent MLFlow Input Dataset."""

    def __init__(
        self,
        *,
        name: str,
        context: str,
        dataset: AbstractDataset,
        metadata: dict[str, Any] | None = None,
    ):
        """Initialise MlflowMetricDataset.

        Args:
            name (str): name of dataset in MLFlow
            context: context where dataset is used
            dataset: Underlying Kedro dataset
            metadata: kedro metadata
        """
        self._name = name
        self._context = context
        self._dataset = _parse_for_objects(dataset)

    def _load(self) -> Any:
        return self._dataset._load()

    def _save(self, data):
        self._dataset.save(data)

        # FUTURE: Support other datasets
        # FUTURE: Fix the source of data
        # https://github.com/mlflow/mlflow/issues/13015
        if any(isinstance(self._dataset, ds) for ds in [ParquetDataset, CSVDataset]):
            ds = mlflow.data.from_pandas(
                data, name=self._name, source=self._get_full_path(self._dataset)
            )
        elif isinstance(self._dataset, SparkDataset):
            ds = mlflow.data.from_spark(
                data,
                name=self._name,
                path=_strip_dbfs_prefix(
                    self._dataset._fs_prefix + str(self._dataset._get_load_path())
                ),
            )
        else:
            raise NotImplementedError(
                f"MLFlow Logging for dataset of type {type(self._dataset)} not implemented!"
            )

        mlflow.log_input(ds, context=self._context)

    @staticmethod
    def _get_full_path(dataset: AbstractDataset):
        return f"{dataset._protocol}://{str(dataset._filepath)}"

    def _describe(self) -> Dict[str, Any]:
        """Describe MLflow metrics dataset.

        Returns:
            Dict[str, Any]: Dictionary with MLflow metrics dataset description.
        """
        return {"context": self._context, "name": self._name}

We're currently using the implementation above in our pipeline.
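One possible refinement: the `isinstance` chain in `_save` could be replaced with `functools.singledispatch`, so supporting a new dataset type (the first `FUTURE` item above) becomes a single registered function rather than another `elif` branch. A toy sketch with hypothetical stand-in classes, not the real kedro/mlflow types:

```python
from functools import singledispatch

# Hypothetical stand-ins for illustration only; the real wrapper would
# register kedro_datasets.pandas.ParquetDataset, CSVDataset, etc.
class ParquetDataset: ...
class CSVDataset: ...
class SparkDataset: ...

@singledispatch
def to_mlflow_dataset(dataset, data, name):
    """Convert saved data to an MLflow dataset object, dispatching on the
    type of the underlying Kedro dataset."""
    raise NotImplementedError(
        f"No MLflow conversion for {type(dataset).__name__}"
    )

@to_mlflow_dataset.register(ParquetDataset)
@to_mlflow_dataset.register(CSVDataset)
def _(dataset, data, name):
    # Real code would return mlflow.data.from_pandas(data, name=name, ...)
    return ("from_pandas", name)

@to_mlflow_dataset.register(SparkDataset)
def _(dataset, data, name):
    # Real code would return mlflow.data.from_spark(data, name=name, ...)
    return ("from_spark", name)

print(to_mlflow_dataset(CSVDataset(), None, "edges"))    # ('from_pandas', 'edges')
print(to_mlflow_dataset(SparkDataset(), None, "edges"))  # ('from_spark', 'edges')
```

`_save` would then shrink to `ds = to_mlflow_dataset(self._dataset, data, self._name)` followed by `mlflow.log_input(ds, context=self._context)`.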

Projects
Status: 📋 Backlog