11 changes: 11 additions & 0 deletions dlt/common/schema/schema.py
@@ -1128,3 +1128,14 @@ def __getstate__(self) -> Any:
del state["naming"]
del state["data_item_normalizer"]
return state

def __eq__(self, other: Any) -> bool:
[Collaborator] Cool!

if not isinstance(other, Schema):
raise NotImplementedError(
f"Equality between `dlt.Schema` object and {type(other).__name__} is not supported."
)

return self.version_hash == other.version_hash

def __hash__(self) -> int:
return hash(self.version_hash)
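
A minimal sketch of what the new equality enables, assuming that two freshly created schemas with the same name and no user tables share a version_hash (the hash is derived from the schema content, so this should hold):

import dlt

a = dlt.Schema("events")
b = dlt.Schema("events")

# equal content -> equal version_hash -> equal schemas, and usable as dict/set keys
assert a == b
assert hash(a) == hash(b)

# comparing against a non-Schema object raises instead of returning NotImplemented
try:
    a == "events"
except NotImplementedError as exc:
    print(exc)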
74 changes: 69 additions & 5 deletions dlt/dataset/dataset.py
@@ -1,7 +1,9 @@
from __future__ import annotations

from contextlib import contextmanager
import tempfile
from types import TracebackType
from typing import Any, Optional, Type, Union, TYPE_CHECKING, Literal, overload
from typing import Any, Generator, Optional, Type, Union, TYPE_CHECKING, Literal, overload

from sqlglot.schema import Schema as SQLGlotSchema
import sqlglot.expressions as sge
@@ -12,9 +14,9 @@
from dlt.common.json import json
from dlt.common.destination.reference import AnyDestination, TDestinationReferenceArg, Destination
from dlt.common.destination.client import JobClientBase, SupportsOpenTables, WithStateSync
from dlt.common.schema import Schema
from dlt.common.typing import Self
from dlt.common.schema.typing import C_DLT_LOAD_ID
from dlt.common.typing import Self, TDataItems
from dlt.common.schema.typing import C_DLT_LOAD_ID, TWriteDisposition
from dlt.common.pipeline import LoadInfo
from dlt.common.utils import simple_repr, without_none
from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
from dlt.dataset import lineage
Expand All @@ -27,6 +29,9 @@
from ibis import BaseBackend as IbisBackend


_INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE = "_dlt_dataset_{dataset_name}"


class Dataset:
"""Access to dataframes and arrow tables in the destination dataset via dbapi"""

@@ -170,6 +175,46 @@ def is_same_physical_destination(self, other: dlt.Dataset) -> bool:
"""
return is_same_physical_destination(self, other)

# TODO explain that users can inspect the `_dlt_loads` table to differentiate data originating
# from `pipeline.run()` vs `dataset.write()`
@contextmanager
def write_pipeline(self) -> Generator[dlt.Pipeline, None, None]:
"""Get the internal pipeline used by `Dataset.write()`.

The pipeline runs inside a temporary `pipelines_dir`, so its working state is discarded when the context manager exits.
"""
with tempfile.TemporaryDirectory() as tmp_dir:
[Collaborator] I think we should not run this in a temporary directory but give the pipeline a predictable name and store it with the other pipeline metadata; this way the user can debug the run like any other pipeline run. This is up for debate, though.

pipeline = _get_internal_pipeline(
[Collaborator] You need to forward the staging destination here too, which probably means the dataset already needs to know its staging destination. Alternatively, one would have to provide the staging_destination via the run kwargs. For working within notebooks, where you often get the dataset from a pipeline instance, it would be good to always have the staging destination set on a dataset when you get it from the pipeline.

dataset_name=self.dataset_name, destination=self._destination, pipelines_dir=tmp_dir
)
yield pipeline

def write(
self,
data: TDataItems,
*,
table_name: str,
write_disposition: TWriteDisposition = "append",
[Collaborator] I think the columns and loader_file_format args from the run method would also be good candidates here. You could also consider a pipeline_kwargs argument that gets forwarded to the internal pipeline instantiation, but maybe we do not need this and can add it if requested.

) -> LoadInfo:
"""Write `data` to the specified table.

This method uses a full `dlt.Pipeline` internally. You can retrieve this pipeline
using `Dataset.write_pipeline()` for complete flexibility.
"""
with self.write_pipeline() as internal_pipeline:
# TODO should we try/except this run to gracefully handle failed writes?
info = internal_pipeline.run(
data,
dataset_name=self.dataset_name,
table_name=table_name,
schema=self.schema,
write_disposition=write_disposition,
)

# maybe update the dataset schema
self._schema = internal_pipeline.default_schema
return info

def query(
self,
query: Union[str, sge.Select, ir.Expr],
@@ -387,7 +432,7 @@ def __str__(self) -> str:
def dataset(
destination: TDestinationReferenceArg,
dataset_name: str,
schema: Union[Schema, str, None] = None,
schema: Union[dlt.Schema, str, None] = None,
) -> Dataset:
return Dataset(destination, dataset_name, schema)

@@ -451,3 +496,22 @@ def _get_dataset_schema_from_destination_using_dataset_name(
schema = dlt.Schema.from_stored_schema(json.loads(stored_schema.schema))

return schema


def _get_internal_pipeline(
dataset_name: str,
destination: TDestinationReferenceArg,
pipelines_dir: Optional[str] = None,
) -> dlt.Pipeline:
"""Set up the internal pipeline used by `Dataset.write()`"""
pipeline = dlt.pipeline(
pipeline_name=_INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE.format(dataset_name=dataset_name),
dataset_name=dataset_name,
destination=destination,
pipelines_dir=pipelines_dir,
)
# the internal write pipeline should be stateless: it only handles the data passed to it,
# and it shouldn't persist state (e.g., an incremental cursor) or interfere with other `pipeline.run()` calls
pipeline.config.restore_from_destination = False

return pipeline
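
Taken together, a usage sketch of the surface added in this file (destination, dataset, and table names are illustrative; it mirrors what the tests below exercise):

import duckdb
import dlt

dataset = dlt.dataset(dlt.destinations.duckdb(duckdb.connect()), "my_dataset")

# write rows straight into a table; an internal, stateless pipeline normalizes and loads them
info = dataset.write(
    [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}],
    table_name="items",
    write_disposition="append",
)
print(info)

# or drop down to the internal pipeline for full control over the run
with dataset.write_pipeline() as pipeline:
    pipeline.run(
        [{"id": 3, "value": "c"}],
        table_name="items",
        dataset_name=dataset.dataset_name,
        schema=dataset.schema,
    )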
Empty file added tests/dataset/__init__.py
Empty file.
93 changes: 93 additions & 0 deletions tests/dataset/test_dataset_write.py
@@ -0,0 +1,93 @@
import pathlib
[Collaborator] We need tests for writing into tables that already exist, and reading back from those tables with our database reader methods to see whether the schema was updated properly and works.

We should also make sure that all the args provided to the write methods are forwarded properly.


import duckdb
import pytest

import dlt
from dlt.common.pipeline import LoadInfo
from dlt.dataset.dataset import (
_INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE,
is_same_physical_destination,
_get_internal_pipeline,
)
from dlt.destinations.exceptions import DatabaseUndefinedRelation

from tests.utils import preserve_environ, patch_home_dir, autouse_test_storage, TEST_STORAGE_ROOT


def test_get_internal_pipeline():
dataset_name = "foo"
destination = dlt.destinations.duckdb(duckdb.connect())
dataset = dlt.dataset(destination, dataset_name)
expected_pipeline_name = _INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE.format(
dataset_name=dataset_name
)

internal_pipeline = _get_internal_pipeline(dataset_name=dataset_name, destination=destination)
internal_dataset = internal_pipeline.dataset()

assert isinstance(internal_pipeline, dlt.Pipeline)
assert internal_pipeline.pipeline_name == expected_pipeline_name
assert internal_pipeline.dataset_name == dataset_name
assert internal_pipeline.destination == destination
assert is_same_physical_destination(dataset, internal_dataset)
assert dataset.schema == internal_dataset.schema


def test_dataset_get_write_pipeline():
dataset_name = "foo"
destination = dlt.destinations.duckdb(duckdb.connect())
dataset = dlt.dataset(destination, dataset_name)
expected_pipeline_name = _INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE.format(
dataset_name=dataset_name
)

with dataset.write_pipeline() as write_pipeline:
write_dataset = write_pipeline.dataset()

assert isinstance(write_pipeline, dlt.Pipeline)
assert write_pipeline.pipeline_name == expected_pipeline_name
assert write_pipeline.dataset_name == dataset_name
assert write_pipeline.destination == destination

assert is_same_physical_destination(dataset, write_dataset)
assert dataset.schema == write_dataset.schema


def test_dataset_write():
dataset_name = "foo"
destination = dlt.destinations.duckdb(duckdb.connect())
dataset = dlt.dataset(destination, dataset_name)
table_name = "bar"
items = [{"id": 0, "value": "bingo"}, {"id": 1, "value": "bongo"}]

# TODO this is currently odd because the tables exist on the `Schema`
[Collaborator] Schemas create the default tables on initialization. We could consider changing this, but that is the reason why they will always exist on any Schema instance regardless of whether anything was materialized.

[Collaborator, Author] I think the current behavior is ok and I wouldn't change it. Just wanted to leave a note in the test because the assertion could be surprising.

# used by the `Dataset` but the tables don't exist on the destination yet
assert dataset.tables == ["_dlt_version", "_dlt_loads"]
with pytest.raises(DatabaseUndefinedRelation):
dataset.table("_dlt_version").fetchall()
with pytest.raises(DatabaseUndefinedRelation):
dataset.table("_dlt_loads").fetchall()

load_info = dataset.write(items, table_name=table_name)

assert isinstance(load_info, LoadInfo)
assert dataset.tables == ["bar", "_dlt_version", "_dlt_loads"]
assert dataset.table("bar").select("id", "value").fetchall() == [
tuple(i.values()) for i in items
]


def test_data_write_temporary_dir():
dataset_name = "foo"
destination = dlt.destinations.duckdb(duckdb.connect())
dataset = dlt.dataset(destination, dataset_name)
table_name = "bar"
items = [{"id": 0, "value": "bingo"}, {"id": 1, "value": "bongo"}]
storage_dir = pathlib.Path(TEST_STORAGE_ROOT)

dataset.write(items, table_name=table_name)

# check that the internal pipeline ran in a temporary directory:
# the patched test storage should only contain the `.dlt` settings folder, no pipeline working dir
assert [str(p) for p in storage_dir.iterdir()] == ["_storage/.dlt"]
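
A sketch of the coverage the reviewer asks for at the top of this file (writing into a table that already exists and reading it back); the schema assertion assumes evolved columns show up under `Schema.tables[...]["columns"]`:

def test_dataset_write_existing_table():
    destination = dlt.destinations.duckdb(duckdb.connect())
    dataset = dlt.dataset(destination, "foo")

    # first write creates the table, second write appends and evolves the schema
    dataset.write([{"id": 0, "value": "bingo"}], table_name="bar")
    dataset.write([{"id": 1, "value": "bongo", "extra": True}], table_name="bar")

    assert len(dataset.table("bar").fetchall()) == 2
    assert "extra" in dataset.schema.tables["bar"]["columns"]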