Description
We have several unique implementations of a common `run_sql`
task across our DAGs. This task was pulled out into `common/sql.py`
in #4836:
```python
@task
def run_sql(
    sql_template: str,
    postgres_conn_id: str = POSTGRES_CONN_ID,
    task: AbstractOperator = None,
    timeout: float = None,
    handler: callable = RETURN_ROW_COUNT,
    **kwargs,
):
    """
    Run an SQL query with the given template and parameters. Any kwargs handed
    into the function outside of those defined will be passed into the template
    `.format` call.
    """
    query = sql_template.format(**kwargs)

    postgres = PostgresHook(
        postgres_conn_id=postgres_conn_id,
        default_statement_timeout=(
            timeout if timeout else PostgresHook.get_execution_timeout(task)
        ),
    )

    return postgres.run(query, handler=handler)
```
We have several DAGs which can now use this `run_sql`
function directly, rather than re-implementing their own:
```python
def run_sql(
    sql_template: str,
    postgres_conn_id: str = POSTGRES_CONN_ID,
    task: AbstractOperator = None,
    timeout: timedelta = None,
    handler: callable = RETURN_ROW_COUNT,
    **kwargs,
):
    query = sql_template.format(**kwargs)

    postgres = PostgresHook(
        postgres_conn_id=postgres_conn_id,
        default_statement_timeout=(
            timeout if timeout else PostgresHook.get_execution_timeout(task)
        ),
    )

    return postgres.run(query, handler=handler)
```
`batched_update`
(this one may require some additional work on either the base function or the call to accommodate the `dry_run`
variable):
```python
def run_sql(
    dry_run: bool,
    sql_template: str,
    query_id: str,
    log_sql: bool = True,
    postgres_conn_id: str = POSTGRES_CONN_ID,
    task: AbstractOperator = None,
    timeout: timedelta = None,
    handler: callable = RETURN_ROW_COUNT,
    **kwargs,
):
    query = sql_template.format(
        temp_table_name=constants.TEMP_TABLE_NAME.format(query_id=query_id), **kwargs
    )
    if dry_run:
        logger.info(
            "This is a dry run: no SQL will be executed. To perform the updates,"
            " rerun the DAG with the conf option `'dry_run': false`."
        )
        logger.info(query)
        return 0

    postgres = PostgresHook(
        postgres_conn_id=postgres_conn_id,
        default_statement_timeout=(
            timeout if timeout else PostgresHook.get_execution_timeout(task)
        ),
        log_sql=log_sql,
    )

    return postgres.run(query, handler=handler)
```
```python
def run_sql(
    sql: str,
    log_sql: bool = True,
    method: str = "get_records",
    handler: callable = None,
    autocommit: bool = False,
    postgres_conn_id: str = POSTGRES_CONN_ID,
    dag_task: AbstractOperator = None,
):
    postgres = PostgresHook(
        postgres_conn_id=postgres_conn_id,
        default_statement_timeout=PostgresHook.get_execution_timeout(dag_task),
        log_sql=log_sql,
    )
    if method == "get_records":
        return postgres.get_records(sql)
    elif method == "get_first":
        return postgres.get_first(sql)
    else:
        return postgres.run(sql, autocommit=autocommit, handler=handler)
```
Additional context
This also came up in the discussion of #4572