Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Commit

Permalink
chore(datasets): Refactor DatasetDAO update to leverage bulk logic fo…
Browse files Browse the repository at this point in the history
…r create, update, and delete operations (apache#22957)
  • Loading branch information
john-bodley authored Feb 13, 2023
1 parent 4980621 commit f3bdcdc
Showing 1 changed file with 68 additions and 47 deletions.
115 changes: 68 additions & 47 deletions superset/datasets/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,39 +187,50 @@ def update_columns(
then we delete.
"""

column_by_id = {column.id: column for column in model.columns}
seen = set()
original_cols = {obj.id for obj in model.columns}

if override_columns:
for id_ in original_cols:
DatasetDAO.delete_column(column_by_id[id_], commit=False)
db.session.query(TableColumn).filter(
TableColumn.table_id == model.id
).delete(synchronize_session="fetch")

db.session.flush()
db.session.bulk_insert_mappings(
TableColumn,
[
{**properties, "table_id": model.id}
for properties in property_columns
],
)
else:
columns_by_id = {column.id: column for column in model.columns}

property_columns_by_id = {
properties["id"]: properties
for properties in property_columns
if "id" in properties
}

db.session.bulk_insert_mappings(
TableColumn,
[
{**properties, "table_id": model.id}
for properties in property_columns
if not "id" in properties
],
)

db.session.bulk_update_mappings(
TableColumn,
[
{**columns_by_id[properties["id"]].__dict__, **properties}
for properties in property_columns_by_id.values()
],
)

for properties in property_columns:
DatasetDAO.create_column(
{**properties, "table_id": model.id},
commit=False,
db.session.query(TableColumn).filter(
TableColumn.id.in_(
{column.id for column in model.columns}
- property_columns_by_id.keys()
)
else:
for properties in property_columns:
if "id" in properties:
seen.add(properties["id"])

DatasetDAO.update_column(
column_by_id[properties["id"]],
properties,
commit=False,
)
else:
DatasetDAO.create_column(
{**properties, "table_id": model.id},
commit=False,
)

for id_ in {obj.id for obj in model.columns} - seen:
DatasetDAO.delete_column(column_by_id[id_], commit=False)
).delete(synchronize_session="fetch")

if commit:
db.session.commit()
Expand All @@ -241,26 +252,36 @@ def update_metrics(
then we delete.
"""

metric_by_id = {metric.id: metric for metric in model.metrics}
seen = set()

for properties in property_metrics:
if "id" in properties:
seen.add(properties["id"])
metrics_by_id = {metric.id: metric for metric in model.metrics}

property_metrics_by_id = {
properties["id"]: properties
for properties in property_metrics
if "id" in properties
}

db.session.bulk_insert_mappings(
SqlMetric,
[
{**properties, "table_id": model.id}
for properties in property_metrics
if not "id" in properties
],
)

DatasetDAO.update_metric(
metric_by_id[properties["id"]],
properties,
commit=False,
)
else:
DatasetDAO.create_metric(
{**properties, "table_id": model.id},
commit=False,
)
db.session.bulk_update_mappings(
SqlMetric,
[
{**metrics_by_id[properties["id"]].__dict__, **properties}
for properties in property_metrics_by_id.values()
],
)

for id_ in {obj.id for obj in model.metrics} - seen:
DatasetDAO.delete_column(metric_by_id[id_], commit=False)
db.session.query(SqlMetric).filter(
SqlMetric.id.in_(
{metric.id for metric in model.metrics} - property_metrics_by_id.keys()
)
).delete(synchronize_session="fetch")

if commit:
db.session.commit()
Expand Down

0 comments on commit f3bdcdc

Please sign in to comment.