6 changes: 3 additions & 3 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -23,7 +23,7 @@ achieve this, use `write_disposition='replace'` in your resources. Learn more in
 - **Append**: appends the new data to the destination. Use `write_disposition='append'`.
 
 - **Merge**: Merges new data to the destination using `merge_key` and/or deduplicates/upserts new data
-  using `private_key`. Use `write_disposition='merge'`.
+  using `primary_key`. Use `write_disposition='merge'`.
 
 ### Two simple questions determine the write disposition you use
 
@@ -56,8 +56,8 @@ The `merge` write disposition is used in two scenarios:
 instance of a record for each batch even in case you load an old batch or load the current batch
 several times a day (i.e. to receive "live" updates).
 
-The `merge` write disposition loads data to a `staging` dataset, deduplicates the staging data if a
-`primary_key` is provided, deletes the data from the destination using `merge_key` and `primary_key`,
+The `merge` write disposition loads data to a `staging` dataset, deduplicates the staging data if a
+`primary_key` is provided, deletes the data from the destination using `merge_key` and `primary_key`,
 and then inserts the new records. All of this happens in a single atomic transaction for a parent and all
 child tables.
 
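The fix above is the substance of the doc change: deduplication/upserts with the `merge` disposition are driven by `primary_key`, not `private_key`. For context, a minimal sketch of the corrected usage (resource and pipeline names here are illustrative, not part of this PR):

```python
import dlt

# Rows sharing an "id" are deduplicated/upserted because primary_key is set
# and the write disposition is "merge".
@dlt.resource(primary_key="id", write_disposition="merge")
def users():
    yield [
        {"id": 1, "name": "alice"},
        {"id": 2, "name": "bob"},
    ]

pipeline = dlt.pipeline(destination="duckdb", dataset_name="example")
pipeline.run(users())
```

Running this twice leaves a single row per `id` in the destination, which is exactly the behavior the corrected paragraph describes.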
94 changes: 71 additions & 23 deletions tests/utils.py
@@ -1,44 +1,63 @@
-import os
-import sys
 import multiprocessing
+import os
 import platform
-import requests
-import pytest
+import sys
 from os import environ
-from typing import Iterator, List
+from typing import Iterator
+from unittest.mock import patch
 
+import pytest
+import requests
+from requests import Response
+
 import dlt
 from dlt.common.configuration.container import Container
 from dlt.common.configuration.providers import DictionaryProvider
 from dlt.common.configuration.resolve import resolve_configuration
 from dlt.common.configuration.specs import RunConfiguration
-from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext
+from dlt.common.configuration.specs.config_providers_context import (
+    ConfigProvidersContext,
+)
+from dlt.common.pipeline import PipelineContext
 from dlt.common.runtime.logger import init_logging
 from dlt.common.runtime.telemetry import start_telemetry, stop_telemetry
-from dlt.common.storages import FileStorage
 from dlt.common.schema import Schema
+from dlt.common.storages import FileStorage
 from dlt.common.storages.versioned_storage import VersionedStorage
 from dlt.common.typing import StrAny
 from dlt.common.utils import custom_environ, uniq_id
-from dlt.common.pipeline import PipelineContext
 
 TEST_STORAGE_ROOT = "_storage"
 
 
 # destination constants
-IMPLEMENTED_DESTINATIONS = {"athena", "duckdb", "bigquery", "redshift", "postgres", "snowflake", "filesystem", "weaviate", "dummy", "motherduck", "mssql", "qdrant"}
+IMPLEMENTED_DESTINATIONS = {
+    "athena",
+    "duckdb",
+    "bigquery",
+    "redshift",
+    "postgres",
+    "snowflake",
+    "filesystem",
+    "weaviate",
+    "dummy",
+    "motherduck",
+    "mssql",
+    "qdrant",
+}
 NON_SQL_DESTINATIONS = {"filesystem", "weaviate", "dummy", "motherduck", "qdrant"}
 SQL_DESTINATIONS = IMPLEMENTED_DESTINATIONS - NON_SQL_DESTINATIONS
 
 # exclude destination configs (for now used for athena and athena iceberg separation)
-EXCLUDED_DESTINATION_CONFIGURATIONS = set(dlt.config.get("EXCLUDED_DESTINATION_CONFIGURATIONS", list) or set())
+EXCLUDED_DESTINATION_CONFIGURATIONS = set(
+    dlt.config.get("EXCLUDED_DESTINATION_CONFIGURATIONS", list) or set()
+)
 
 
 # filter out active destinations for current tests
-ACTIVE_DESTINATIONS = set(dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS)
+ACTIVE_DESTINATIONS = set(
+    dlt.config.get("ACTIVE_DESTINATIONS", list) or IMPLEMENTED_DESTINATIONS
+)
 
 ACTIVE_SQL_DESTINATIONS = SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS)
 ACTIVE_NON_SQL_DESTINATIONS = NON_SQL_DESTINATIONS.intersection(ACTIVE_DESTINATIONS)
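These constants make the destination matrix configurable per test run. A sketch of how a run might be narrowed (an assumption about usage: dlt's environment provider parses list-valued settings from JSON text):

```python
import os

# Hypothetical: run only the duckdb and postgres destinations.
# dlt.config.get("ACTIVE_DESTINATIONS", list) resolves this at import time.
os.environ["ACTIVE_DESTINATIONS"] = '["duckdb", "postgres"]'
```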
@@ -47,13 +66,20 @@
 assert len(ACTIVE_DESTINATIONS) >= 0, "No active destinations selected"
 
 for destination in NON_SQL_DESTINATIONS:
-    assert destination in IMPLEMENTED_DESTINATIONS, f"Unknown non sql destination {destination}"
+    assert (
+        destination in IMPLEMENTED_DESTINATIONS
+    ), f"Unknown non sql destination {destination}"
 
 for destination in SQL_DESTINATIONS:
-    assert destination in IMPLEMENTED_DESTINATIONS, f"Unknown sql destination {destination}"
+    assert (
+        destination in IMPLEMENTED_DESTINATIONS
+    ), f"Unknown sql destination {destination}"
 
 for destination in ACTIVE_DESTINATIONS:
-    assert destination in IMPLEMENTED_DESTINATIONS, f"Unknown active destination {destination}"
+    assert (
+        destination in IMPLEMENTED_DESTINATIONS
+    ), f"Unknown active destination {destination}"
 
+
 def TEST_DICT_CONFIG_PROVIDER():
     # add test dictionary provider
@@ -65,6 +91,7 @@ def TEST_DICT_CONFIG_PROVIDER():
     providers_context.add_provider(provider)
     return provider
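A possible use of this helper in a test (my sketch; it assumes DictionaryProvider exposes the `set_value` method that dlt's writable providers implement):

```python
# Hypothetical: seed a value that resolve_configuration() will then find.
provider = TEST_DICT_CONFIG_PROVIDER()
provider.set_value("log_level", "WARNING", None)
```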
 
+
 class MockHttpResponse(Response):
     def __init__(self, status_code: int) -> None:
         self.status_code = status_code
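As shown, `MockHttpResponse` bypasses `Response.__init__`, so only `status_code` is reliably set (the rest of the class is collapsed in this diff). A usage sketch, mine rather than the PR's:

```python
import requests
from unittest.mock import patch

# Hypothetical: every GET reports a 503 without touching the network.
with patch("requests.get", return_value=MockHttpResponse(503)):
    assert requests.get("https://example.com").status_code == 503
```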
@@ -161,15 +188,19 @@ def start_test_telemetry(c: RunConfiguration = None):
     start_telemetry(c)
 
 
-def clean_test_storage(init_normalize: bool = False, init_loader: bool = False, mode: str = "t") -> FileStorage:
+def clean_test_storage(
+    init_normalize: bool = False, init_loader: bool = False, mode: str = "t"
+) -> FileStorage:
     storage = FileStorage(TEST_STORAGE_ROOT, mode, makedirs=True)
     storage.delete_folder("", recursively=True, delete_ro=True)
     storage.create_folder(".")
     if init_normalize:
         from dlt.common.storages import NormalizeStorage
+
         NormalizeStorage(True)
     if init_loader:
         from dlt.common.storages import LoadStorage
+
         LoadStorage(True, "jsonl", LoadStorage.ALL_SUPPORTED_FILE_FORMATS)
     return storage
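A typical way such a helper is wired into a suite (an assumption about usage, not shown in this hunk) is an autouse fixture that recreates `_storage` before every test:

```python
import pytest

@pytest.fixture(autouse=True)
def autouse_test_storage() -> FileStorage:
    # Wipe and recreate the _storage root so each test starts clean.
    return clean_test_storage()
```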

@@ -182,17 +213,33 @@ def create_schema_with_name(schema_name) -> Schema:
 def assert_no_dict_key_starts_with(d: StrAny, key_prefix: str) -> None:
     assert all(not key.startswith(key_prefix) for key in d.keys())
 
+
 def skip_if_not_active(destination: str) -> None:
-    assert destination in IMPLEMENTED_DESTINATIONS, f"Unknown skipped destination {destination}"
+    assert (
+        destination in IMPLEMENTED_DESTINATIONS
+    ), f"Unknown skipped destination {destination}"
     if destination not in ACTIVE_DESTINATIONS:
-        pytest.skip(f"{destination} not in ACTIVE_DESTINATIONS", allow_module_level=True)
+        pytest.skip(
+            f"{destination} not in ACTIVE_DESTINATIONS", allow_module_level=True
+        )
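Because `allow_module_level=True` is passed, the helper can be invoked at import time to skip an entire test module (usage sketch, mine):

```python
# Hypothetical: at the top of a destination-specific test module.
from tests.utils import skip_if_not_active

skip_if_not_active("bigquery")  # skips the whole module unless bigquery is active
```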


 def is_running_in_github_fork() -> bool:
-    is_github_actions = os.environ.get("GITHUB_ACTIONS") == "true"
-    head_ref = os.environ.get("GITHUB_HEAD_REF", "")
-    repo = os.environ.get("GITHUB_REPOSITORY", "")
-    return is_github_actions and ":" in head_ref and not head_ref.startswith(repo.split("/")[0])
+    event_path = os.environ["GITHUB_EVENT_PATH"]
+
+    # Extract necessary information from the GitHub Actions event payload
+    with open(event_path, encoding="utf-8") as f:
+        event_data = dlt.common.json.load(f)
+
+    # Check if the pull request is from a fork
+    is_pull_request_from_fork = (
+        event_data.get("pull_request", {})
+        .get("head", {})
+        .get("repo", {})
+        .get("fork", False)
+    )
+
+    return is_pull_request_from_fork
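The rewritten check reads the workflow's event payload from `GITHUB_EVENT_PATH` instead of inferring fork status from branch naming. The `.get()` chain assumes the shape GitHub uses for `pull_request` events, roughly:

```python
# Abridged pull_request event payload (only the fields consulted above):
event_data = {
    "pull_request": {
        "head": {
            "repo": {"fork": True}  # True when the PR branch lives in a fork
        }
    }
}
```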


skipifspawn = pytest.mark.skipif(
@@ -212,5 +259,6 @@ def is_running_in_github_fork() -> bool:
 )
 
 skipifgithubfork = pytest.mark.skipif(
-    is_running_in_github_fork(), reason="Skipping test because it runs on a PR coming from fork"
-)
+    is_running_in_github_fork(),
+    reason="Skipping test because it runs on a PR coming from fork",
+)
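Since `pytest.mark.skipif` evaluates its condition once at import, the resulting marker can simply decorate tests that must not run on fork PRs (usage sketch, mine):

```python
@skipifgithubfork
def test_requires_repo_secrets() -> None:
    # Skipped on PRs from forks, where repository secrets are unavailable.
    ...
```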