-
Notifications
You must be signed in to change notification settings - Fork 202
/
constants.py
43 lines (37 loc) · 1.29 KB
/
constants.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from datetime import datetime, timedelta
# Identity of the DAG.
DAG_ID = "batched_update"
# The automated variant shares the base id, prefixed with "automated_".
AUTOMATED_DAG_ID = "automated_" + DAG_ID
# Naive datetime (no tzinfo). NOTE(review): presumably interpreted in the
# scheduler's default timezone -- confirm.
START_DATE = datetime(year=2023, month=5, day=1)
# Slack display settings (username and icon emoji), presumably used when
# posting notifications.
SLACK_USERNAME = "Upstream Batched Update"
SLACK_ICON = ":database:"

# Sizing and timeouts.
# Presumably the number of rows handled per update batch -- confirm.
DEFAULT_BATCH_SIZE = 10_000
# Three 31-day months (93 days): a few days longer than UPDATE_TIMEOUT.
DAGRUN_TIMEOUT = timedelta(days=3 * 31)
# One day (24 hours).
SELECT_TIMEOUT = timedelta(days=1)
# Three 30-day months (90 days).
UPDATE_TIMEOUT = timedelta(days=3 * 30)
# Task IDs referenced by the branching operator. NOTE(review): these strings
# presumably must match the task_id values used where the DAG's tasks are
# defined -- confirm.
CREATE_TEMP_TABLE_TASK_ID = "select_rows_to_update"
GET_EXPECTED_COUNT_TASK_ID = "get_expected_update_count"

# Timeout for a single batch, expressed as an int number of seconds
# (not a timedelta): one hour.
DEFAULT_UPDATE_BATCH_TIMEOUT = int(timedelta(hours=1).total_seconds())

# Name template for the working table; ``{query_id}`` is a brace placeholder,
# presumably filled with str.format. NOTE(review): if this names a PostgreSQL
# table, identifiers over 63 bytes are truncated -- keep query_id short.
TEMP_TABLE_NAME = "{query_id}_rows_to_update"
# SQL templates for the batched update. Each is a plain string with
# ``{placeholder}`` fields. NOTE(review): presumably filled via str.format by
# the caller -- values are spliced in as raw SQL text, not bound as query
# parameters, so they must come from trusted input (confirm).

# Materialise the rows to update into a working table, numbering each row
# with ROW_NUMBER() so batches can be picked by contiguous row_id ranges.
# ``select_query`` is appended verbatim after ``FROM {table_name}``
# (presumably a WHERE clause or empty -- confirm against callers).
# OVER() has no ORDER BY, so which row gets which number is unspecified, but
# the numbering is fixed once CREATE TABLE ... AS has materialised it.
CREATE_TEMP_TABLE_QUERY = """
CREATE TABLE {temp_table_name} AS
SELECT ROW_NUMBER() OVER() row_id, identifier
FROM {table_name}
{select_query};
"""
# Index row_id, the column the batch update filters on by range.
# (Unlike the other templates, this one has no trailing semicolon.)
CREATE_TEMP_TABLE_INDEX_QUERY = "CREATE INDEX ON {temp_table_name}(row_id)"
# Count the rows in the working table, i.e. how many rows were selected for
# update.
SELECT_TEMP_TABLE_COUNT_QUERY = """
SELECT COUNT(*)
FROM {temp_table_name};
"""
# Apply ``update_query`` (appended verbatim after ``UPDATE {table_name}``;
# presumably a SET clause -- confirm) to the rows whose row_id lies in the
# range (batch_start, batch_end] -- exclusive start, inclusive end.
# NOTE(review): FOR UPDATE SKIP LOCKED sits in the subquery over the working
# table, so it locks/skips rows of {temp_table_name}, not {table_name}; a row
# skipped because it is locked is silently left unupdated for this batch --
# confirm this is intended.
UPDATE_BATCH_QUERY = """
UPDATE {table_name}
{update_query}
WHERE identifier in (
SELECT identifier FROM {temp_table_name}
WHERE row_id > {batch_start} AND row_id <= {batch_end}
FOR UPDATE SKIP LOCKED
);
"""
# Drop the working table. IF EXISTS avoids an error when it was never
# created; CASCADE also drops objects that depend on it.
DROP_TABLE_QUERY = "DROP TABLE IF EXISTS {temp_table_name} CASCADE;"