Add a TConnectionInitializer optional callback for sql_database and sql_table resources #1920

@neuromantik33

Feature description

I would like the ability to configure the SQLAlchemy connection at a lower level before TableLoader._load_rows() begins reading data. Specifically, I need more flexibility than execution_options provides: the ability to execute raw SQL statements that set PostgreSQL session-level configuration.

The current implementation exposes some options via execution_options, as shown below:

def _load_rows(self, query: SelectAny, backend_kwargs: Dict[str, Any]) -> TDataItem:
    with self.engine.connect() as conn:
        result = conn.execution_options(yield_per=self.chunk_size).execute(query)
        ...

While this approach works for execution options (like yield_per or isolation_level), it provides no mechanism to execute raw SQL statements on the connection before data loading. Influencing the connection itself before query execution is essential for certain use cases, such as setting specific transaction isolation levels, adjusting session-specific parameters, or executing SET commands (e.g., SET TRANSACTION SNAPSHOT).

The feature would allow the use of session parameters essential for data retrieval tasks, such as:

  • Transaction settings: default_transaction_isolation, to control the isolation level during read operations (e.g., REPEATABLE READ or SERIALIZABLE for consistency).
  • Timeouts: Session timeouts like statement_timeout to prevent long-running queries from consuming excessive resources.
  • Client connection parameters: client_encoding to ensure correct encoding for reading non-ASCII data.
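As a rough illustration of the parameters listed above, a connection initializer could render them into SET statements and run them on the connection. This is only a sketch: render_set_statements, make_session_initializer and RecordingConnection are hypothetical names that exist in neither dlt nor SQLAlchemy, the example values are arbitrary, and the callback assumes the object it receives is a SQLAlchemy 1.4+ Connection (which provides exec_driver_sql). The recording stand-in is there so the snippet runs without a database.

```python
import re
from typing import Any, Callable, Dict, List

# Hypothetical helper names; nothing here is part of dlt or SQLAlchemy.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_.]*$")


def render_set_statements(params: Dict[str, str]) -> List[str]:
    """Render session parameters as PostgreSQL SET statements."""
    statements = []
    for name, value in params.items():
        # Parameter names cannot be bound, so only accept plain identifiers.
        if not _IDENTIFIER.match(name):
            raise ValueError(f"invalid parameter name: {name!r}")
        # Double any single quote so the value is a valid SQL string literal.
        escaped = str(value).replace("'", "''")
        statements.append(f"SET {name} = '{escaped}'")
    return statements


def make_session_initializer(params: Dict[str, str]) -> Callable[[Any], Any]:
    """Build a callback that applies the settings and returns the connection."""
    statements = render_set_statements(params)

    def init(conn: Any) -> Any:
        # Assumption: `conn` is a SQLAlchemy 1.4+ Connection (exec_driver_sql).
        for stmt in statements:
            conn.exec_driver_sql(stmt)
        return conn

    return init


class RecordingConnection:
    """Stand-in that records statements so the sketch runs without a database."""

    def __init__(self) -> None:
        self.statements: List[str] = []

    def exec_driver_sql(self, statement: str) -> None:
        self.statements.append(statement)


# Example values only; pick whatever suits the workload.
session_params = {
    "default_transaction_isolation": "repeatable read",
    "statement_timeout": "30s",
    "client_encoding": "UTF8",
}
recorded = RecordingConnection()
returned = make_session_initializer(session_params)(recorded)
```

Returning the connection from the callback keeps it compatible with a Connection-in, Connection-out signature, so the caller can keep using whatever the callback hands back.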

Are you a dlt user?

Yes, I'm already a dlt user.

Use case

I frequently need to configure transaction isolation and other session parameters when reading data from PostgreSQL. For instance, running SET default_transaction_isolation = 'REPEATABLE READ' would make reads consistent throughout a transaction. Additionally, statement_timeout can help mitigate long-running queries in environments with large datasets by capping execution time.

Proposed solution

Introduce an additional parameter that accepts a callback function, which can modify the SQLAlchemy Connection object before the query is executed. For example, something like this:

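from typing import Callable, Optional
from sqlalchemy.engine.base import Connection
# Receives the open connection and returns the connection to use for reading.
TConnectionInitializer = Callable[[Connection], Connection]
conn_init_callback: Optional[TConnectionInitializer] = None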

This conn_init_callback could then be added as an optional parameter to sql_database or sql_table so that users can apply custom connection-level settings before any data is read. It would be passed as part of the configuration and invoked within the connection context, allowing users to set session-specific behaviors dynamically.
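To make the invocation point concrete, here is a minimal sketch of how such a callback might be applied inside the connection context, right before the query runs. It is not the actual dlt implementation: load_rows is a hypothetical standalone function modelled on the _load_rows excerpt above, the types are loosened to Any so the snippet runs without SQLAlchemy installed, and FakeEngine, FakeConnection and FakeResult are stand-ins that only mimic the few methods used (connect, execution_options, execute, partitions).

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator, List, Optional

# Assumption: mirrors the proposed TConnectionInitializer, typed loosely (Any)
# so the sketch does not need SQLAlchemy to run.
TConnectionInitializer = Callable[[Any], Any]


def load_rows(
    engine: Any,
    query: Any,
    chunk_size: int,
    conn_init_callback: Optional[TConnectionInitializer] = None,
) -> Iterator[List[Any]]:
    """Hypothetical loader: run the callback first, then stream rows in chunks."""
    with engine.connect() as conn:
        if conn_init_callback is not None:
            # The callback may run SET statements and must return a connection.
            conn = conn_init_callback(conn)
        result = conn.execution_options(yield_per=chunk_size).execute(query)
        for partition in result.partitions():
            yield list(partition)


class FakeResult:
    """Stand-in for a result object that yields rows in fixed-size partitions."""

    def __init__(self, rows: List[Any], size: int) -> None:
        self._rows, self._size = rows, size

    def partitions(self) -> Iterator[List[Any]]:
        for i in range(0, len(self._rows), self._size):
            yield self._rows[i:i + self._size]


class FakeConnection:
    """Stand-in connection that logs calls in the order they happen."""

    def __init__(self, rows: List[Any]) -> None:
        self.rows = rows
        self.log: List[Any] = []
        self._size = max(len(rows), 1)

    def execution_options(self, **opts: Any) -> "FakeConnection":
        self.log.append(("options", opts))
        self._size = opts.get("yield_per", self._size)
        return self

    def execute(self, query: Any) -> FakeResult:
        self.log.append(("execute", query))
        return FakeResult(self.rows, self._size)


class FakeEngine:
    """Stand-in engine whose connect() is a context manager."""

    def __init__(self, conn: FakeConnection) -> None:
        self._conn = conn

    @contextmanager
    def connect(self) -> Iterator[FakeConnection]:
        yield self._conn


def set_timeout(conn: FakeConnection) -> FakeConnection:
    # With a real connection this would execute the SET statement instead.
    conn.log.append(("init", "SET statement_timeout = '30s'"))
    return conn


fake = FakeConnection(rows=[1, 2, 3])
chunks = list(
    load_rows(FakeEngine(fake), "SELECT id FROM t", chunk_size=2,
              conn_init_callback=set_timeout)
)
```

Because the callback runs inside the same connection context as the query, any session-level setting it applies is in effect for the read, and omitting the callback leaves the existing behaviour unchanged.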

Related issues

Currently, there are no related issues as I have already forked the source and implemented the required changes within my own codebase. However, now that the sql_database component has been integrated into the core library, it would be beneficial to avoid having to replicate the entire codebase just to introduce this one additional parameter.
