Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dlt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from dlt.extract.decorators import source, resource, transformer, defer
from dlt.destinations.decorators import destination
from dlt.dataset import dataset, Relation, Dataset
from dlt.extract.history import History

from dlt.pipeline import (
pipeline as _pipeline,
Expand Down
8 changes: 4 additions & 4 deletions dlt/extract/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ def submit(self, pipe_item: ResolvablePipeItem) -> TItemFuture:
self.used_slots += 1

self.futures[future] = FuturePipeItem(
future, pipe_item.step, pipe_item.pipe, pipe_item.meta
future, pipe_item.step, pipe_item.pipe, pipe_item.meta, pipe_item.history
)
return future

def sleep(self) -> None:
sleep(self.poll_interval)

def _resolve_future(self, future: TItemFuture) -> Optional[ResolvablePipeItem]:
future, step, pipe, meta = self.futures.pop(future)
future, step, pipe, meta, history = self.futures.pop(future)

if ex := future.exception():
if isinstance(ex, StopAsyncIteration):
Expand All @@ -152,9 +152,9 @@ def _resolve_future(self, future: TItemFuture) -> Optional[ResolvablePipeItem]:
if item is None:
return None
elif isinstance(item, DataItemWithMeta):
return ResolvablePipeItem(item.data, step, pipe, item.meta)
return ResolvablePipeItem(item.data, step, pipe, item.meta, history)
else:
return ResolvablePipeItem(item, step, pipe, meta)
return ResolvablePipeItem(item, step, pipe, meta, history)

def _next_done_future(self) -> Optional[TItemFuture]:
"""Get the done future in the pool (if any). This does not block."""
Expand Down
28 changes: 21 additions & 7 deletions dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Iterator,
List,
Literal,
Mapping,
Optional,
Tuple,
Type,
Expand Down Expand Up @@ -465,6 +466,7 @@ def resource(
section: Optional[TTableHintTemplate[str]] = None,
_base_spec: Type[BaseConfiguration] = BaseConfiguration,
standalone: bool = None,
keep_history: bool = False,
) -> ResourceFactory[TResourceFunParams, TDltResourceImpl]: ...


Expand Down Expand Up @@ -492,6 +494,7 @@ def resource(
section: Optional[TTableHintTemplate[str]] = None,
_base_spec: Type[BaseConfiguration] = BaseConfiguration,
standalone: bool = None,
keep_history: bool = False,
) -> Callable[
[Callable[TResourceFunParams, Any]], ResourceFactory[TResourceFunParams, TDltResourceImpl]
]: ...
Expand Down Expand Up @@ -521,6 +524,7 @@ def resource(
section: Optional[str] = None,
_base_spec: Type[BaseConfiguration] = BaseConfiguration,
standalone: bool = None,
keep_history: bool = False,
) -> TDltResourceImpl: ...


Expand All @@ -547,6 +551,7 @@ def resource(
section: Optional[TTableHintTemplate[str]] = None,
_base_spec: Type[BaseConfiguration] = BaseConfiguration,
standalone: bool = None,
keep_history: bool = False,
data_from: TUnboundDltResource = None,
) -> Any:
"""When used as a decorator, transforms any generator (yielding) function into a `dlt resource`. When used as a function, it transforms data in `data` argument into a `dlt resource`.
Expand Down Expand Up @@ -629,6 +634,8 @@ def resource(

standalone (bool, optional): Deprecated. Past functionality got merged into regular resource

keep_history (bool, optional): When `True` the result is added to the history. Now in subsequent nodes the result can be retrieved from the history.

data_from (TUnboundDltResource, optional): Allows to pipe data from one resource to another to build multi-step pipelines.

Raises:
Expand All @@ -654,13 +661,14 @@ def make_resource(_name: str, _section: str, _data: Any) -> TDltResourceImpl:
incremental=incremental,
)
resource = _impl_cls.from_data(
_data,
_name,
_section,
table_template,
selected,
cast(DltResource, data_from),
True,
data=_data,
name=_name,
section=_section,
hints=table_template,
selected=selected,
data_from=cast(DltResource, data_from),
inject_config=True,
keep_history=keep_history,
)

if incremental:
Expand Down Expand Up @@ -796,6 +804,7 @@ def transformer(
parallelized: bool = False,
section: Optional[TTableHintTemplate[str]] = None,
standalone: bool = None,
keep_history: bool = False,
) -> Callable[
[Callable[Concatenate[TDataItem, TResourceFunParams], Any]],
ResourceFactory[TResourceFunParams, DltResource],
Expand Down Expand Up @@ -824,6 +833,7 @@ def transformer(
parallelized: bool = False,
section: Optional[TTableHintTemplate[str]] = None,
standalone: bool = None,
keep_history: bool = False,
) -> ResourceFactory[TResourceFunParams, DltResource]: ...


Expand All @@ -848,6 +858,7 @@ def transformer(
parallelized: bool = False,
section: Optional[TTableHintTemplate[str]] = None,
standalone: bool = None,
keep_history: bool = False,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
) -> Any:
"""A form of `dlt resource` that takes input from other resources via `data_from` argument in order to enrich or transform the data.
Expand Down Expand Up @@ -925,6 +936,8 @@ def transformer(

standalone (bool, optional): Deprecated. Past functionality got merged into regular resource

keep_history (bool, optional): When `True` the result is added to the history. Now in subsequent nodes the result can be retrieved from the history.

_impl_cls (Type[TDltResourceImpl], optional): A custom implementation of DltResource, may be also used to providing just a typing stub

Raises:
Expand Down Expand Up @@ -960,6 +973,7 @@ def transformer(
parallelized=parallelized,
_impl_cls=_impl_cls,
section=section,
keep_history=keep_history,
)


Expand Down
9 changes: 9 additions & 0 deletions dlt/extract/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,3 +425,12 @@ def __init__(self, cursor_path: str) -> None:
" https://dlthub.com/docs/general-usage/incremental-loading#incremental-loading-with-last-value"
" for an example.",
)


class InvalidHistoryAccess(DltResourceException):
def __init__(self, resource_name: str) -> None:
super().__init__(
resource_name,
f"Resource `{resource_name}` does not have history enabled. To enable history set"
" `keep_history=True` in the resource or transformer.",
)
69 changes: 69 additions & 0 deletions dlt/extract/history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations
from typing import Any, Optional, Union, TYPE_CHECKING
from dlt.extract.exceptions import InvalidHistoryAccess

if TYPE_CHECKING:
from dlt.extract.resource import DltResource


class History:
"""
A simple tree node:
- Each node has a `key`, `value`, `parent`, and `children`.
- Keys are normalized (string or DltResource).
- `record` always creates a new child and returns it.
- Lookup walks up the parent chain to find the first matching key.
"""

def __init__(self,
key: Optional[str] = None,
value: Any = None,
parent: Optional[History] = None) -> None:
self.key: Optional[str] = key
self.value: Any = value
self.parent: Optional[History] = parent
self.children: list[History] = []

@staticmethod
def _normalize_key(key: Union[str, "DltResource"]) -> str:
if isinstance(key, str):
return key
name = getattr(key, "__name__", None)
if not isinstance(name, str):
raise TypeError("Key must be str or DltResource-like (has __name__)")
return name

def __getitem__(self, key: Union[str, "DltResource"]) -> Any:
normalized = self._normalize_key(key)
node: Optional[History] = self
while node:
if node.key == normalized:
return node.value
node = node.parent
raise InvalidHistoryAccess(normalized)

def record(self, key: Union[str, "DltResource"], value: Any) -> History:
"""
Always add a new child node and return it.
"""
normalized = self._normalize_key(key)
child = History(key=normalized, value=value, parent=self)
self.children.append(child)
return child

def path(self) -> list[tuple[str, Any]]:
"""
Return the chain of (key, value) pairs from root to this node.
"""
out: list[tuple[str, Any]] = []
node: Optional[History] = self
while node and node.key is not None:
out.append((node.key, node.value))
node = node.parent
return list(reversed(out))

def __repr__(self) -> str:
return f"<History key={self.key!r} value={self.value!r} children={len(self.children)}>"


EMPTY_HISTORY = History()
9 changes: 7 additions & 2 deletions dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)

from dlt.extract.exceptions import IncrementalUnboundError
from dlt.extract.history import History, EMPTY_HISTORY
from dlt.extract.incremental.exceptions import (
IncrementalCursorPathMissing,
IncrementalPrimaryKeyMissing,
Expand Down Expand Up @@ -561,7 +562,9 @@ def _get_transform(self, items: TDataItems) -> IncrementalTransform:
return self._make_or_get_transformer(JsonIncremental)
return self._make_or_get_transformer(JsonIncremental)

def __call__(self, rows: TDataItems, meta: Any = None) -> Optional[TDataItems]:
def __call__(
self, rows: TDataItems, meta: Any = None, history: History = EMPTY_HISTORY
) -> Optional[TDataItems]:
# NOTE: we also forward empty lists, so special empty list types are preserved
# example: MaterializedEmptyList
if rows is None or (isinstance(rows, list) and len(rows) == 0):
Expand Down Expand Up @@ -798,7 +801,9 @@ def bind(self, pipe: SupportsPipe) -> "IncrementalResourceWrapper":
self._incremental.bind(pipe)
return self

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
def __call__(
self, item: TDataItems, meta: Any = None, history: History = EMPTY_HISTORY
) -> Optional[TDataItems]:
if not self._incremental:
return item
if self._incremental.primary_key is None:
Expand Down
15 changes: 14 additions & 1 deletion dlt/extract/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
TFunHintTemplate,
TDynHintType,
)
from dlt.extract.history import History

TDecompositionStrategy = Literal["none", "scc"]
TDeferredDataItems = Callable[[], TDataItems]
Expand All @@ -40,6 +41,7 @@ class PipeItem(NamedTuple):
step: int
pipe: "SupportsPipe"
meta: Any
history: History


class ResolvablePipeItem(NamedTuple):
Expand All @@ -48,20 +50,23 @@ class ResolvablePipeItem(NamedTuple):
step: int
pipe: "SupportsPipe"
meta: Any
history: History


class FuturePipeItem(NamedTuple):
item: TItemFuture
step: int
pipe: "SupportsPipe"
meta: Any
history: History


class SourcePipeItem(NamedTuple):
item: Union[Iterator[TPipedDataItems], Iterator[ResolvablePipeItem]]
step: int
pipe: "SupportsPipe"
meta: Any
history: History


# pipeline step may be iterator of data items or mapping function that returns data item or another iterator
Expand All @@ -72,10 +77,18 @@ class SourcePipeItem(NamedTuple):
Callable[[TDataItems, Optional[Any]], TPipedDataItems],
Callable[[TDataItems, Optional[Any]], Iterator[TPipedDataItems]],
Callable[[TDataItems, Optional[Any]], Iterator[ResolvablePipeItem]],
# Callable without meta
# Callable without meta and history
Callable[[TDataItems], TPipedDataItems],
Callable[[TDataItems], Iterator[TPipedDataItems]],
Callable[[TDataItems], Iterator[ResolvablePipeItem]],
# Callable with history
Callable[[TDataItems, Optional[History]], TPipedDataItems],
Callable[[TDataItems, Optional[History]], Iterator[TPipedDataItems]],
Callable[[TDataItems, Optional[History]], Iterator[ResolvablePipeItem]],
# Callable with meta and history
Callable[[TDataItems, Optional[Any], Optional[History]], TPipedDataItems],
Callable[[TDataItems, Optional[Any], Optional[History]], Iterator[TPipedDataItems]],
Callable[[TDataItems, Optional[Any], Optional[History]], Iterator[ResolvablePipeItem]],
]


Expand Down
21 changes: 16 additions & 5 deletions dlt/extract/items_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
)

from dlt.extract.items import SupportsPipe
from dlt.extract.history import History, EMPTY_HISTORY


ItemTransformFunctionWithMeta = Callable[[TDataItem, str], TAny]
Expand Down Expand Up @@ -62,7 +63,9 @@ def bind(self: "ItemTransform[TAny]", pipe: SupportsPipe) -> "ItemTransform[TAny
return self

@abstractmethod
def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
def __call__(
self, item: TDataItems, meta: Any = None, history: History = EMPTY_HISTORY
) -> Optional[TDataItems]:
"""Transforms `item` (a list of TDataItem or a single TDataItem) and returns or yields TDataItems. Returns None to consume item (filter out)"""
pass

Expand All @@ -72,7 +75,9 @@ class FilterItem(ItemTransform[bool]):
_f_meta: ItemTransformFunctionWithMeta[bool]
_f: ItemTransformFunctionNoMeta[bool]

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
def __call__(
self, item: TDataItems, meta: Any = None, history: History = EMPTY_HISTORY
) -> Optional[TDataItems]:
if isinstance(item, list):
# preserve type of empty lists
if len(item) == 0:
Expand All @@ -98,7 +103,9 @@ class MapItem(ItemTransform[TDataItem]):
_f_meta: ItemTransformFunctionWithMeta[TDataItem]
_f: ItemTransformFunctionNoMeta[TDataItem]

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
def __call__(
self, item: TDataItems, meta: Any = None, history: History = EMPTY_HISTORY
) -> Optional[TDataItems]:
if isinstance(item, list):
# preserve type of empty lists
if len(item) == 0:
Expand All @@ -120,7 +127,9 @@ class YieldMapItem(ItemTransform[Iterator[TDataItem]]):
_f_meta: ItemTransformFunctionWithMeta[TDataItem]
_f: ItemTransformFunctionNoMeta[TDataItem]

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
def __call__(
self, item: TDataItems, meta: Any = None, history: History = EMPTY_HISTORY
) -> Optional[TDataItems]:
if isinstance(item, list):
# preserve type of empty lists
if len(item) == 0:
Expand Down Expand Up @@ -188,7 +197,9 @@ def limit(self, chunk_size: int) -> Optional[int]:
return None
return self.max_items * (1 if self.count_rows else chunk_size)

def __call__(self, item: TDataItems, meta: Any = None) -> Optional[TDataItems]:
def __call__(
self, item: TDataItems, meta: Any = None, history: History = EMPTY_HISTORY
) -> Optional[TDataItems]:
# do not count None
if item is None:
return None
Expand Down
Loading