Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4e6d6c5
makes pyiceberg helper more generic, makes clear catalog is ephemeral
rudolfix Mar 23, 2025
2883043
filesystem config normalizes bucket url also on partial, saved origin…
rudolfix Mar 23, 2025
48ac8a2
extracts base cache sql client to create views on any destination
rudolfix Mar 23, 2025
0b71976
refactors filesystem config to add with local files mixin
rudolfix Mar 23, 2025
1f9515b
bumps pyiceberg to 0.9
rudolfix Mar 23, 2025
1522b07
Merge branch 'devel' into feat/refactors-iceberg-support
rudolfix Mar 25, 2025
d930b6a
passes file_format via schema so it can be used to recognize file for…
rudolfix Mar 25, 2025
a58d36d
improves how secrets are handled in WithTableScanners
rudolfix Mar 25, 2025
fffe74a
fixes wrong resolve for WithLocalFiles configuration
rudolfix Mar 29, 2025
a586a0d
implements aws credentials from fileio
rudolfix Mar 29, 2025
a27e1bc
defines SupportsOpenTables interface and implements it for filesystem
rudolfix Mar 29, 2025
91ecd51
Merge branch 'devel' into feat/refactors-iceberg-support
rudolfix Mar 29, 2025
3d1e0a8
defines exceptions for supports open tables
rudolfix Mar 30, 2025
3145766
bumps and simplifies deltalake
rudolfix Mar 30, 2025
d77c65d
fixes nullability warning and skips NOT NULL on duckdb ALTER with a w…
rudolfix Mar 31, 2025
a351b8a
adds FileIO to credentials ops
rudolfix Apr 5, 2025
19e6944
makes Athena Iceberg location tag configurable
rudolfix Apr 5, 2025
c4c87c8
Merge branch 'devel' into feat/refactors-iceberg-support
rudolfix Apr 6, 2025
31c8420
disables duckdb skipping NOT NULL on alter, adds tests
rudolfix Apr 6, 2025
5cac80c
adds open table client tests
rudolfix Apr 6, 2025
5426eae
adds replace strategy selector, internal x-replace-strategy hint, rem…
rudolfix Apr 6, 2025
0f37a77
excludes certain statements from transactions when running jobs
rudolfix Apr 6, 2025
ea5930c
borrows and returns sqlalchemy connections in destination
rudolfix Apr 7, 2025
96c7a1f
better recognition of terminal and not terminal errors in sqlalchemy
rudolfix Apr 7, 2025
1c3d738
bumps to alpha release
rudolfix Apr 7, 2025
50f8b06
fixes dropping of temp tables in sqlalchemy merge job
rudolfix Apr 7, 2025
920b886
fixes some tests
rudolfix Apr 7, 2025
0a33f69
adds a public property to get config locations from Provider
rudolfix Apr 8, 2025
5d606cb
shows info on locations for config providers when displaying exceptio…
rudolfix Apr 8, 2025
6e624f2
detaches sqlite databases before returning connection. sql alchemy d…
rudolfix Apr 8, 2025
997a827
Merge branch 'devel' into feat/refactors-iceberg-support
rudolfix Apr 9, 2025
015f521
raises when open table client not available
rudolfix Apr 9, 2025
df2bdcc
applies naming convention to sql client with table scanners
rudolfix Apr 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ secrets.toml
*.duckdb
*.wal
logs/
.continuerules

# Byte-compiled / optimized / DLL files
**/__pycache__/
Expand Down
51 changes: 39 additions & 12 deletions dlt/common/configuration/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,49 @@ def __str__(self) -> str:
msg += f'\tfor field "{f}" config providers and keys were tried in following order:\n'
for tr in field_traces:
msg += f"\t\tIn {tr.provider} key {tr.key} was not found.\n"
# check if entry point is run with path. this is common problem so warn the user
main_path = main_module_file_path()
if main_path:
main_dir = os.path.dirname(main_path)
abs_main_dir = os.path.abspath(main_dir)
if abs_main_dir != os.getcwd():
# directory was specified

from dlt.common.configuration.container import Container
from dlt.common.configuration.specs import PluggableRunContext

# print locations for config providers
msg += "\n"
providers = Container()[PluggableRunContext].providers
for provider in providers.providers:
if provider.locations:
locations = "\n".join([f"\t- {os.path.abspath(loc)}" for loc in provider.locations])
msg += (
"WARNING: dlt looks for .dlt folder in your current working directory and your"
" cwd (%s) is different from directory of your pipeline script (%s).\n"
% (os.getcwd(), abs_main_dir)
f"Provider {provider.name} used following locations to load"
f" values:\n{locations}\n"
)
if provider.is_empty:
msg += (
"If you keep your secret files in the same folder as your pipeline script but"
" run your script from some other folder, secrets/configs will not be found\n"
f"WARNING: provider {provider.name} is empty. Locations (ie. files) may not"
" exist or may be empty.\n"
)

# check if entry point is run with path. this is common problem so warn the user
main_path = main_module_file_path()
if main_path and main_path.endswith(".py"):
from dlt.common.runtime import run_context

# check if settings are relative
settings = run_context.active().settings_dir
# settings are relative so check makes sense
if not os.path.isabs(settings):
main_dir = os.path.dirname(main_path)
abs_main_dir = os.path.abspath(main_dir)
if abs_main_dir != os.getcwd():
# directory was specified
msg += (
f"WARNING: dlt looks for {settings} folder in your current working"
" directory and your cwd (%s) is different from directory of your pipeline"
" script (%s).\n" % (os.getcwd(), abs_main_dir)
)
msg += (
"If you keep your secret files in the same folder as your pipeline script"
" but run your script from some other folder, secrets/configs will not be"
" found\n"
)
msg += (
"Please refer to https://dlthub.com/docs/general-usage/credentials/ for more"
" information\n"
Expand Down
13 changes: 11 additions & 2 deletions dlt/common/configuration/providers/doc.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import tomlkit
import yaml
from typing import Any, Callable, Dict, MutableMapping, Optional, Tuple, Type
from typing import Any, Callable, Dict, MutableMapping, Sequence, Optional, Tuple, Type

from dlt.common.configuration.utils import auto_cast, auto_config_fragment
from dlt.common.utils import update_dict_nested
Expand Down Expand Up @@ -136,7 +136,11 @@ def _set_fragment(

class CustomLoaderDocProvider(BaseDocProvider):
def __init__(
self, name: str, loader: Callable[[], Dict[str, Any]], supports_secrets: bool = True
self,
name: str,
loader: Callable[[], Dict[str, Any]],
supports_secrets: bool = True,
locations: Sequence[str] = None,
) -> None:
"""Provider that calls `loader` function to get a Python dict with config/secret values to be queried.
The `loader` function typically loads a string (ie. from file), parses it (ie. as toml or yaml), does additional
Expand All @@ -154,6 +158,7 @@ def __init__(
"""
self._name = name
self._supports_secrets = supports_secrets
self._locations = locations
super().__init__(loader())

@property
Expand All @@ -167,3 +172,7 @@ def supports_secrets(self) -> bool:
@property
def is_writable(self) -> bool:
return True

@property
def locations(self) -> Sequence[str]:
return self._locations
8 changes: 8 additions & 0 deletions dlt/common/configuration/providers/google_secrets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import string
import re
from typing import Sequence

from dlt.common.json import json
from dlt.common.configuration.specs import GcpServiceAccountCredentials
Expand Down Expand Up @@ -63,6 +64,13 @@ def get_key_name(key: str, *sections: str) -> str:
def name(self) -> str:
return "Google Secrets"

@property
def locations(self) -> Sequence[str]:
if self.credentials:
return [str(self.credentials)]
else:
return super().locations

def _look_vault(self, full_key: str, hint: type) -> str:
try:
from googleapiclient.discovery import build
Expand Down
24 changes: 19 additions & 5 deletions dlt/common/configuration/providers/provider.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import abc
from typing import Any, Tuple, Type, Optional
from typing import Any, Sequence, Tuple, Type, Optional

from dlt.common.configuration.exceptions import ConfigurationException

Expand All @@ -9,34 +9,48 @@ class ConfigProvider(abc.ABC):
def get_value(
self, key: str, hint: Type[Any], pipeline_name: str, *sections: str
) -> Tuple[Optional[Any], str]:
pass
"""Looks for a value under `key` in section(s) `sections` and tries to coerce the
value to type `hint`. A pipeline context (top level section) will be added if
`pipeline_name` was specified.
"""

def set_value(self, key: str, value: Any, pipeline_name: str, *sections: str) -> None:
raise NotImplementedError()

@property
@abc.abstractmethod
def supports_secrets(self) -> bool:
pass
"""If true, provider is allowed to store secret. Configuration resolution fails if
a secret value is discovered in a config provider that does not support secrets.
"""

@property
@abc.abstractmethod
def supports_sections(self) -> bool:
pass
"""If true, config resolution will query this provider for all allowed section combinations
otherwise values are queried only by field name.
"""

@property
@abc.abstractmethod
def name(self) -> str:
pass
"""Human readable name of config provider"""

@property
def is_empty(self) -> bool:
"""Tells if config provider holds any values"""
return False

@property
def is_writable(self) -> bool:
"""Tells if `set_value` may be used"""
return False

@property
def locations(self) -> Sequence[str]:
"""Returns a list of locations where secrets are stored, human readable"""
return []


def get_key_name(key: str, separator: str, /, *sections: str) -> str:
if sections:
Expand Down
1 change: 1 addition & 0 deletions dlt/common/configuration/providers/toml.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def __init__(
name,
self._config_toml.unwrap,
supports_secrets,
self._toml_paths,
)

def _resolve_toml_paths(self, file_name: str, resolvable_dirs: List[str]) -> List[str]:
Expand Down
14 changes: 13 additions & 1 deletion dlt/common/configuration/specs/aws_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dlt.common.utils import without_none
from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import TSecretStrValue, DictStrAny
from dlt.common.typing import TSecretStrValue, DictStrAny, Self
from dlt.common.configuration.specs import (
CredentialsConfiguration,
CredentialsWithDefault,
Expand Down Expand Up @@ -92,6 +92,18 @@ def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
"s3.connect-timeout": 300,
}

@classmethod
def from_pyiceberg_fileio_config(cls, file_io: Dict[str, Any]) -> Self:
credentials: Self = cls()
credentials.aws_access_key_id = file_io.get("s3.access-key-id")
credentials.aws_secret_access_key = file_io.get("s3.secret-access-key")
credentials.aws_session_token = file_io.get("s3.session-token")
credentials.region_name = file_io.get("s3.region")
credentials.endpoint_url = file_io.get("s3.endpoint")
if not credentials.is_partial():
credentials.resolve()
return credentials


@configspec
class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault):
Expand Down
36 changes: 34 additions & 2 deletions dlt/common/configuration/specs/azure_credentials.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from copy import copy
from typing import Optional, Dict, Any, Union

from dlt.common.pendulum import pendulum
from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import TSecretStrValue
from dlt.common.typing import TSecretStrValue, Self
from dlt.common.configuration.specs import (
CredentialsConfiguration,
CredentialsWithDefault,
Expand Down Expand Up @@ -66,6 +67,26 @@ def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
"adls.sas-token": self.azure_storage_sas_token,
}

@classmethod
def from_pyiceberg_fileio_config(cls, file_io: Dict[str, Any]) -> Self:
# we'll modify file_io so make a copy
file_io = copy(file_io)
# convert signed uri to credentials
for key, value in list(file_io.items()):
if key.startswith("adls.sas-token."):
if "adls.account-name" not in file_io:
file_io["adls.account-name"] = key.split(".")[2]
if "adls.sas-token" not in file_io:
file_io["adls.sas-token"] = value # key value is a sas token
credentials: Self = cls()
credentials.azure_account_host = file_io.get("adls.connection-string")
credentials.azure_storage_account_key = file_io.get("adls.account-key")
credentials.azure_storage_account_name = file_io.get("adls.account-name")
credentials.azure_storage_sas_token = file_io.get("adls.sas-token")
if not credentials.is_partial():
credentials.resolve()
return credentials

def create_sas_token(self) -> None:
try:
from azure.storage.blob import generate_account_sas, ResourceTypes
Expand Down Expand Up @@ -103,14 +124,25 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
client_secret=self.azure_client_secret,
)

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
def to_pyiceberg_fileio_config(self) -> Dict[str, str]:
return {
"adls.account-name": self.azure_storage_account_name,
"adls.tenant-id": self.azure_tenant_id,
"adls.client-id": self.azure_client_id,
"adls.client-secret": self.azure_client_secret,
}

@classmethod
def from_pyiceberg_fileio_config(cls, file_io: Dict[str, Any]) -> Self:
credentials: Self = cls()
credentials.azure_tenant_id = file_io.get("adls.tenant-id")
credentials.azure_client_id = file_io.get("adls.client-id")
credentials.azure_storage_account_name = file_io.get("adls.account-name")
credentials.azure_client_secret = file_io.get("adls.client-secret")
if not credentials.is_partial():
credentials.resolve()
return credentials


@configspec
class AzureCredentials(AzureCredentialsWithoutDefaults, CredentialsWithDefault):
Expand Down
41 changes: 29 additions & 12 deletions dlt/common/configuration/specs/gcp_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
)
from dlt.common.configuration.specs.mixins import WithObjectStoreRsCredentials, WithPyicebergConfig
from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import DictStrAny, TSecretStrValue, StrAny
from dlt.common.typing import DictStrAny, TSecretStrValue, StrAny, Self
from dlt.common.configuration.specs.base_configuration import (
CredentialsConfiguration,
CredentialsWithDefault,
Expand All @@ -25,7 +25,7 @@


@configspec
class GcpCredentials(CredentialsConfiguration, WithObjectStoreRsCredentials, WithPyicebergConfig):
class GcpCredentials(CredentialsConfiguration, WithObjectStoreRsCredentials):
token_uri: Final[str] = dataclasses.field(
default="https://oauth2.googleapis.com/token", init=False, repr=False, compare=False
)
Expand Down Expand Up @@ -77,7 +77,7 @@ def to_object_store_rs_credentials(self) -> Dict[str, str]:


@configspec
class GcpServiceAccountCredentialsWithoutDefaults(GcpCredentials):
class GcpServiceAccountCredentialsWithoutDefaults(GcpCredentials, WithPyicebergConfig):
private_key: TSecretStrValue = None
private_key_id: Optional[str] = None
client_email: str = None
Expand Down Expand Up @@ -134,12 +134,19 @@ def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
" authentication instead."
)

@classmethod
def from_pyiceberg_fileio_config(cls, file_io: Dict[str, Any]) -> Self:
raise UnsupportedAuthenticationMethodException(
"Service Account authentication not supported with `iceberg` table format. Use OAuth"
" authentication instead."
)

def __str__(self) -> str:
return f"{self.client_email}@{self.project_id}"


@configspec
class GcpOAuthCredentialsWithoutDefaults(GcpCredentials, OAuth2Credentials):
class GcpOAuthCredentialsWithoutDefaults(GcpCredentials, OAuth2Credentials, WithPyicebergConfig):
# only desktop app supported
refresh_token: TSecretStrValue = None
client_type: Final[str] = dataclasses.field(
Expand Down Expand Up @@ -190,13 +197,24 @@ def to_object_store_rs_credentials(self) -> Dict[str, str]:
)

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
self.auth()
# do auth only if token is not yet present
if not self.token:
self.auth()
return {
"gcs.project-id": self.project_id,
"gcs.oauth2.token": self.token,
"gcs.oauth2.token-expires-at": (pendulum.now().timestamp() + 60) * 1000,
}

@classmethod
def from_pyiceberg_fileio_config(cls, file_io: Dict[str, Any]) -> Self:
credentials: Self = cls()
credentials.project_id = file_io.get("gcs.project-id")
credentials.token = file_io.get("gcs.oauth2.token")
# if token and project are set, credentials are resolved
credentials.on_partial()
return credentials

def auth(self, scopes: Union[str, List[str]] = None, redirect_url: str = None) -> None:
if not self.refresh_token:
self.add_scopes(scopes)
Expand All @@ -221,6 +239,11 @@ def on_partial(self) -> None:
if not self.is_partial():
self.resolve()
self.refresh_token = None
# if token and project are set, credentials are resolved
if self.token and self.project_id:
self.refresh_token = ""
self.client_id = ""
self.resolve()

def _get_access_token(self) -> str:
try:
Expand Down Expand Up @@ -329,12 +352,6 @@ def to_native_credentials(self) -> Any:
else:
return super().to_native_credentials()

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
raise UnsupportedAuthenticationMethodException(
"Application Default Credentials authentication not supported with `iceberg` table"
" format. Use OAuth authentication instead."
)


@configspec
class GcpServiceAccountCredentials(
Expand All @@ -359,6 +376,6 @@ def parse_native_representation(self, native_value: Any) -> None:

def to_pyiceberg_fileio_config(self) -> Dict[str, Any]:
if self.has_default_credentials():
return GcpDefaultCredentials.to_pyiceberg_fileio_config(self)
raise NotImplementedError()
else:
return GcpOAuthCredentialsWithoutDefaults.to_pyiceberg_fileio_config(self)
Loading
Loading