2 changes: 1 addition & 1 deletion .github/workflows/test_tools_dashboard.yml
@@ -114,7 +114,7 @@ jobs:
- name: Run dashboard e2e windows
run: |
start marimo run --headless dlt/_workspace/helpers/dashboard/dlt_dashboard.py -- -- --pipelines-dir _storage\.dlt\pipelines\ --with_test_identifiers true
timeout /t 2 /nobreak >NUL
timeout /t 6 /nobreak
pytest --browser chromium tests/e2e
if: matrix.python-version != '3.9' && matrix.python-version != '3.14.0-beta.4' && matrix.os == 'windows-latest'

2 changes: 1 addition & 1 deletion dlt/_workspace/cli/_pipeline_command.py
@@ -141,7 +141,7 @@ def _display_pending_packages() -> Tuple[Sequence[str], Sequence[str]]:
if operation == "show":
from dlt.common.runtime import signals

with signals.delayed_signals():
with signals.intercepted_signals():
streamlit_cmd = [
"streamlit",
"run",
19 changes: 13 additions & 6 deletions dlt/common/destination/client.py
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
import dataclasses

import contextlib
from threading import BoundedSemaphore
from types import TracebackType
from typing import (
ClassVar,
@@ -349,6 +350,7 @@ def metrics(self) -> Optional[LoadJobMetrics]:
self._finished_at,
self.state(),
None,
self._parsed_file_name.retry_count,
)


@@ -371,13 +373,16 @@ def __init__(self, file_path: str) -> None:
# ensure file name
super().__init__(file_path)
self._state: TLoadJobState = "ready"
self._started_at = pendulum.now()
self._exception: BaseException = None

# variables needed by most jobs, set by the loader in set_run_vars
self._schema: Schema = None
self._load_table: PreparedTableSchema = None
self._load_id: str = None
# set by run_managed method
self._job_client: "JobClientBase" = None
self._done_event: BoundedSemaphore = None

def set_run_vars(self, load_id: str, schema: Schema, load_table: PreparedTableSchema) -> None:
"""
@@ -394,21 +399,21 @@ def load_table_name(self) -> str:
def run_managed(
self,
job_client: "JobClientBase",
done_event: BoundedSemaphore,
/,
) -> None:
"""
wrapper around the user implemented run method
"""
from dlt.common.runtime import signals

# only jobs that are not running or have not reached a final state
# may be started
assert self._state in ("ready", "retry")
self._job_client = job_client
self._done_event = done_event

# filepath is now moved to running
try:
self._state = "running"
self._started_at = pendulum.now()
self._job_client.prepare_load_job_execution(self)
self.run()
self._state = "completed"
@@ -423,12 +428,14 @@ def run_managed(
f"Transient exception in job {self.job_id()} in file {self._file_path}"
)
finally:
self._finished_at = pendulum.now()
# sanity check
assert self._state in ("completed", "retry", "failed")
if self._state != "retry":
self._finished_at = pendulum.now()
# wake up waiting threads
signals.wake_all()
if self._done_event:
with contextlib.suppress(ValueError):
self._done_event.release()

@abstractmethod
def run(self) -> None:
1 change: 1 addition & 0 deletions dlt/common/metrics.py
@@ -91,6 +91,7 @@ class LoadJobMetrics(NamedTuple):
finished_at: datetime.datetime
state: Optional[str]
remote_url: Optional[str]
retry_count: Optional[int]


class LoadMetrics(StepMetrics):
21 changes: 17 additions & 4 deletions dlt/common/pipeline.py
@@ -413,14 +413,27 @@ def _step_info_start_load_id(self, load_id: str) -> None:
self._current_load_started = precise_time()
self._load_id_metrics.setdefault(load_id, [])

def _step_info_complete_load_id(self, load_id: str, metrics: TStepMetrics) -> None:
def _step_info_update_metrics(
self, load_id: str, metrics: TStepMetrics, immutable: bool = False
) -> None:
metrics["started_at"] = ensure_pendulum_datetime_utc(self._current_load_started)
step_metrics = self._load_id_metrics[load_id]
if immutable or len(step_metrics) == 0:
step_metrics.append(metrics)
else:
step_metrics[0] = metrics

def _step_info_complete_load_id(self, load_id: str, finished: bool = True) -> None:
assert self._current_load_id == load_id, (
f"Current load id mismatch {self._current_load_id} != {load_id} when completing step"
" info"
)
metrics["started_at"] = ensure_pendulum_datetime_utc(self._current_load_started)
metrics["finished_at"] = ensure_pendulum_datetime_utc(precise_time())
self._load_id_metrics[load_id].append(metrics)
# metrics must be present
metrics = self._load_id_metrics[load_id][-1]
# update finished at
assert metrics["finished_at"] is None
if finished:
metrics["finished_at"] = ensure_pendulum_datetime_utc(precise_time())
self._current_load_id = None
self._current_load_started = None

86 changes: 47 additions & 39 deletions dlt/common/runtime/signals.py
@@ -1,3 +1,4 @@
import os
import sys
import threading
import signal
@@ -6,19 +7,20 @@
from types import FrameType
from typing import Any, Callable, Dict, Iterator, Optional, Union

from dlt.common import logger
from dlt.common.exceptions import SignalReceivedException

_received_signal: int = 0
exit_event = Event()
_signal_counts: Dict[int, int] = {}
_original_handlers: Dict[int, Union[int, Callable[[int, Optional[FrameType]], Any]]] = {}

# NOTE: do not use logger or print in signal handlers

def signal_receiver(sig: int, frame: FrameType) -> None:

def _signal_receiver(sig: int, frame: FrameType) -> None:
"""Handle POSIX signals with two-stage escalation.

This handler is installed by delayed_signals(). On the first occurrence of a
This handler is installed by intercepted_signals(). On the first occurrence of a
supported signal (e.g. SIGINT, SIGTERM), it requests a graceful shutdown by
setting a process-wide flag and waking sleeping threads via exit_event.
A second occurrence of the same signal escalates by delegating to the
@@ -34,49 +36,61 @@ def signal_receiver(sig: int, frame: FrameType) -> None:
Worker threads must cooperatively observe shutdown via raise_if_signalled()
or the signal-aware sleep().
"""
global _received_signal

# track how many times this signal type has been received
_signal_counts[sig] = _signal_counts.get(sig, 0) + 1

if _signal_counts[sig] == 1:
# first signal of this type: set flag and wake threads
_received_signal = sig
if sig == signal.SIGINT:
sig_desc = "CTRL-C"
else:
sig_desc = f"Signal {sig}"
msg = (
f"{sig_desc} received. Trying to shut down gracefully. It may take time to drain job"
f" pools. Send {sig_desc} again to force stop."
)
set_received_signal(sig)
if sys.stdin.isatty():
# log to console
sys.stderr.write(msg)
sys.stderr.flush()
else:
logger.warning(msg)
# log to console using low level functions that are safe for signal handlers
if sig == signal.SIGINT:
sig_desc = "CTRL-C"
else:
sig_desc = f"Signal {sig}"
msg = (
f"{sig_desc} received. Trying to shut down gracefully. It may take time to drain"
f" job pools. Send {sig_desc} again to force stop."
)
try:
os.write(sys.stderr.fileno(), msg.encode(encoding="utf-8"))
except OSError:
pass
elif _signal_counts[sig] >= 2:
# Second signal of this type: call original handler
logger.debug(f"Second signal {sig} received, calling default handler")
# second signal of this type: call original handler
original_handler = _original_handlers.get(sig, signal.SIG_DFL)
if callable(original_handler):
original_handler(sig, frame)
elif original_handler == signal.SIG_DFL:
# Restore default and re-raise to trigger default behavior
# restore default and re-raise to trigger default behavior
signal.signal(sig, signal.SIG_DFL)
signal.raise_signal(sig)

exit_event.set()
logger.debug("Sleeping threads signalled")


def _clear_signals() -> None:
global _received_signal

_received_signal = 0
_signal_counts.clear()
_original_handlers.clear()


def set_received_signal(sig: int) -> None:
"""Called when signal was received"""
global _received_signal

_received_signal = sig


def raise_if_signalled() -> None:
if _received_signal:
"""Raises `SignalReceivedException` if signal was received."""
if was_signal_received():
raise SignalReceivedException(_received_signal)


def signal_received() -> bool:
def was_signal_received() -> bool:
"""check if a signal was received"""
return True if _received_signal else False

@@ -93,18 +107,10 @@ def wake_all() -> None:
exit_event.set()


def _clear_signals() -> None:
global _received_signal

_received_signal = 0
_signal_counts.clear()
_original_handlers.clear()


@contextmanager
def delayed_signals() -> Iterator[None]:
"""Will delay signalling until `raise_if_signalled` is explicitly used or when
a second signal with the same int value arrives.
def intercepted_signals() -> Iterator[None]:
"""Will intercept SIGINT and SIGTERM and will delay calling signal handlers until
`raise_if_signalled` is explicitly used or when a second signal with the same int value arrives.

A no-op when not called on the main thread.

@@ -115,7 +121,7 @@ def delayed_signals() -> Iterator[None]:
# check if handlers are already installed (nested call)
current_sigint_handler = signal.getsignal(signal.SIGINT)

if current_sigint_handler is signal_receiver:
if current_sigint_handler is _signal_receiver:
# already installed, this is a nested call - just yield
yield
return
@@ -129,14 +135,16 @@
_original_handlers[signal.SIGTERM] = original_sigterm_handler

try:
signal.signal(signal.SIGINT, signal_receiver)
signal.signal(signal.SIGTERM, signal_receiver)
signal.signal(signal.SIGINT, _signal_receiver)
signal.signal(signal.SIGTERM, _signal_receiver)
yield
finally:
signal.signal(signal.SIGINT, original_sigint_handler)
signal.signal(signal.SIGTERM, original_sigterm_handler)
_clear_signals()

else:
from dlt.common import logger

logger.info("Running in daemon thread, signals not enabled")
yield
3 changes: 1 addition & 2 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -16,7 +16,6 @@
from dlt.common.destination.client import (
PreparedTableSchema,
SupportsStagingDestination,
TLoadJobState,
HasFollowupJobs,
RunnableLoadJob,
FollowupJobRequest,
@@ -53,7 +52,7 @@
SqlJobClientBase,
SqlJobClientWithStagingDataset,
)
from dlt.destinations.job_impl import ReferenceFollowupJobRequest, FinalizedLoadJobWithFollowupJobs
from dlt.destinations.job_impl import ReferenceFollowupJobRequest
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.sql_jobs import SqlMergeFollowupJob
from dlt.destinations.utils import get_deterministic_temp_table_name
4 changes: 1 addition & 3 deletions dlt/destinations/impl/dremio/dremio.py
@@ -5,16 +5,14 @@
from dlt.common.destination.client import (
HasFollowupJobs,
PreparedTableSchema,
TLoadJobState,
RunnableLoadJob,
SupportsStagingDestination,
FollowupJobRequest,
LoadJob,
)
from dlt.common.schema import TColumnSchema, Schema
from dlt.common.schema.typing import TColumnType, TTableFormat
from dlt.common.schema.typing import TColumnType
from dlt.common.storages.file_storage import FileStorage
from dlt.common.utils import uniq_id
from dlt.destinations.exceptions import LoadJobTerminalException
from dlt.destinations.impl.dremio.configuration import DremioClientConfiguration
from dlt.destinations.impl.dremio.sql_client import DremioSqlClient
3 changes: 3 additions & 0 deletions dlt/destinations/impl/ducklake/ducklake.py
@@ -75,6 +75,9 @@ def __init__(self, file_path: str) -> None:
def metrics(self) -> Optional[LoadJobMetrics]:
"""Generate remote url metrics which point to the table in storage"""
m = super().metrics()
# job client is not available before run_managed is called
if not self._job_client:
return m
# TODO: read location from catalog. ducklake supports customized table layouts
return m._replace(
remote_url=str(
3 changes: 3 additions & 0 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -146,6 +146,9 @@ def make_remote_url(self) -> str:

def metrics(self) -> Optional[LoadJobMetrics]:
m = super().metrics()
# job client is not available before run_managed is called
if not self._job_client:
return m
return m._replace(remote_url=self.make_remote_url())


4 changes: 0 additions & 4 deletions dlt/destinations/impl/qdrant/qdrant_job_client.py
@@ -16,7 +16,6 @@
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.client import (
PreparedTableSchema,
TLoadJobState,
RunnableLoadJob,
JobClientBase,
WithStateSync,
@@ -25,11 +24,8 @@
from dlt.common.destination.exceptions import DestinationUndefinedEntity

from dlt.common.storages import FileStorage
from dlt.common.time import precise_time

from dlt.destinations.job_impl import FinalizedLoadJobWithFollowupJobs
from dlt.destinations.job_client_impl import StorageSchemaInfo, StateInfo

from dlt.destinations.utils import get_pipeline_state_query_columns
from dlt.destinations.impl.qdrant.configuration import QdrantClientConfiguration
from dlt.destinations.impl.qdrant.qdrant_adapter import VECTORIZE_HINT
1 change: 0 additions & 1 deletion dlt/destinations/impl/weaviate/weaviate_client.py
@@ -45,7 +45,6 @@
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.client import (
PreparedTableSchema,
TLoadJobState,
RunnableLoadJob,
JobClientBase,
WithStateSync,