- 
                Notifications
    You must be signed in to change notification settings 
- Fork 356
Description
Feature description
I would like the ability to configure the SQLAlchemy connection at a lower level before TableLoader._load_rows() begins reading data. Specifically, I need more flexibility than just exposing execution options (as done with execution_options). This includes the ability to execute raw SQL statements to set various PostgreSQL session-level configurations.
The current implementation exposes some options via execution_options, as shown below:
def _load_rows(self, query: SelectAny, backend_kwargs: Dict[str, Any]) -> TDataItem:
    with self.engine.connect() as conn:
        result = conn.execution_options(yield_per=self.chunk_size).execute(query)
        ...While this approach is helpful for setting execution options (like yield_per or isolation_level), it does not provide a mechanism to execute raw SQL statements at the connection level before data loading. The ability to influence the connection itself before query execution is essential for certain use cases, such as setting specific transaction isolation levels, adjusting session-specific parameters, or executing SET commands (e.g., SET TRANSACTION SNAPSHOT).
The feature would allow the use of session parameters essential for data retrieval tasks, such as:
- Transaction settings: default_transaction_isolation, to control the isolation level during read operations (e.g., REPEATABLE READorSERIALIZABLEfor consistency).
- Timeouts: Session timeouts like statement_timeoutto prevent long-running queries from consuming excessive resources.
- Client connection parameters: client_encodingto ensure correct encoding for reading non-ASCII data.
Are you a dlt user?
Yes, I'm already a dlt user.
Use case
I frequently need to configure transaction isolation and other session parameters when reading data from PostgreSQL. For instance, setting SET default_transaction_isolation = 'REPEATABLE READ' would allow maintaining consistent reads throughout a transaction. Additionally, statement_timeout can help mitigate long-running queries in environments with large datasets by capping execution time.
Proposed solution
Introducing an additional parameter that allows a callback function to be passed, which can modify the SQLAlchemy Connection object before the query is executed. For example, something like this:
from sqlalchemy.engine.base import Connection
TConnectionInitializer = Callable[[Connection], Connection]
conn_init_callback: Optional[TConnectionInitializer] = NoneThis conn_init_callback could then be added as an optional parameter to sql_database or sql_table so that users can apply custom connection-level settings before any data is read. It would be passed as part of the configuration and invoked within the connection context, allowing users to set session-specific behaviors dynamically.
Related issues
Currently, there are no related issues as I have already forked the source and implemented the required changes within my own codebase. However, now that the sql_database component has been integrated into the core library, it would be beneficial to avoid having to replicate the entire codebase just to introduce this one additional parameter.
Metadata
Metadata
Assignees
Labels
Type
Projects
Status