Merged
20 changes: 15 additions & 5 deletions airflow/utils/db_cleanup.py
@@ -150,18 +150,28 @@ def _do_delete(*, query, orm_model, skip_archive, session):
     timestamp_str = re.sub(r"[^\d]", "", datetime.utcnow().isoformat())[:14]
     target_table_name = f"{ARCHIVE_TABLE_PREFIX}{orm_model.name}__{timestamp_str}"
     print(f"Moving data to table {target_table_name}")
-    stmt = CreateTableAs(target_table_name, query.selectable)
-    logger.debug("ctas query:\n%s", stmt.compile())
-    session.execute(stmt)
+    bind = session.get_bind()
+    dialect_name = bind.dialect.name
+    if dialect_name == "mysql":
+        # MySQL with replication needs this split into two queries, so just do it for all MySQL
+        # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE TABLE ... SELECT.
+        session.execute(f"CREATE TABLE {target_table_name} LIKE {orm_model.name}")
Member

@potiuk potiuk Mar 12, 2023

Suggested change
-        session.execute(f"CREATE TABLE {target_table_name} LIKE {orm_model.name}")
+        # MySQL with replication needs this split into two queries, so just do it for all MySQL
+        # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE TABLE ... SELECT.
+        session.execute(f"CREATE TABLE {target_table_name} LIKE {orm_model.name}")

Contributor Author

Thanks! I added the comment!

Member

Instead of writing custom SQL here, would it be better to enhance the CreateTableAs compilation to support the MySQL syntax?

Contributor Author

@tanvn tanvn Mar 13, 2023

@uranusjr I would love to do so, but I am new to SQLAlchemy and do not yet know how to make the compiler execute two queries instead of one.
(I just took a quick look at https://docs.sqlalchemy.org/en/20/core/compiler.html#dialect-specific-compilation-rules)

Member

Hmm, good point. It’s probably possible with some SQL, but likely not worthwhile.

+        metadata = reflect_tables([target_table_name], session)
+        target_table = metadata.tables[target_table_name]
+        insert_stm = target_table.insert().from_select(target_table.c, query)
+        logger.debug("insert statement:\n%s", insert_stm.compile())
+        session.execute(insert_stm)
+    else:
+        stmt = CreateTableAs(target_table_name, query.selectable)
+        logger.debug("ctas query:\n%s", stmt.compile())
+        session.execute(stmt)
     session.commit()

     # delete the rows from the old table
     metadata = reflect_tables([orm_model.name, target_table_name], session)
     source_table = metadata.tables[orm_model.name]
     target_table = metadata.tables[target_table_name]
     logger.debug("rows moved; purging from %s", source_table.name)
-    bind = session.get_bind()
-    dialect_name = bind.dialect.name
     if dialect_name == "sqlite":
         pk_cols = source_table.primary_key.columns
         delete = source_table.delete().where(
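
For readers who want to try the two-step pattern from the MySQL branch without a MySQL server, here is a minimal sketch using the standard-library `sqlite3` module as a stand-in. It is not the Airflow implementation. The helper `archive_rows`, the table `task_log`, and the archive table name are hypothetical. SQLite has no `CREATE TABLE ... LIKE`, so the sketch assumes that reusing the DDL stored in `sqlite_master` is an acceptable substitute for step one. Step two is a separate `INSERT ... SELECT`, which mirrors the shape of the change above.

```python
import sqlite3


def archive_rows(conn, source, target, where_sql, params=()):
    """Copy matching rows from `source` into a new table `target` in two statements.

    Hypothetical helper for illustration only. Table names are interpolated
    directly into SQL, so it must only be called with trusted identifiers.
    """
    # Step 1: create an empty table with the same structure as the source.
    # MySQL would use `CREATE TABLE target LIKE source`; SQLite lacks that
    # form, so this sketch rewrites the DDL recorded in sqlite_master.
    (ddl,) = conn.execute(
        "SELECT sql FROM sqlite_master WHERE type = 'table' AND name = ?",
        (source,),
    ).fetchone()
    conn.execute(ddl.replace(f"CREATE TABLE {source}", f"CREATE TABLE {target}", 1))

    # Step 2: populate it with a separate INSERT ... SELECT statement.
    cur = conn.execute(
        f"INSERT INTO {target} SELECT * FROM {source} WHERE {where_sql}",
        params,
    )
    conn.commit()
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_log (id INTEGER PRIMARY KEY, dttm TEXT)")
conn.executemany(
    "INSERT INTO task_log (dttm) VALUES (?)",
    [("2022-01-01",), ("2022-06-01",), ("2023-03-01",)],
)

archive_name = "_archive__task_log__20230312000000"  # hypothetical name
moved = archive_rows(conn, "task_log", archive_name, "dttm < ?", ("2023-01-01",))
archived = conn.execute(f"SELECT COUNT(*) FROM {archive_name}").fetchone()[0]
remaining = conn.execute("SELECT COUNT(*) FROM task_log").fetchone()[0]
print(moved, archived, remaining)
```

As in the diff, the source rows are left in place after the copy. Purging them from the source table is a separate, later step.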