feat: Dataset.write()
#3092
base: devel
Changes from all commits
69e4f0a
73bc8a3
3f38932
f519ac3
2c08e88
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,9 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from contextlib import contextmanager | ||
| import tempfile | ||
| from types import TracebackType | ||
| from typing import Any, Optional, Type, Union, TYPE_CHECKING, Literal, overload | ||
| from typing import Any, Generator, Optional, Type, Union, TYPE_CHECKING, Literal, overload | ||
|
|
||
| from sqlglot.schema import Schema as SQLGlotSchema | ||
| import sqlglot.expressions as sge | ||
|
|
@@ -12,9 +14,9 @@ | |
| from dlt.common.json import json | ||
| from dlt.common.destination.reference import AnyDestination, TDestinationReferenceArg, Destination | ||
| from dlt.common.destination.client import JobClientBase, SupportsOpenTables, WithStateSync | ||
| from dlt.common.schema import Schema | ||
| from dlt.common.typing import Self | ||
| from dlt.common.schema.typing import C_DLT_LOAD_ID | ||
| from dlt.common.typing import Self, TDataItems | ||
| from dlt.common.schema.typing import C_DLT_LOAD_ID, TWriteDisposition | ||
| from dlt.common.pipeline import LoadInfo | ||
| from dlt.common.utils import simple_repr, without_none | ||
| from dlt.destinations.sql_client import SqlClientBase, WithSqlClient | ||
| from dlt.dataset import lineage | ||
|
|
@@ -27,6 +29,9 @@ | |
| from ibis import BaseBackend as IbisBackend | ||
|
|
||
|
|
||
| _INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE = "_dlt_dataset_{dataset_name}" | ||
|
|
||
|
|
||
| class Dataset: | ||
| """Access to dataframes and arrow tables in the destination dataset via dbapi""" | ||
|
|
||
|
|
@@ -170,6 +175,46 @@ def is_same_physical_destination(self, other: dlt.Dataset) -> bool: | |
| """ | ||
| return is_same_physical_destination(self, other) | ||
|
|
||
| # TODO explain users can inspect `_dlt_loads` table to differentiate data originating | ||
| # from `pipeline.run()` or `dataset.write()` | ||
| @contextmanager | ||
| def write_pipeline(self) -> Generator[dlt.Pipeline, None, None]: | ||
| """Get the internal pipeline used by `Dataset.write()`. | ||
|
|
||
| Passing a `pipelines_dir` allows you to set a persistent working directory for the internal pipeline. | ||
| """ | ||
| with tempfile.TemporaryDirectory() as tmp_dir: | ||
|
Review comment: I think we should not run this in a temporary directory but give the pipeline a predictable name and store it with the other pipeline metadata; this way the user can debug the run like any other pipeline run. This is up for debate though.
||
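A minimal sketch of the alternative suggested in the comment above: let callers opt into a persistent location instead of a throwaway temp dir. The `pipelines_dir` parameter on `write_pipeline()` is a hypothetical addition; passing `None` falls back to dlt's default pipelines directory, so the run stays debuggable like any other pipeline (for example via the `dlt pipeline <name> trace` CLI).

```python
from contextlib import contextmanager
from typing import Generator, Optional

import dlt


@contextmanager
def write_pipeline(self, pipelines_dir: Optional[str] = None) -> Generator[dlt.Pipeline, None, None]:
    """Hypothetical drop-in variant: no TemporaryDirectory, so pipeline metadata persists."""
    # `pipelines_dir=None` lets dlt pick its default working directory
    pipeline = _get_internal_pipeline(
        dataset_name=self.dataset_name,
        destination=self._destination,
        pipelines_dir=pipelines_dir,
    )
    yield pipeline
```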
| pipeline = _get_internal_pipeline( | ||
|
Review comment: You need to forward the staging destination here too, which probably means the dataset already needs to know its staging destination. Alternatively, one would have to provide the staging_destination via the run_kwargs. For working within notebooks, where you often get the dataset from a pipeline instance, it seems to me it would be good to always have it set on the dataset when you get it from the pipeline.
||
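A hedged sketch of one way to forward a staging destination, as raised above. It assumes the `Dataset` keeps a `_staging_destination` attribute (a hypothetical name) and that `_get_internal_pipeline()` grows a matching parameter; `dlt.pipeline()` itself already accepts a `staging` destination.

```python
from typing import Optional

import dlt
from dlt.common.destination.reference import TDestinationReferenceArg


def _get_internal_pipeline(
    dataset_name: str,
    destination: TDestinationReferenceArg,
    staging: Optional[TDestinationReferenceArg] = None,  # hypothetical addition
    pipelines_dir: Optional[str] = None,
) -> dlt.Pipeline:
    """Variant of the helper in this PR that also forwards a staging destination."""
    pipeline = dlt.pipeline(
        pipeline_name=_INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE.format(dataset_name=dataset_name),
        dataset_name=dataset_name,
        destination=destination,
        staging=staging,
        pipelines_dir=pipelines_dir,
    )
    # keep the write pipeline stateless, as in the original helper
    pipeline.config.restore_from_destination = False
    return pipeline
```

The call site above would then pass `staging=self._staging_destination`, or accept it through `write()`'s keyword arguments.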
| dataset_name=self.dataset_name, destination=self._destination, pipelines_dir=tmp_dir | ||
| ) | ||
| yield pipeline | ||
|
|
||
| def write( | ||
| self, | ||
| data: TDataItems, | ||
| *, | ||
| table_name: str, | ||
| write_disposition: TWriteDisposition = "append", | ||
|
Review comment: I think the …
||
| ) -> LoadInfo: | ||
| """Write `data` to the specified table. | ||
|
|
||
| This method uses a full-on `dlt.Pipeline` internally. You can retrieve this pipeline | ||
| using `Dataset.write_pipeline()` for complete flexibility. | ||
| """ | ||
| with self.write_pipeline() as internal_pipeline: | ||
| # TODO should we try/except this run to gracefully handle failed writes? | ||
| info = internal_pipeline.run( | ||
| data, | ||
| dataset_name=self.dataset_name, | ||
| table_name=table_name, | ||
| schema=self.schema, | ||
| write_disposition=write_disposition, | ||
| ) | ||
|
|
||
| # maybe update the dataset schema | ||
| self._schema = internal_pipeline.default_schema | ||
| return info | ||
|
|
||
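A hedged usage sketch of the new API against a local duckdb file, which also covers the TODO above about telling `Dataset.write()` loads apart via the load ids recorded in `_dlt_loads`. The file path and table name are made up for illustration.

```python
import dlt

# build a dataset handle directly, without a user-facing pipeline
dataset = dlt.dataset(dlt.destinations.duckdb("quack.duckdb"), dataset_name="my_data")

# write rows into a table; write_disposition defaults to "append"
load_info = dataset.write(
    [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}],
    table_name="items",
    write_disposition="append",
)

# every write produces load ids that also end up in `_dlt_loads`,
# so the rows written here can be traced back to this call
written = set(load_info.loads_ids)
recorded = {row[0] for row in dataset.table("_dlt_loads").select("load_id").fetchall()}
assert written <= recorded
```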
| def query( | ||
| self, | ||
| query: Union[str, sge.Select, ir.Expr], | ||
|
|
@@ -387,7 +432,7 @@ def __str__(self) -> str: | |
| def dataset( | ||
| destination: TDestinationReferenceArg, | ||
| dataset_name: str, | ||
| schema: Union[Schema, str, None] = None, | ||
| schema: Union[dlt.Schema, str, None] = None, | ||
| ) -> Dataset: | ||
| return Dataset(destination, dataset_name, schema) | ||
|
|
||
|
|
@@ -451,3 +496,22 @@ def _get_dataset_schema_from_destination_using_dataset_name( | |
| schema = dlt.Schema.from_stored_schema(json.loads(stored_schema.schema)) | ||
|
|
||
| return schema | ||
|
|
||
|
|
||
| def _get_internal_pipeline( | ||
| dataset_name: str, | ||
| destination: TDestinationReferenceArg, | ||
| pipelines_dir: str = None, | ||
| ) -> dlt.Pipeline: | ||
| """Setup the internal pipeline used by `Dataset.write()`""" | ||
| pipeline = dlt.pipeline( | ||
| pipeline_name=_INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE.format(dataset_name=dataset_name), | ||
| dataset_name=dataset_name, | ||
| destination=destination, | ||
| pipelines_dir=pipelines_dir, | ||
| ) | ||
| # the internal write pipeline should be stateless; it is limited to the data passed in | ||
| # it shouldn't persist state (e.g., incremental cursors) or interfere with other `pipeline.run()` calls | ||
| pipeline.config.restore_from_destination = False | ||
|
|
||
| return pipeline | ||
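A small sketch, grounded in this helper, of what the internal pipeline looks like for a dataset named "my_data". The import path is taken from the tests below; the in-memory duckdb destination is just for illustration.

```python
import duckdb

import dlt
from dlt.dataset.dataset import _INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE, _get_internal_pipeline

pipeline = _get_internal_pipeline(
    dataset_name="my_data",
    destination=dlt.destinations.duckdb(duckdb.connect()),
)
# the pipeline name is derived from the dataset name via the module-level template
assert pipeline.pipeline_name == _INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE.format(dataset_name="my_data")
assert pipeline.pipeline_name == "_dlt_dataset_my_data"
# state restoration is disabled, so repeated writes never pull state back from the destination
assert pipeline.config.restore_from_destination is False
```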
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,93 @@ | ||
| import pathlib | ||
|
Review comment: We need tests for writing into tables that already exist, and for reading back from those tables with our database reader methods to check whether the schema was updated properly and works. We should also make sure that all the args provided to the write methods are forwarded properly. (See the sketch appended after the last test below.)
||
|
|
||
| import duckdb | ||
| import pytest | ||
|
|
||
| import dlt | ||
| from dlt.common.pipeline import LoadInfo | ||
| from dlt.dataset.dataset import ( | ||
| _INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE, | ||
| is_same_physical_destination, | ||
| _get_internal_pipeline, | ||
| ) | ||
| from dlt.destinations.exceptions import DatabaseUndefinedRelation | ||
|
|
||
| from tests.utils import preserve_environ, patch_home_dir, autouse_test_storage, TEST_STORAGE_ROOT | ||
|
|
||
|
|
||
| def test_get_internal_pipeline(): | ||
| dataset_name = "foo" | ||
| destination = dlt.destinations.duckdb(duckdb.connect()) | ||
| dataset = dlt.dataset(destination, dataset_name) | ||
| expected_pipeline_name = _INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE.format( | ||
| dataset_name=dataset_name | ||
| ) | ||
|
|
||
| internal_pipeline = _get_internal_pipeline(dataset_name=dataset_name, destination=destination) | ||
| internal_dataset = internal_pipeline.dataset() | ||
|
|
||
| assert isinstance(internal_pipeline, dlt.Pipeline) | ||
| assert internal_pipeline.pipeline_name == expected_pipeline_name | ||
| assert internal_pipeline.dataset_name == dataset_name | ||
| assert internal_pipeline.destination == destination | ||
| assert is_same_physical_destination(dataset, internal_dataset) | ||
| assert dataset.schema == internal_dataset.schema | ||
|
|
||
|
|
||
| def test_dataset_get_write_pipeline(): | ||
| dataset_name = "foo" | ||
| destination = dlt.destinations.duckdb(duckdb.connect()) | ||
| dataset = dlt.dataset(destination, dataset_name) | ||
| expected_pipeline_name = _INTERNAL_DATASET_PIPELINE_NAME_TEMPLATE.format( | ||
| dataset_name=dataset_name | ||
| ) | ||
|
|
||
| with dataset.write_pipeline() as write_pipeline: | ||
| write_dataset = write_pipeline.dataset() | ||
|
|
||
| assert isinstance(write_pipeline, dlt.Pipeline) | ||
| assert write_pipeline.pipeline_name == expected_pipeline_name | ||
| assert write_pipeline.dataset_name == dataset_name | ||
| assert write_pipeline.destination == destination | ||
|
|
||
| assert is_same_physical_destination(dataset, write_dataset) | ||
| assert dataset.schema == write_dataset.schema | ||
|
|
||
|
|
||
| def test_dataset_write(): | ||
| dataset_name = "foo" | ||
| destination = dlt.destinations.duckdb(duckdb.connect()) | ||
| dataset = dlt.dataset(destination, dataset_name) | ||
| table_name = "bar" | ||
| items = [{"id": 0, "value": "bingo"}, {"id": 1, "value": "bongo"}] | ||
|
|
||
| # TODO this is currently odd because the tables exist on the `Schema` | ||
|
Review comment: Schemas create the default tables on initialization. We could consider changing this, but that is the reason why they will always exist on any Schema instance regardless of whether anything was materialized.
Review comment: I think the current behavior is ok and I wouldn't change it. I just wanted to leave a note in the test because the assertion could be surprising. (See the short illustration after this test.)
||
| # used by the `Dataset` but the tables don't exist on the destination yet | ||
| assert dataset.tables == ["_dlt_version", "_dlt_loads"] | ||
| with pytest.raises(DatabaseUndefinedRelation): | ||
| dataset.table("_dlt_version").fetchall() | ||
| with pytest.raises(DatabaseUndefinedRelation): | ||
| dataset.table("_dlt_loads").fetchall() | ||
|
|
||
| load_info = dataset.write(items, table_name=table_name) | ||
|
|
||
| assert isinstance(load_info, LoadInfo) | ||
| assert dataset.tables == ["bar", "_dlt_version", "_dlt_loads"] | ||
| assert dataset.table("bar").select("id", "value").fetchall() == [ | ||
| tuple(i.values()) for i in items | ||
| ] | ||
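A hedged illustration of the point discussed in the review thread inside this test: a freshly constructed `dlt.Schema` already lists the dlt system tables even though nothing has been materialized yet, which is why `dataset.tables` reports them before any load has run.

```python
import dlt

schema = dlt.Schema("example")
# the default tables are added when the Schema is initialized
assert "_dlt_version" in schema.tables
assert "_dlt_loads" in schema.tables
```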
|
|
||
|
|
||
| def test_dataset_write_temporary_dir(): | ||
| dataset_name = "foo" | ||
| destination = dlt.destinations.duckdb(duckdb.connect()) | ||
| dataset = dlt.dataset(destination, dataset_name) | ||
| table_name = "bar" | ||
| items = [{"id": 0, "value": "bingo"}, {"id": 1, "value": "bongo"}] | ||
| storage_dir = pathlib.Path(TEST_STORAGE_ROOT) | ||
|
|
||
| dataset.write(items, table_name=table_name) | ||
|
|
||
| # check that the pipeline used a temp directory | ||
| # the patched test `pipelines_dir` should be empty | ||
| assert [str(p) for p in storage_dir.iterdir()] == ["_storage/.dlt"] | ||
Review comment: Cool!
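A hedged sketch of the round-trip test requested in the first review comment on this file: write into a table twice, read the rows back through the database reader methods, and check that a forwarded `write_disposition` is honored. It assumes the imports and duckdb setup used by the tests above; the `extra` column and the exact assertions are illustrative.

```python
def test_dataset_write_existing_table():
    dataset_name = "foo"
    destination = dlt.destinations.duckdb(duckdb.connect())
    dataset = dlt.dataset(destination, dataset_name)
    table_name = "bar"

    # first write creates the table
    dataset.write([{"id": 0, "value": "bingo"}], table_name=table_name)
    # second write appends to the existing table and should evolve the schema
    dataset.write([{"id": 1, "value": "bongo", "extra": True}], table_name=table_name)

    # the new column should be visible on the dataset schema after the write
    assert "extra" in dataset.schema.tables[table_name]["columns"]
    # the rows should be readable through the database reader methods
    rows = dataset.table(table_name).select("id", "value").fetchall()
    assert sorted(rows) == [(0, "bingo"), (1, "bongo")]

    # forwarded args: "replace" should drop the previously written rows
    dataset.write([{"id": 2, "value": "baz"}], table_name=table_name, write_disposition="replace")
    assert dataset.table(table_name).select("id", "value").fetchall() == [(2, "baz")]
```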