Skip to content
2 changes: 1 addition & 1 deletion dlt/common/configuration/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def _build_config_error_message(

def __str__(self) -> str:
msg = (
f"Missing {len(self.fields)} field(s) in configuration`{self.spec_name}`:"
f"Missing {len(self.fields)} field(s) in configuration `{self.spec_name}`:"
f" {', '.join(f'`{f}`' for f in self.fields)}\n"
)
msg += self._build_config_error_message(
Expand Down
48 changes: 45 additions & 3 deletions dlt/common/destination/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any, Iterable, List, Sequence
import textwrap

from dlt.common.exceptions import DltException, TerminalException, TransientException
from dlt.common.reflection.exceptions import ReferenceImportError
Expand All @@ -11,17 +12,58 @@ class DestinationException(DltException):

class UnknownDestinationModule(ReferenceImportError, DestinationException, KeyError):
def __init__(
    self,
    ref: str,
    qualified_refs: Sequence[str],
    traces: Sequence[ImportTrace],
    destination_type: str = None,
    named_dest_attempted: bool = False,
) -> None:
    """Store the context of a destination reference that could not be resolved.

    Args:
        ref: The destination reference that failed to resolve.
        qualified_refs: Candidate references associated with `ref`; stored as-is.
        traces: Import traces, forwarded unchanged to the parent exception.
        destination_type: Destination type configured for `ref`, if any. Used by
            `__str__` to explain why named-destination resolution failed.
        named_dest_attempted: True when `ref` was first tried as a named
            destination before falling back to shorthand resolution.
    """
    self.ref = ref
    self.qualified_refs = qualified_refs
    self.destination_type = destination_type
    self.named_dest_attempted = named_dest_attempted
    super().__init__(traces=traces)

def __str__(self) -> str:
msg = ""
if "." in self.ref:
msg = f"Destination module `{self.ref}` is not registered."
msg += f"Destination module `{self.ref}` is not registered."
else:
msg = f"Destination `{self.ref}` is not one of the standard dlt destinations."
if self.named_dest_attempted:
msg += (
f"Destination '{self.ref}' was first attempted to be resolved as a named"
" destination with a configured type. "
)
if self.destination_type:
msg += (
f"However, the configured destination type '{self.destination_type}' is not"
" valid. Set a valid destination type. "
)
else:
msg += (
"However, no destination type was configured. "
"If your destination is a named destination, "
"set a valid destination type either as an environment variable:\n\n"
)
msg += textwrap.indent(
f"DESTINATION__{self.ref.upper()}__DESTINATION_TYPE=duckdb\n", " "
)
msg += "\nor in your configuration files:\n\n"
msg += textwrap.indent(
f'[destination.{self.ref}]\ndestination_type="duckdb"\n\n', " "
)

msg += (
f"Since no{' valid' if self.destination_type else ''} destination type was"
f" found, dlt also tried to resolve '{self.ref}' as a standard destination."
" However, "
)

msg += (
f"{'d' if self.named_dest_attempted else 'D'}estination `{self.ref}` is not one of"
" the standard dlt destinations."
)

if len(self.qualified_refs) == 1 and self.qualified_refs[0] == self.ref:
pass
Expand Down
100 changes: 64 additions & 36 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing_extensions import TypeAlias
import inspect

import dlt
from dlt.common import logger
from dlt.common.configuration.specs.base_configuration import BaseConfiguration
from dlt.common.normalizers.naming import NamingConvention
Expand Down Expand Up @@ -267,9 +268,36 @@ def from_reference(
)
return ref

return DestinationReference.from_reference(
ref, credentials, destination_name, environment, **kwargs
)
# If destination name is provided or ref is a module ref
# don't attempt to resolve as named destination
if destination_name or "." in ref:
return DestinationReference.from_reference(
ref, credentials, destination_name, environment, **kwargs
)

# First, try to resolve as a named destination with configured type
destination_type: str = dlt.config.get(f"destination.{ref}.destination_type")
if destination_type:
try:
return DestinationReference.from_reference(
ref=destination_type,
destination_name=ref,
credentials=credentials,
environment=environment,
**kwargs,
)
except Exception:
pass

# Then, try to resolve as a shorthand destination ref
try:
return DestinationReference.from_reference(
ref, credentials, destination_name, environment, **kwargs
)
except UnknownDestinationModule as e:
e.destination_type = destination_type
e.named_dest_attempted = True
raise e


class DestinationReference:
Expand All @@ -278,17 +306,6 @@ class DestinationReference:
DESTINATIONS: ClassVar[Dict[str, Type[AnyDestination]]] = {}
"""A registry of all the destination factories"""

@staticmethod
def normalize_type(destination_type: str) -> str:
"""Normalizes destination type string into a canonical form. Assumes that type names without dots correspond to built in destinations."""
if "." not in destination_type:
destination_type = "dlt.destinations." + destination_type
# the next two lines shorten the dlt internal destination paths to dlt.destinations.<destination_type>
pattern = r"\.destinations\.impl\.[a-zA-Z_][.a-zA-Z0-9_]*\."
replacement = ".destinations."
destination_type = re.sub(pattern, replacement, destination_type)
return destination_type

@classmethod
def register(cls, factory: Type[AnyDestination_CO], ref: str) -> None:
"""Registers `factory` class under `ref`. `ref`"""
Expand All @@ -299,28 +316,6 @@ def register(cls, factory: Type[AnyDestination_CO], ref: str) -> None:
)
cls.DESTINATIONS[ref] = factory

@staticmethod
def to_fully_qualified_refs(ref: str) -> List[str]:
"""Converts ref into fully qualified form, return one or more alternatives for shorthand notations.
Run context is injected if needed. Following formats are recognized
- name
NOTE: the last component of destination type serves as destination name if not explicitly specified
"""
ref_split = ref.split(".")
ref_parts = len(ref_split)
if ref_parts < 2:
# context name is needed
refs = []
for ref_prefix in get_plugin_modules():
if ref_prefix:
ref_prefix = f"{ref_prefix}.{known_sections.DESTINATIONS}"
else:
ref_prefix = f"{known_sections.DESTINATIONS}"
refs.append(f"{ref_prefix}.{ref}")
return refs

return []

@classmethod
def find(
cls,
Expand Down Expand Up @@ -411,3 +406,36 @@ def from_reference(
if environment:
kwargs["environment"] = environment
return factory(**kwargs)

@staticmethod
def normalize_type(destination_type: str) -> str:
"""Normalizes destination type string into a canonical form. Assumes that type names without dots correspond to built in destinations."""
if "." not in destination_type:
destination_type = "dlt.destinations." + destination_type
# the next two lines shorten the dlt internal destination paths to dlt.destinations.<destination_type>
pattern = r"\.destinations\.impl\.[a-zA-Z_][.a-zA-Z0-9_]*\."
replacement = ".destinations."
destination_type = re.sub(pattern, replacement, destination_type)
return destination_type

@staticmethod
def to_fully_qualified_refs(ref: str) -> List[str]:
"""Converts ref into fully qualified form, return one or more alternatives for shorthand notations.
Run context is injected if needed. Following formats are recognized
- name
NOTE: the last component of destination type serves as destination name if not explicitly specified
"""
ref_split = ref.split(".")
ref_parts = len(ref_split)
if ref_parts < 2:
# context name is needed
refs = []
for ref_prefix in get_plugin_modules():
if ref_prefix:
ref_prefix = f"{ref_prefix}.{known_sections.DESTINATIONS}"
else:
ref_prefix = f"{known_sections.DESTINATIONS}"
refs.append(f"{ref_prefix}.{ref}")
return refs

return []
8 changes: 6 additions & 2 deletions dlt/common/runtime/run_context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from contextlib import contextmanager
from contextlib import contextmanager, suppress
import os
import tempfile
import warnings
Expand Down Expand Up @@ -204,7 +204,11 @@ def get_plugin_modules() -> List[str]:
ctx_module = active().module
run_module_name = ctx_module.__name__ if ctx_module else ""

return [run_module_name] + [p for p in Container()[PluginContext].plugin_modules]
plugin_modules = Container()[PluginContext].plugin_modules.copy()
with suppress(ValueError):
plugin_modules.remove(run_module_name)
plugin_modules.insert(0, run_module_name)
return plugin_modules


def context_uri(name: str, run_dir: str, runtime_kwargs: Optional[Dict[str, Any]]) -> str:
Expand Down
29 changes: 23 additions & 6 deletions docs/website/docs/general-usage/destination.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ We recommend that you declare the destination type when creating a pipeline inst

Above, we want to use the **filesystem** built-in destination. You can use shorthand types only for built-ins.

* Use a **custom destination name** with a configured type
<!--@@@DLT_SNIPPET ./snippets/destination-snippets.py::custom_destination_name-->

Above, we use a custom destination name and configure the destination type to **filesystem** using an environment variable.

:::note
When resolving non-module destination references (e.g., `"filesystem"` or `"my_destination"`, not `"dlt.destinations.filesystem"`), dlt first attempts to resolve the reference as a named destination with a valid destination type configured, then falls back to shorthand type resolution.

This means that, in the previous example, if the destination type was not properly configured or was not a valid destination type, dlt would have attempted to resolve `"my_destination"` as a shorthand for a built-in type and would have eventually failed.

As another example, the following:
<!--@@@DLT_SNIPPET ./snippets/destination-snippets.py::avoid_example-->
will be resolved as a BigQuery destination that is named `"filesystem"`!
:::

* Use full **destination factory type**
<!--@@@DLT_SNIPPET ./snippets/destination-snippets.py::class_type-->

Expand All @@ -30,32 +45,34 @@ Above, we import the destination factory for **filesystem** and pass it to the p

All examples above will create the same destination class with default parameters and pull required config and secret values from [configuration](credentials/index.md) - they are equivalent.


### Pass explicit parameters and a name to a destination
### Pass explicit parameters and a name to a destination factory
You can instantiate the **destination factory** yourself to configure it explicitly. When doing this, you work with destinations the same way you work with [sources](source.md).
<!--@@@DLT_SNIPPET ./snippets/destination-snippets.py::instance-->

Above, we import and instantiate the `filesystem` destination factory. We pass the explicit URL of the bucket and name the destination `production_az_bucket`.

If a destination is not named, its shorthand type (the Python factory name) serves as a destination name. Name your destination explicitly if you need several separate configurations of destinations of the same type (i.e., you wish to maintain credentials for development, staging, and production storage buckets in the same config file). The destination name is also stored in the [load info](../running-in-production/running.md#inspect-and-save-the-load-info-and-trace) and pipeline traces, so use them also when you need more descriptive names (other than, for example, `filesystem`).
If a destination is not named, its shorthand type (the Python factory name) serves as the destination name. Name your destination explicitly if you need several separate configurations for destinations of the same type (e.g., when you wish to maintain credentials for development, staging, and production storage buckets in the same config file). The destination name is also stored in the [load info](../running-in-production/running.md#inspect-and-save-the-load-info-and-trace) and pipeline traces, so use explicit names when you need more descriptive identifiers (rather than generic names like `filesystem`).


## Configure a destination
We recommend passing the credentials and other required parameters to configuration via TOML files, environment variables, or other [config providers](credentials/setup). This allows you, for example, to easily switch to production destinations after deployment.

We recommend using the [default config section layout](credentials/advanced#organize-configuration-and-secrets-with-sections) as below:
Use the [default config section layout](credentials/advanced#organize-configuration-and-secrets-with-sections) as shown below:
<!--@@@DLT_SNIPPET ./snippets/destination-toml.toml::default_layout-->

or via environment variables:
Alternatively, you can use environment variables:
```sh
DESTINATION__FILESYSTEM__BUCKET_URL=az://dlt-azure-bucket
DESTINATION__FILESYSTEM__CREDENTIALS__AZURE_STORAGE_ACCOUNT_NAME=dltdata
DESTINATION__FILESYSTEM__CREDENTIALS__AZURE_STORAGE_ACCOUNT_KEY="storage key"
```

For named destinations, you use their names in the config section
When using named destination factories, use the destination name in the config section:
<!--@@@DLT_SNIPPET ./snippets/destination-toml.toml::name_layout-->

For custom destination names passed to your pipeline (e.g., `destination="my_destination"`), dlt resolves the destination type from configuration. Add `destination_type` to specify which destination type to use:
<!--@@@DLT_SNIPPET ./snippets/destination-toml.toml::custom_name_layout-->


Note that when you use the [`dlt init` command](../walkthroughs/add-a-verified-source.md) to create or add a data source, `dlt` creates a sample configuration for the selected destination.

Expand Down
2 changes: 1 addition & 1 deletion docs/website/docs/general-usage/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ You instantiate a pipeline by calling the `dlt.pipeline` function with the follo
events and to restore its state and data schemas on subsequent runs. If not provided, `dlt` will
create a pipeline name from the file name of the currently executing Python module.
- `destination`: a name of the [destination](../dlt-ecosystem/destinations) to which dlt
will load the data. It may also be provided to the `run` method of the `pipeline`.
will load the data. It may also be provided to the `run` method of the `pipeline` and can be declared in [various ways](destination.md).
- `dataset_name`: a name of the dataset to which the data will be loaded. A dataset is a logical
group of tables, i.e., `schema` in relational databases or a folder grouping many files. It may also be
provided later to the `run` or `load` methods of the pipeline. If not provided, then
Expand Down
24 changes: 24 additions & 0 deletions docs/website/docs/general-usage/snippets/destination-snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,30 @@ def destination_instantiation_snippet() -> None:

assert pipeline.destination.destination_name == "filesystem"

# @@@DLT_SNIPPET_START custom_destination_name
import os
import dlt

os.environ["DESTINATION__MY_DESTINATION__DESTINATION_TYPE"] = "filesystem"

pipeline = dlt.pipeline("pipeline", destination="my_destination")
# @@@DLT_SNIPPET_END custom_destination_name

assert pipeline.destination.destination_type == "dlt.destinations.filesystem"
assert pipeline.destination.destination_name == "my_destination"

# @@@DLT_SNIPPET_START avoid_example
import os
import dlt

os.environ["DESTINATION__FILESYSTEM__DESTINATION_TYPE"] = "bigquery"

pipeline = dlt.pipeline("pipeline", destination="filesystem")
# @@@DLT_SNIPPET_END avoid_example

assert pipeline.destination.destination_type == "dlt.destinations.bigquery"
assert pipeline.destination.destination_name == "filesystem"

# @@@DLT_SNIPPET_START instance
import dlt

Expand Down
11 changes: 10 additions & 1 deletion docs/website/docs/general-usage/snippets/destination-toml.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,13 @@ bucket_url="az://dlt-azure-bucket"
[destination.production_az_bucket.credentials]
azure_storage_account_name="dltdata"
azure_storage_account_key="storage key"
# @@@DLT_SNIPPET_END name_layout
# @@@DLT_SNIPPET_END name_layout

# @@@DLT_SNIPPET_START custom_name_layout
[destination.my_destination]
destination_type="filesystem"
bucket_url="az://dlt-azure-bucket"
[destination.my_destination.credentials]
azure_storage_account_name="dltdata"
azure_storage_account_key="storage key"
# @@@DLT_SNIPPET_END custom_name_layout
5 changes: 4 additions & 1 deletion tests/.dlt/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ bucket_url_r2="s3://dlt-ci-test-bucket"
# use "/" as root path
bucket_url_gdrive="gdrive://15eC3e5MNew2XAIefWNlG8VlEa0ISnnaG"
bucket_url_sftp="sftp://localhost/data"
memory="memory:///m"
memory="memory:///m"

[destination.custom_name]
destination_type = "duckdb"
Loading
Loading