Evaluate logging input data #446
Labels: enhancement (New feature or request), need-design-decision (Several ways of implementation are possible and one must be chosen)
This was referenced Aug 22, 2023
Galileo-Galilei added the enhancement and need-design-decision labels on Oct 25, 2023.
```yaml
# catalog
integration.prm.unified_edges:
  <<: *_layer_prm
  type: matrix.datasets.mlflow.MlFlowInputDataDataSet
  name: edges
  context: integration
  dataset:
    <<: *_spark_parquet
    filepath: ${globals:paths.prm}/unified/edges
```

```python
"""Custom MLflow datasets."""
from typing import Any, Dict

import mlflow
from kedro.io.core import AbstractDataset
from kedro_datasets.pandas import CSVDataset, ParquetDataset
from kedro_datasets.spark import SparkDataset
from kedro_datasets.spark.spark_dataset import _strip_dbfs_prefix
from refit.v1.core.inject import _parse_for_objects


class MlFlowInputDataDataSet(AbstractDataset):
    """Kedro dataset that wraps another dataset and logs it as an MLflow input."""

    def __init__(
        self,
        *,
        name: str,
        context: str,
        dataset: AbstractDataset,
        metadata: dict[str, Any] | None = None,
    ):
        """Initialise MlFlowInputDataDataSet.

        Args:
            name: Name of the dataset in MLflow.
            context: Context in which the dataset is used.
            dataset: Underlying Kedro dataset.
            metadata: Kedro metadata.
        """
        self._name = name
        self._context = context
        # Instantiate the wrapped dataset from its catalog entry.
        self._dataset = _parse_for_objects(dataset)
        self.metadata = metadata

    def _load(self) -> Any:
        # Use the wrapped dataset's public API, as _save does.
        return self._dataset.load()

    def _save(self, data: Any) -> None:
        self._dataset.save(data)

        # FUTURE: Support other datasets
        # FUTURE: Fix the source of data
        # https://github.com/mlflow/mlflow/issues/13015
        if isinstance(self._dataset, (ParquetDataset, CSVDataset)):
            ds = mlflow.data.from_pandas(
                data, name=self._name, source=self._get_full_path(self._dataset)
            )
        elif isinstance(self._dataset, SparkDataset):
            ds = mlflow.data.from_spark(
                data,
                name=self._name,
                path=_strip_dbfs_prefix(
                    self._dataset._fs_prefix + str(self._dataset._get_load_path())
                ),
            )
        else:
            raise NotImplementedError(
                f"MLflow logging for dataset of type {type(self._dataset)} not implemented!"
            )

        # Attach the dataset to the active MLflow run as an input.
        mlflow.log_input(ds, context=self._context)

    @staticmethod
    def _get_full_path(dataset: AbstractDataset) -> str:
        return f"{dataset._protocol}://{dataset._filepath}"

    def _describe(self) -> Dict[str, Any]:
        """Describe the MLflow input dataset.

        Returns:
            Dict[str, Any]: Dictionary with the MLflow input dataset description.
        """
        return {"context": self._context, "name": self._name}
```

We're currently using the implementation above in our pipeline.
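The `# FUTURE: Support other datasets` comment implies the `isinstance` chain in `_save` will keep growing. One possible alternative, sketched here without Kedro or MLflow and assuming a type-to-converter registry is acceptable, is to register one converter per dataset type and walk the class MRO so subclasses reuse their parent's converter. `ParquetLike`, `SparkLike`, `register` and `to_input_record` are hypothetical names; in the real class the converters would call `mlflow.data.from_pandas` / `mlflow.data.from_spark` instead of returning dicts.

```python
from typing import Any, Callable, Dict


# Hypothetical stand-ins for wrapped Kedro datasets (e.g. ParquetDataset, SparkDataset).
class ParquetLike:
    def __init__(self, filepath: str) -> None:
        self.filepath = filepath


class SparkLike:
    def __init__(self, path: str) -> None:
        self.path = path


Converter = Callable[[Any, Any, str], Dict[str, Any]]

# Registry mapping a dataset type to the function that describes it for logging.
_CONVERTERS: Dict[type, Converter] = {}


def register(dataset_type: type) -> Callable[[Converter], Converter]:
    """Decorator that registers a converter for `dataset_type`."""
    def decorator(fn: Converter) -> Converter:
        _CONVERTERS[dataset_type] = fn
        return fn
    return decorator


@register(ParquetLike)
def _from_parquet(dataset: Any, data: Any, name: str) -> Dict[str, Any]:
    # Real code (assumption): mlflow.data.from_pandas(data, name=name, source=...)
    return {"kind": "pandas", "name": name, "source": dataset.filepath}


@register(SparkLike)
def _from_spark(dataset: Any, data: Any, name: str) -> Dict[str, Any]:
    # Real code (assumption): mlflow.data.from_spark(data, name=name, path=...)
    return {"kind": "spark", "name": name, "path": dataset.path}


def to_input_record(dataset: Any, data: Any, name: str) -> Dict[str, Any]:
    """Find a converter by walking the MRO, so subclasses inherit their parent's."""
    for cls in type(dataset).__mro__:
        converter = _CONVERTERS.get(cls)
        if converter is not None:
            return converter(dataset, data, name)
    raise NotImplementedError(
        f"MLflow logging for dataset of type {type(dataset)} not implemented!"
    )
```

With this layout, supporting a new dataset type means adding one decorated function rather than editing `_save`.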
Description
MLflow introduced an API to log datasets as run inputs (`mlflow.data` and `mlflow.log_input`). I should evaluate whether to integrate it.
Context
Logging input data is useful for reproducibility.
Possible implementation
Create an `MLflowDatasetDataset`?
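A framework-free sketch of such a wrapper, assuming the main open question is *when* to log: the pipeline implementation in the comment above logs on save, whereas an input could also be recorded when a run loads it. `InputLoggingDataset`, `InMemoryDataset`, `log_on` and the `log_input` callback are hypothetical; in a Kedro/MLflow setting the callback would build an MLflow dataset and call `mlflow.log_input(ds, context=context)`.

```python
from typing import Any, Callable, Iterable, Protocol


class Dataset(Protocol):
    """Minimal load/save interface, mirroring Kedro's public dataset methods."""

    def load(self) -> Any: ...

    def save(self, data: Any) -> None: ...


class InMemoryDataset:
    """Hypothetical in-memory dataset used to exercise the wrapper."""

    def __init__(self) -> None:
        self._data: Any = None

    def load(self) -> Any:
        return self._data

    def save(self, data: Any) -> None:
        self._data = data


LogFn = Callable[[str, str, Any], None]  # (name, context, data) -> None


class InputLoggingDataset:
    """Delegates load/save to a wrapped dataset and logs it as a run input."""

    def __init__(
        self,
        dataset: Dataset,
        name: str,
        context: str,
        log_input: LogFn,
        log_on: Iterable[str] = ("load",),
    ) -> None:
        self._dataset = dataset
        self._name = name
        self._context = context
        self._log_input = log_input
        self._log_on = frozenset(log_on)
        unknown = self._log_on - {"load", "save"}
        if unknown:
            raise ValueError(f"Unsupported log_on values: {sorted(unknown)}")

    def load(self) -> Any:
        data = self._dataset.load()
        if "load" in self._log_on:
            self._log_input(self._name, self._context, data)
        return data

    def save(self, data: Any) -> None:
        self._dataset.save(data)
        if "save" in self._log_on:
            self._log_input(self._name, self._context, data)
```

Logging on load ties the input to whichever run consumes the data; logging on save records it in the run that produces it. Which fits better depends on how MLflow runs map to pipeline stages.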