Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions dlt/helpers/marimo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,15 @@ Edit the `_pipeline_browser.py` notebook until you're satisfied. Use a [setup ce

## 3. "register" the widget

To make the widget available, you need to modify `dlt/helpers/marimo/_widgets/__init__.py` and `dlt/helpers/marimo/__init__.py`

```python
# `dlt/helpers/marimo/_widgets/__init__.py`
# import the `app` variable from the notebook file
# the alias for `app` will be the widget name imported by users
from dlt.helpers.marimo._widgets._pipeline_browser import app as pipeline_browser
```
To make the widget available, you need to modify `dlt/helpers/marimo/__init__.py`.

```python
# `dlt/helpers/marimo/__init__.py`
# import the `pipeline_browser` (the aliased `app` variable from the notebook)
# and add it to the `__all__` public interface
from dlt.helpers.marimo._widgets import render, pipeline_browser
# import the `app` variable from the notebook file and alias it to the intended widget name;
# add it to the `__all__` public interface
from dlt.helpers.marimo._widgets._pipeline_browser import app as pipeline_browser

__all__ = (
"render",
"pipeline_browser",
)
```
Expand Down
28 changes: 26 additions & 2 deletions dlt/helpers/marimo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,32 @@
from dlt.helpers.marimo._widgets import render, load_package_viewer, schema_viewer
from __future__ import annotations

from typing import TYPE_CHECKING

from dlt.common.exceptions import MissingDependencyException

# when creating a widget, import the `app` variable and rename it to the widget name
from dlt.helpers.marimo._widgets._load_package_viewer import app as load_package_viewer
from dlt.helpers.marimo._widgets._schema_viewer import app as schema_viewer
from dlt.helpers.marimo._widgets._local_pipelines_summary_viewer import (
app as local_pipelines_summary_viewer,
)

if TYPE_CHECKING:
import marimo
from mowidgets._widget import _MoWidgetBase # type: ignore[import-untyped]


# Public interface of `dlt.helpers.marimo`: the `render` helper defined in this module
# plus every widget app imported above under its widget name.
__all__ = (
    "render",
    "load_package_viewer",
    "schema_viewer",
    "local_pipelines_summary_viewer",
)


def render(widget: marimo.App) -> _MoWidgetBase:
    """Turn a marimo app into a widget via `mowidgets.widgetize`.

    Args:
        widget: The marimo `App` to wrap, e.g. one of the widget apps exported by
            this module.

    Returns:
        The object returned by `mowidgets.widgetize(widget, data_access=True)`.

    Raises:
        MissingDependencyException: If `mowidgets` cannot be imported.
    """
    # `mowidgets` is imported lazily so that importing this module does not require it.
    # Only the import is guarded: an `ImportError` raised while building the widget is
    # a different problem and must not be reported as a missing dependency.
    try:
        import mowidgets  # type: ignore[import-untyped]
    except ImportError as import_err:
        raise MissingDependencyException(
            "dlt.helpers.marimo", ["marimo", "mowidgets"]
        ) from import_err

    return mowidgets.widgetize(widget, data_access=True)
10 changes: 0 additions & 10 deletions dlt/helpers/marimo/_widgets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +0,0 @@
import marimo as mo

# when adding widget, import the `app` variable and rename it to the widget name
from dlt.helpers.marimo._widgets._load_package_viewer import app as load_package_viewer
from dlt.helpers.marimo._widgets._schema_viewer import app as schema_viewer


async def render(widget: mo.App) -> mo.Html:
    """Embed `widget` and return the `output` attribute of the embed result."""
    return (await widget.embed()).output
181 changes: 181 additions & 0 deletions dlt/helpers/marimo/_widgets/_local_pipelines_summary_viewer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import marimo

# presumably the marimo version that generated this notebook file
__generated_with = "0.14.10"
# application object; the setup block, functions and cells below are registered on it
app = marimo.App(width="medium")

with app.setup:
import pathlib
import functools
from typing import Optional, Union

import marimo as mo

from dlt.common.json import json
from dlt.common.pendulum import datetime
from dlt.common.storages import FileStorage
from dlt.common.pipeline import get_dlt_pipelines_dir


@app.function
def get_pipeline_options(pipelines_dir: str):
    """Map every directory found in `pipelines_dir` to its path.

    Args:
        pipelines_dir: Directory handed to `FileStorage` and listed for sub-directories.

    Returns:
        A dict keyed by directory name (inserted in sorted order) whose values are the
        storage path joined with the name by "/". Empty when the listing fails.
    """
    storage = FileStorage(pipelines_dir)

    # best effort: a directory that cannot be listed simply yields no options
    try:
        names = storage.list_folder_dirs(".", to_root=False)
    except Exception:
        names = []

    options = {}
    for name in sorted(names):
        options[name] = "/".join((storage.storage_path, name))
    return options


@app.cell
def _():
    # text input for the directory to scan, pre-filled with dlt's pipelines directory;
    # helper names keep the `_` prefix like the other cell-local names in this notebook
    _default_dir = get_dlt_pipelines_dir()
    pipelines_dir_textbox = mo.ui.text(
        label="Pipelines directory",
        value=_default_dir,
        full_width=True,
    )
    # trailing bare expression, presumably what marimo renders as the cell output
    pipelines_dir_textbox
    return (pipelines_dir_textbox,)


@app.cell
def _(pipelines_dir_textbox):
    _pipelines_dir = pathlib.Path(pipelines_dir_textbox.value)
    _dir_found = _pipelines_dir.exists() and _pipelines_dir.is_dir()

    # either list the pipelines of an existing directory or prepare a message
    # explaining that nothing was found at the resolved location
    _msg = None if _dir_found else mo.md(f"No directory found at `{_pipelines_dir.resolve()}`")
    pipelines_options = get_pipeline_options(_pipelines_dir) if _dir_found else {}

    # trailing bare expression, presumably what marimo renders as the cell output
    _msg
    return (pipelines_options,)


@app.function
def open_directory(*args, path: Union[str, pathlib.Path]) -> None:
    """Open a directory with the platform's default opener.

    Uses `os.startfile` on Windows, the `open` command on macOS and `xdg-open` on Linux.

    Args:
        *args: Ignored. Accepted so the function can be bound with `functools.partial`
            and used as a button `on_click` callback, which is presumably invoked
            with a positional argument.
        path: Directory to open; it is resolved to an absolute path first.

    Raises:
        NotADirectoryError: If the resolved `path` is not an existing directory.
        NotImplementedError: If `platform.system()` reports an unsupported OS.
    """
    import os
    import platform
    import subprocess

    path = pathlib.Path(path).resolve()
    if not path.is_dir():
        raise NotADirectoryError(f"`{path}` is not a valid directory.")

    # errors from the opener propagate unchanged to the caller
    system = platform.system()
    if system == "Windows":
        os.startfile(path)
    elif system == "Darwin":  # macOS
        subprocess.run(["open", path])
    elif system == "Linux":
        subprocess.run(["xdg-open", path])
    else:
        raise NotImplementedError(f"Unsupported OS: `{system}`")


@app.function
def pipeline_state(pipeline_path: str) -> dict:
    """Load the content of `state.json` from a pipeline directory.

    Args:
        pipeline_path: Directory that contains the `state.json` file.

    Returns:
        The result of `json.typed_loads` applied to the file content.

    Raises:
        OSError: If the file cannot be read, e.g. `FileNotFoundError` when it is missing.
    """
    state_file = pathlib.Path(pipeline_path, "state.json")
    # decode explicitly as UTF-8 instead of relying on the platform's locale encoding
    return json.typed_loads(state_file.read_text(encoding="utf-8"))


@app.function
def last_extracted_date(pipeline_state: dict) -> Optional[datetime.datetime]:
    """Return the `_last_extracted_at` value stored under `_local` in a pipeline state.

    Args:
        pipeline_state: State dict of a pipeline.

    Returns:
        The stored value, or `None` when `_local` or `_last_extracted_at` is missing.
        The value is presumably a datetime: the summary table formats non-`None`
        values with `strftime`.
    """
    # tolerate a state without (or with an empty) `_local` section
    local_state = pipeline_state.get("_local") or {}
    return local_state.get("_last_extracted_at")


@app.function
def destination_type(pipeline_state: dict) -> Optional[str]:
    """Return the last dot-separated segment of the state's `destination_type`.

    Returns `None` when the key is missing or its value is falsy.
    """
    qualified_type = pipeline_state.get("destination_type")
    if not qualified_type:
        return None

    # keep only what follows the last "."
    _, _, short_type = qualified_type.rpartition(".")
    return short_type


@app.function
def destination_name(pipeline_state: dict) -> Optional[str]:
    """Return the last dot-separated segment of the state's `destination_name`.

    Returns `None` when the key is missing or its value is falsy.
    """
    name = pipeline_state.get("destination_name")
    if name:
        # everything after the last "." (the whole value when there is no ".")
        return name.rpartition(".")[2]
    return None


@app.function
def last_run_directory(pipeline_state: dict) -> Optional[str]:
    """Return `run_dir` of the last run context stored under `_local`.

    Args:
        pipeline_state: State dict of a pipeline.

    Returns:
        The `run_dir` value, or `None` when `_local`, `last_run_context` or `run_dir`
        is missing or empty.
    """
    # a state without a `_local` section must yield `None` rather than fail on `None.get`
    local_state = pipeline_state.get("_local") or {}
    run_context = local_state.get("last_run_context")
    if not run_context:
        return None

    return run_context.get("run_dir")


@app.function
def open_pipeline_dir_btn(full_path: str) -> mo.ui.button:
    """Build a button labelled "open" whose click handler opens `full_path`."""
    # bind the target now; the handler calls `open_directory(..., path=full_path)`
    click_handler = functools.partial(open_directory, path=full_path)
    return mo.ui.button(on_click=click_handler, label="open")


@app.function
def open_last_run_dir_btn(last_run_dir: Optional[str]) -> Optional[mo.ui.button]:
    """Build a button, labelled with the directory, that opens `last_run_dir`.

    Returns `None` when `last_run_dir` is `None`.
    """
    if last_run_dir is not None:
        click_handler = functools.partial(open_directory, path=last_run_dir)
        return mo.ui.button(on_click=click_handler, label=last_run_dir)

    return None


@app.cell
def _(pipelines_options):
    # one row per pipeline directory; helper names keep the `_` prefix like the
    # other cell-local names in this notebook
    _rows = []
    for _pipeline_name, _full_p in pipelines_options.items():
        _p_stats = pathlib.Path(_full_p).stat()
        _p_state = pipeline_state(_full_p)

        _rows.append(
            {
                "name": _pipeline_name,
                " ": open_pipeline_dir_btn(_full_p),
                # NOTE: filled from the directory's modification time (`st_mtime`)
                "created_at": datetime.datetime.fromtimestamp(_p_stats.st_mtime),
                "dataset": _p_state["dataset_name"],
                "destination": destination_type(_p_state),
                "last_extracted_at": last_extracted_date(_p_state),
                "last_run_dir": open_last_run_dir_btn(last_run_directory(_p_state)),
            }
        )

    # rows with the largest `created_at` come first
    data = sorted(_rows, key=lambda _row: _row["created_at"], reverse=True)
    return (data,)


@app.cell
def _(data):
    # Column name -> formatter applied to the values of that column.
    # This has to be a plain dict: wrapping the literal in parentheses with a trailing
    # comma would turn it into a one-element tuple, which is not a mapping of columns.
    _FORMATTING = {
        "created_at": lambda r: r.strftime("%Y-%m-%d %H:%M:%S"),
        # `last_extracted_at` may be missing for a pipeline, so `None` is passed through
        "last_extracted_at": lambda r: (
            r.strftime("%Y-%m-%d %H:%M:%S") if r is not None else None
        ),
    }

    # trailing bare expression, presumably what marimo renders as the cell output
    mo.ui.table(
        data,
        label="Local pipelines",
        freeze_columns_left=["name", " "],
        selection=None,
        format_mapping=_FORMATTING,
    )
    return


# run the notebook app when this file is executed as a script
if __name__ == "__main__":
    app.run()
2 changes: 1 addition & 1 deletion dlt/helpers/marimo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pathlib
import textwrap
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Union

from dlt.common.storages import FileStorage
from dlt.common.pipeline import get_dlt_pipelines_dir
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ workspace = [
"pandas>=2.1.4",
"marimo>=0.14.5",
]
marimo = [
"marimo>=0.14.5",
"mowidgets>=0.1.2",
]
dbml = [
"pydbml"
]
Expand Down
16 changes: 16 additions & 0 deletions tests/helpers/marimo_tests/test_widgets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import marimo

import dlt.helpers.marimo


def test_available_widgets():
    """Every expected widget is listed in `__all__` and exposed as a `marimo.App`."""
    public_names = dlt.helpers.marimo.__all__

    for expected_widget_name in (
        "load_package_viewer",
        "schema_viewer",
        "local_pipelines_summary_viewer",
    ):
        assert expected_widget_name in public_names
        # a missing attribute resolves to `None`, which fails the type check below
        assert isinstance(getattr(dlt.helpers.marimo, expected_widget_name, None), marimo.App)
20 changes: 19 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading