Merged
36 commits
9b10706
remove some duplicate test utils
sh-rp Apr 28, 2025
a569649
use dataset to get table counts
sh-rp Apr 28, 2025
b266825
add exception for sftp but use dataset otherwise for loading table co…
sh-rp Apr 28, 2025
6f3f2a5
update checking of empty tables in filesystem tests
sh-rp Apr 28, 2025
fe3ca8c
support filesystemsqlclient for tables that have prefixes rather than…
sh-rp Apr 28, 2025
c11643f
fix table location resolution for internal tables
sh-rp Apr 28, 2025
bcad180
make sftp check raise same errors as filesystemsqlclient
sh-rp Apr 28, 2025
39eb08e
more cleanup
sh-rp Apr 28, 2025
40ca7bd
fix replace disposition tests
sh-rp Apr 28, 2025
01a3754
simplify table count code in many places
sh-rp Apr 28, 2025
0f6e343
small cleanup
sh-rp Apr 28, 2025
583cbaf
fix tables to dicts function
sh-rp Apr 29, 2025
61553a4
disable databricks and synapse ibis backend tests
sh-rp Apr 24, 2025
b5fd22b
simplify table assertions
sh-rp Apr 29, 2025
a87f5d5
add tests for tests :)
sh-rp Apr 29, 2025
40a2318
fix two tests
sh-rp Apr 29, 2025
2cceb5d
fix dbt tests
sh-rp Apr 29, 2025
d88c641
makes open table locations to work in windows fs
rudolfix Apr 29, 2025
c458862
review comments
sh-rp May 4, 2025
91aa41b
adds docstrings plus linting to pipeline utils
sh-rp May 4, 2025
1f71909
fix docstring linting on utils class
sh-rp May 5, 2025
1911b6f
bump adlfs in lockfile
sh-rp May 5, 2025
7ea806b
Merge branch 'devel' into tests/simplify-load-utils
rudolfix May 15, 2025
540a3bd
Merge branch 'devel' into tests/simplify-load-utils
sh-rp May 20, 2025
3d7b0e5
test loading abfss first
sh-rp May 20, 2025
af17763
test getting tables one by one for azure
sh-rp May 26, 2025
c176a73
Merge branch 'devel' into tests/simplify-load-utils
sh-rp May 26, 2025
2a90c76
fix resolving of sql_client
sh-rp May 26, 2025
bed3a5c
change folder detection
sh-rp May 26, 2025
33e58aa
add comment for abfss fix
sh-rp May 26, 2025
d756a8a
move abfss fallback into utils method
sh-rp May 27, 2025
c2f0a6d
normalizes trailing separator in paths in filesystem
rudolfix May 27, 2025
a0056ae
fixes two tests
sh-rp May 28, 2025
2ed8889
fix glob resolution for tables that have nested folders
sh-rp May 28, 2025
5d432b6
removes globs from duckdb filesystem sql client, adds tests for edge …
rudolfix May 29, 2025
9d6722d
disables globbing for iceberg, adds optional autorefresh flag for vie…
rudolfix May 29, 2025
3 changes: 2 additions & 1 deletion Makefile
@@ -97,7 +97,8 @@ lint-docstrings:
dlt/common/destination/dataset.py \
dlt/destinations/impl/**/factory.py \
dlt/pipeline/pipeline.py \
dlt/pipeline/__init__.py
dlt/pipeline/__init__.py \
tests/pipeline/utils.py

test:
poetry run pytest tests
7 changes: 6 additions & 1 deletion dlt/common/destination/client.py
@@ -14,6 +14,7 @@
Dict,
Any,
TypeVar,
Tuple,
)
from typing_extensions import Annotated
import datetime # noqa: 251
@@ -655,7 +656,11 @@ def get_open_table_catalog(self, table_format: TTableFormat, catalog_name: str =

@abstractmethod
def get_open_table_location(self, table_format: TTableFormat, table_name: str) -> str:
"""Computes location in which table metadata is stored. Does not verify if table exists."""
"""Computes the location in which the table is stored, which is typically a "folder" with table
data and metadata. Does not verify that the table exists.
Returns:
str: fully formed url with the table location
"""

@abstractmethod
def load_open_table(self, table_format: TTableFormat, table_name: str, **kwargs: Any) -> Any:
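For illustration, a minimal sketch of the contract this abstract method describes: a fully formed URL ending with a separator for "folder"-style locations, computed without checking that the table exists. The helper below is hypothetical, not dlt's implementation.

# Hypothetical sketch of the get_open_table_location contract (names are illustrative).
def open_table_location(bucket_url: str, dataset_name: str, table_folder: str) -> str:
    # fully formed url; trailing separator marks a "folder"; existence is not verified
    return f"{bucket_url.rstrip('/')}/{dataset_name}/{table_folder}/"

print(open_table_location("s3://my-bucket", "my_dataset", "my_table"))
# -> s3://my-bucket/my_dataset/my_table/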
4 changes: 2 additions & 2 deletions dlt/common/libs/pyiceberg.py
@@ -102,7 +102,7 @@ def evolve_table(
table = catalog.load_table(table_id)
except NoSuchTableError:
# add table to catalog
metadata_path = f"{table_location}/metadata"
metadata_path = f"{table_location.rstrip('/')}/metadata"
if client.fs_client.exists(metadata_path):
# found metadata; register existing table
table = register_table(
@@ -166,7 +166,7 @@ def _get_fileio_config(credentials: CredentialsConfiguration) -> Dict[str, Any]:
def get_last_metadata_file(
metadata_path: str, fs_client: AbstractFileSystem, config: FilesystemConfiguration
) -> str:
# TODO: implement faster way to obtain `last_metadata_file` (listing is slow)
# TODO: read version-hint.txt first and save it in filesystem
try:
metadata_files = [f for f in fs_client.ls(metadata_path) if f.endswith(".json")]
except FileNotFoundError:
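The TODO above concerns how `get_last_metadata_file` finds the newest Iceberg metadata file by listing the metadata folder. A minimal sketch of that idea, assuming an fsspec-compatible filesystem object; the real helper also threads through the filesystem configuration:

# Sketch: pick the newest Iceberg metadata file from a folder listing (assumes fsspec-style ls()).
def last_metadata_file(metadata_path: str, fs_client) -> str:
    metadata_files = [f for f in fs_client.ls(metadata_path) if f.endswith(".json")]
    if not metadata_files:
        raise FileNotFoundError(metadata_path)
    # metadata file names carry a zero-padded version prefix, so the lexicographically
    # greatest path is the latest snapshot
    return sorted(metadata_files)[-1]

# the rstrip('/') above avoids a double slash when the table location ends with a separator
table_location = "s3://bucket/my_dataset/my_table/"
metadata_path = f"{table_location.rstrip('/')}/metadata"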
17 changes: 14 additions & 3 deletions dlt/common/storages/configuration.py
@@ -107,11 +107,17 @@ def _make_az_url(scheme: str, fs_path: str, bucket_url: str) -> str:
# az://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>
# fs_path always starts with container
split_path = fs_path.split("/", maxsplit=1)
# preserve slash at the end
if len(split_path) == 2 and split_path[1] == "":
split_path[1] = "/"
# if just a container name, add empty path
if len(split_path) == 1:
split_path.append("")
container, path = split_path
netloc = f"{container}@{parsed_bucket_url.hostname}"
return urlunparse(parsed_bucket_url._replace(path=path, scheme=scheme, netloc=netloc))
# this strips trailing slash
uri = urlunparse(parsed_bucket_url._replace(path=path, scheme=scheme, netloc=netloc))
return uri
return f"{scheme}://{fs_path}"
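For reference, a self-contained sketch of the container/path split performed above, using only the standard library; the real helper additionally reuses the hostname, query and fragment of the configured bucket_url:

# Standalone sketch of building an az/abfss url from "container/path" (stdlib only).
from urllib.parse import urlunparse

def make_az_url(scheme: str, fs_path: str, account_host: str) -> str:
    split_path = fs_path.split("/", maxsplit=1)
    if len(split_path) == 1:  # just a container name -> add empty path
        split_path.append("")
    container, path = split_path
    netloc = f"{container}@{account_host}"
    # a trailing separator that is part of `path` is carried into the resulting url
    return urlunparse((scheme, netloc, path, "", "", ""))

print(make_az_url("abfss", "my-container/my_dataset/my_table/", "acct.dfs.core.windows.net"))
# -> abfss://my-container@acct.dfs.core.windows.net/my_dataset/my_table/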


@@ -121,8 +127,12 @@ def _make_file_url(scheme: str, fs_path: str, bucket_url: str) -> str:
netloc is never set. UNC paths are represented as file://host/path
"""
p_ = pathlib.Path(fs_path)
# will remove trailing separator
p_ = p_.expanduser().resolve()
return p_.as_uri()
uri = p_.as_uri()
if fs_path.endswith(os.path.sep):
uri += "/"
return uri


MAKE_URI_DISPATCH = {"az": _make_az_url, "file": _make_file_url, "sftp": _make_sftp_url}
@@ -135,7 +145,8 @@ def _make_file_url(scheme: str, fs_path: str, bucket_url: str) -> str:


def make_fsspec_url(scheme: str, fs_path: str, bucket_url: str) -> str:
"""Creates url from `fs_path` and `scheme` using bucket_url as an `url` template
"""Creates a url from `fs_path` and `scheme` using bucket_url as a `url` template. If `fs_path`
ends with a separator (indicating a folder), the separator is preserved.

Args:
scheme (str): scheme of the resulting url
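A short sketch of the trailing-separator behaviour for local paths described above: pathlib's as_uri() drops a trailing separator, so it is re-appended whenever the original path indicated a folder (stdlib only, POSIX separators assumed):

# Sketch of file:// url creation that preserves a trailing separator for "folder" paths.
import os
import pathlib

def make_file_url(fs_path: str) -> str:
    p_ = pathlib.Path(fs_path).expanduser().resolve()  # resolve() removes a trailing separator
    uri = p_.as_uri()
    if fs_path.endswith(os.path.sep):  # folder -> put the separator back
        uri += "/"
    return uri

print(make_file_url("/var/data/my_dataset/my_table/"))   # e.g. file:///var/data/my_dataset/my_table/
print(make_file_url("/var/data/my_dataset/file.jsonl"))   # e.g. file:///var/data/my_dataset/file.jsonl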
2 changes: 1 addition & 1 deletion dlt/destinations/dataset/ibis_relation.py
@@ -40,7 +40,7 @@ def query(self) -> Any:
"""build the query"""
from dlt.helpers.ibis import ibis

target_dialect = self._dataset._destination.capabilities().sqlglot_dialect
target_dialect = self._dataset.sql_client.capabilities.sqlglot_dialect

# render sql directly if possible
if target_dialect not in TRANSPILE_VIA_DEFAULT:
8 changes: 6 additions & 2 deletions dlt/destinations/fs_client.py
@@ -14,11 +14,15 @@ class FSClientBase(ABC):
@property
@abstractmethod
def dataset_path(self) -> str:
"""A path within a bucket to tables in a dataset, ending with separator"""
pass

@abstractmethod
def get_table_dir(self, table_name: str) -> str:
"""returns directory for given table"""
"""Returns a directory containing table files, ending with separator.
Native filesystem paths are used for local filesystems.
Note that many tables can share the same table dir.
"""
pass

@abstractmethod
@@ -28,7 +32,7 @@ def get_table_dirs(self, table_names: Iterable[str]) -> List[str]:

@abstractmethod
def list_table_files(self, table_name: str) -> List[str]:
"""returns all filepaths for a given table"""
"""Returns all filepaths for a given table. Native filesystem paths are used for local filesystems."""
pass

@abstractmethod
10 changes: 7 additions & 3 deletions dlt/destinations/impl/duckdb/sql_client.py
@@ -210,14 +210,18 @@ def _make_database_exception(cls, ex: Exception) -> Exception:
# duckdb raises TypeError on malformed query parameters
return DatabaseTransientException(duckdb.ProgrammingError(ex))
elif isinstance(ex, duckdb.IOException):
message = str(ex)
if (
"read from delta table" in str(ex) and "No files in log segment" in str(ex)
) or "Path does not exist" in str(ex):
"read from delta table" in message and "No files in log segment" in message
) or "Path does not exist" in message:
# delta scanner when no delta data or metadata exists in the location
return DatabaseUndefinedRelation(ex)
if "Could not guess Iceberg table version" in str(ex):
if "Could not guess Iceberg table version" in message:
# same but iceberg
return DatabaseUndefinedRelation(ex)
if "No files found" in message:
# glob patterns not found
return DatabaseUndefinedRelation(ex)
return DatabaseTransientException(ex)
elif isinstance(ex, duckdb.InternalException):
if "INTERNAL Error: Value::LIST(values)" in str(ex):
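The new branches above classify duckdb IO errors by message fragments. A simplified, hedged sketch of that classification (plain Python; the fragments are the ones used in the diff, the helper name is illustrative):

# Simplified sketch: map duckdb IO error messages to "undefined relation" vs. transient errors.
UNDEFINED_RELATION_FRAGMENTS = (
    "No files in log segment",                # empty delta table location
    "Path does not exist",
    "Could not guess Iceberg table version",  # empty iceberg table location
    "No files found",                         # glob pattern matched nothing
)

def classify_io_error(message: str) -> str:
    if any(fragment in message for fragment in UNDEFINED_RELATION_FRAGMENTS):
        return "undefined_relation"
    return "transient"

assert classify_io_error("IO Error: No files found that match the pattern ...") == "undefined_relation"
assert classify_io_error("IO Error: connection reset") == "transient"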
2 changes: 2 additions & 0 deletions dlt/destinations/impl/filesystem/configuration.py
@@ -23,6 +23,8 @@ class FilesystemDestinationClientConfiguration(FilesystemConfigurationWithLocalF
extra_placeholders: Optional[TExtraPlaceholders] = None
max_state_files: int = 100
"""Maximum number of pipeline state files to keep; 0 or negative value disables cleanup."""
always_refresh_views: bool = False
"""Always refresh table scanner views by pointing them at the newest table metadata or by globbing table files"""

@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
3 changes: 3 additions & 0 deletions dlt/destinations/impl/filesystem/factory.py
@@ -86,6 +86,7 @@ def __init__(
layout: str = DEFAULT_FILE_LAYOUT,
extra_placeholders: Optional[TExtraPlaceholders] = None,
current_datetime: Optional[TCurrentDateTime] = None,
always_refresh_views: bool = None,
destination_name: str = None,
environment: str = None,
**kwargs: Any,
@@ -113,6 +114,7 @@ def __init__(
are mapped to string values or to callables evaluated at runtime.
current_datetime (Optional[TCurrentDateTime]): Current datetime used by date/time related placeholders. If not provided, load package creation timestamp
will be used.
always_refresh_views (bool, optional): Always refresh sql_client views by pointing them at the newest table metadata or by globbing table files
destination_name (str, optional): Name of the destination, can be used in config section to differentiate between multiple of the same type
environment (str, optional): Environment of the destination
**kwargs (Any): Additional arguments passed to the destination config
@@ -123,6 +125,7 @@ def __init__(
layout=layout,
extra_placeholders=extra_placeholders,
current_datetime=current_datetime,
always_refresh_views=always_refresh_views,
destination_name=destination_name,
environment=environment,
**kwargs,
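A possible way to enable the new flag from user code, assuming the factory is exposed as dlt.destinations.filesystem (a sketch; the bucket_url is illustrative and the flag can also be supplied through dlt's regular configuration mechanisms):

# Sketch: ask the filesystem destination to always refresh sql_client scanner views.
import dlt

dest = dlt.destinations.filesystem(
    bucket_url="s3://my-bucket/prefix",   # illustrative bucket
    always_refresh_views=True,            # re-create views with the newest metadata / file listing
)

pipeline = dlt.pipeline("my_pipeline", destination=dest, dataset_name="my_dataset")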
20 changes: 15 additions & 5 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -320,7 +320,7 @@ def dataset_path(self) -> str:
"""A path within a bucket to tables in a dataset
NOTE: dataset_name changes if with_staging_dataset is active
"""
return self.pathlib.join(self.bucket_path, self.dataset_name) # type: ignore[no-any-return]
return self.pathlib.join(self.bucket_path, self.dataset_name, "") # type: ignore[no-any-return]

@contextmanager
def with_staging_dataset(self) -> Iterator["FilesystemClient"]:
@@ -478,14 +478,18 @@ def prepare_load_table(self, table_name: str) -> PreparedTableSchema:
return table

def get_table_dir(self, table_name: str, remote: bool = False) -> str:
"""Returns a directory containing table files, ending with separator.
Note that many tables can share the same table dir
"""
# dlt tables do not respect layout (for now)
table_prefix = self.get_table_prefix(table_name)
table_dir: str = self.pathlib.dirname(table_prefix)
table_dir: str = self.pathlib.dirname(table_prefix) + self.pathlib.sep
if remote:
table_dir = self.make_remote_url(table_dir)
return table_dir

def get_table_prefix(self, table_name: str) -> str:
"""For table prefixes that are folders, trailing separator will be preserved"""
# dlt tables do not respect layout (for now)
if table_name.startswith(self.schema._dlt_tables_prefix):
# dlt tables get a layout where each table is a folder
@@ -896,9 +900,15 @@ def get_open_table_catalog(self, table_format: TTableFormat, catalog_name: str =
return catalog

def get_open_table_location(self, table_format: TTableFormat, table_name: str) -> str:
"""All tables have location, also those in "native" table format."""
folder = self.get_table_dir(table_name)
location = self.make_remote_url(folder)
"""All tables have a location, also those in "native" table format. In the case of the
filesystem destination, the native format is a set of parquet/csv/jsonl files where a table
may be placed in a separate folder or share a common prefix defined in the layout.
Locations of native tables are normalized to include a trailing separator
if the path is a "folder" (this includes buckets).
Note: the location is a fully formed url.
"""
prefix = self.get_table_prefix(table_name)
location = self.make_remote_url(prefix)
if self.config.is_local_filesystem and os.name == "nt":
# pyiceberg cannot deal with windows absolute urls
location = location.replace("file:///", "file://")
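For reference, a small sketch of the prefix/dir relationship used above: the table prefix may point at a dedicated folder or at a file-name prefix inside a shared folder, while the table dir always ends with a separator (posixpath stands in for the client's path module; the prefixes are illustrative):

# Sketch: derive a table dir (always ending with a separator) from a table prefix.
import posixpath

def table_dir_from_prefix(table_prefix: str) -> str:
    # dirname() drops everything after the last separator:
    #   "my_dataset/my_table/"       -> "my_dataset/my_table" + "/"   (folder-per-table layout)
    #   "my_dataset/my_table__part"  -> "my_dataset" + "/"            (shared-folder prefix layout)
    return posixpath.dirname(table_prefix) + posixpath.sep

print(table_dir_from_prefix("my_dataset/my_table/"))       # -> my_dataset/my_table/
print(table_dir_from_prefix("my_dataset/my_table__part"))  # -> my_dataset/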
87 changes: 46 additions & 41 deletions dlt/destinations/impl/filesystem/sql_client.py
@@ -1,6 +1,5 @@
from typing import Any, TYPE_CHECKING
from typing import Any, TYPE_CHECKING, Tuple, List
import os
import re
import duckdb

from dlt.common import logger
@@ -45,17 +44,15 @@ def __init__(
def can_create_view(self, table_schema: PreparedTableSchema) -> bool:
if table_schema.get("table_format") in ("delta", "iceberg"):
return True
file_format = self.get_file_format(table_schema)
return file_format in ("jsonl", "parquet", "csv")
# checking the file type is expensive, so we optimistically allow view creation and prune later
return True

def get_file_format(self, table_schema: PreparedTableSchema) -> str:
def get_file_format_and_files(self, table_schema: PreparedTableSchema) -> Tuple[str, List[str]]:
table_name = table_schema["name"]
if table_name in self.schema.dlt_table_names():
return "jsonl"
files = self.remote_client.list_table_files(table_name)
if len(files) == 0:
raise DestinationUndefinedEntity(table_name)
return os.path.splitext(files[0])[1][1:]
return os.path.splitext(files[0])[1][1:], files

def create_secret(
self,
@@ -85,13 +82,18 @@ def create_secret(
raise ValueError(
f"Cannot create secret or register filesystem for protocol {protocol}"
)

return True

def open_connection(self) -> duckdb.DuckDBPyConnection:
first_connection = self.credentials.never_borrowed
super().open_connection()

if first_connection:
# TODO: we need to frontload the httpfs extension for abfss for some reason
if self.is_abfss:
self._conn.sql("LOAD httpfs;")

# create single authentication for the whole client
self.create_secret(
self.remote_client.config.bucket_url, self.remote_client.config.credentials
@@ -101,9 +103,12 @@ def open_connection(self) -> duckdb.DuckDBPyConnection:
return self._conn

def should_replace_view(self, view_name: str, table_schema: PreparedTableSchema) -> bool:
# we use alternative method to get snapshot on abfss and we need to replace
# the view each time to control the freshness (abfss cannot glob)
return self.is_abfss # and table_format == "iceberg"
if self.remote_client.config.always_refresh_views:
table_format = table_schema.get("table_format")
if table_format == "delta":
# delta will auto refresh
return False
return self.remote_client.config.always_refresh_views

@raise_database_error
def create_view(self, view_name: str, table_schema: PreparedTableSchema) -> None:
@@ -139,40 +144,37 @@ def _escape_column_name(col_name: str) -> str:
# create from statement
from_statement = ""
if table_format == "delta":
table_location = table_location.rstrip("/")
from_statement = f"delta_scan('{table_location}')"
elif table_format == "iceberg":
table_location = table_location.rstrip("/")
if not self.iceberg_initialized:
self._setup_iceberg(self._conn)
self.iceberg_initialized = True
if self.is_abfss:
# duckdb can't glob on abfss 🤯
from dlt.common.libs.pyiceberg import get_last_metadata_file

metadata_path = f"{table_location}/metadata"
last_metadata_file = get_last_metadata_file(
metadata_path, self.remote_client.fs_client, self.remote_client.config
)
from_statement = (
f"iceberg_scan('{last_metadata_file}', skip_schema_inference=false)"
)
from dlt.common.libs.pyiceberg import get_last_metadata_file

metadata_path = f"{table_location}/metadata"
last_metadata_file = get_last_metadata_file(
metadata_path, self.remote_client.fs_client, self.remote_client.config
)
if ".gz." in last_metadata_file:
compression = ", metadata_compression_codec = 'gzip'"
else:
# skip schema inference to make nested data types work
# https://github.com/duckdb/duckdb_iceberg/issues/47
from_statement = (
f"iceberg_scan('{table_location}', version='?', allow_moved_paths = true,"
" skip_schema_inference=false)"
)
compression = ""

from_statement = (
f"iceberg_scan('{last_metadata_file}'{compression}, skip_schema_inference=false)"
)
else:
# get file format from schema
# get file format and list of table files
# NOTE: this does not support cases where table contains many different file formats
first_file_type = self.get_file_format(table_schema)

# build files string
supports_wildcard_notation = not self.is_abfss

resolved_files_string = f"'{table_location}/**/*.{first_file_type}'"
if not supports_wildcard_notation:
files = self.remote_client.list_table_files(table_name)
# NOTE: since we must list all the files anyway, we just pass them to duckdb without further globbing;
# the list is already in memory and the maximum query size in duckdb is very large
first_file_type, files = self.get_file_format_and_files(table_schema)
if protocol == "file":
resolved_files_string = ",".join(map(lambda f: f"'{f}'", files))
else:
resolved_files_string = ",".join(map(lambda f: f"'{protocol}://{f}'", files))

if first_file_type == "parquet":
@@ -219,11 +221,14 @@
)

else:
raise NotImplementedError(
f"Unknown filetype {first_file_type} for table {table_name}. Currently only"
" jsonl and parquet files as well as delta and iceberg tables are"
" supported."
)
# we skipped checking the file type in can_create_view to avoid repeating expensive globs,
# so unknown file types are silently skipped here
return
# raise NotImplementedError(
# f"Unknown filetype {first_file_type} for table {table_name}. Currently only"
# " jsonl and parquet files as well as delta and iceberg tables are"
# " supported."
# )

# create table
view_name = self.make_qualified_table_name(view_name)
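Putting the branches above together, a condensed, hedged sketch of how the FROM clause for a scanner view could be assembled per table format. String building only; delta_scan, iceberg_scan, read_parquet and read_json are the duckdb functions referenced in this file, and the exact options the real client passes may differ:

# Condensed sketch of building the duckdb FROM clause for a scanner view.
def build_from_statement(table_format, table_location="", last_metadata_file="",
                         files=(), file_type=""):
    if table_format == "delta":
        return f"delta_scan('{table_location.rstrip('/')}')"
    if table_format == "iceberg":
        # scan a concrete metadata file instead of globbing the table location
        return f"iceberg_scan('{last_metadata_file}', skip_schema_inference=false)"
    # "native" tables: pass the explicit file list, no globbing required
    files_string = ",".join(f"'{f}'" for f in files)
    if file_type == "parquet":
        return f"read_parquet([{files_string}])"
    if file_type == "jsonl":
        # illustrative jsonl read; the real client configures columns and format explicitly
        return f"read_json([{files_string}], format = 'newline_delimited')"
    return ""  # unknown file types are silently skipped, as in can_create_view/create_view

print(build_from_statement("delta", table_location="s3://bucket/my_dataset/my_table/"))
# -> delta_scan('s3://bucket/my_dataset/my_table')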
1 change: 1 addition & 0 deletions dlt/destinations/path_utils.py
@@ -1,4 +1,5 @@
import re
import os
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple

from dlt.common import logger