forked from galaxyproject/galaxy
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
909 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
# Identify a set of unused histories (by whichever process is fastest) | ||
# NEW: Mark those histories as deleted and purged to render them unusable | ||
# Delete association tables we don't care about (currently, all the delete statements in the draft script) | ||
# Update galaxy_session (set history_id to null for affected rows) | ||
# Delete histories | ||
|
||
# TODO: add step 1 on n to logs (n should be correct) | ||
|
||
""" | ||
args: | ||
- max history updated date (recommended not less than a month?) | ||
- history batch size | ||
""" | ||
|
||
import os | ||
import sys | ||
|
||
sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "lib"))) | ||
|
||
import logging # TODO do we need more setup? | ||
|
||
from sqlalchemy import text, create_engine | ||
|
||
from galaxy.model.orm.scripts import get_config | ||
|
||
# Name of the temporary table used to stage the ids of histories to delete.
TMP_TABLE = "tmp_unused_history"

# Tables with a history_id foreign key whose rows are deleted together with
# the history (no data worth preserving).
ASSOC_TABLES = (
    "event",
    "history_audit",
    "history_tag_association",
    "history_annotation_association",
    "history_rating_association",
    "history_user_share_association",
    "default_history_permissions",
    "data_manager_history_association",
    "cleanup_event_history_association",
    "galaxy_session_to_history",
)

# Tables with a history_id foreign key whose presence means the referenced
# history must NOT be deleted (associated records are preserved).
EXCLUDED_ASSOC_TABLES = (
    "job_import_history_archive",
    "job_export_history_archive",
    "workflow_invocation",
    "history_dataset_collection_association",
    "job",
    "history_dataset_association",
)
|
||
|
||
class HistoryTablePruner:
    """Removes unused histories (user_id is null, hid_counter == 1).

    Processing order per batch:
      1. Mark target histories as deleted and purged (renders them unusable).
      2. Collect target history ids and subtract histories that must be kept.
      3. Stage the remaining ids in a temporary table.
      4. Delete associated rows, null out galaxy_session references,
         then delete the history rows themselves.
    """

    def __init__(self):
        # TODO add option to pass min id and max id
        self.max_update_time = "01-01-2025"  # TODO read from args
        self.batch_size = 3  # TODO read from args
        self.engine = self._create_engine()
        self.min_id, self.max_id = self._get_min_max_ids()

    def run(self):
        """
        Due to the very large size of some tables, we run operations in batches,
        using low/high history id as boundaries.
        """
        low = self.min_id
        high = min(self.max_id, low + self.batch_size)
        while low <= self.max_id:
            self._run_batch(low, high)
            low = high
            high = high + self.batch_size

    def _run_batch(self, low, high):
        """Process histories whose id is in the half-open range [low, high)."""
        self._mark_histories_as_deleted_and_purged(low, high)
        histories = self._get_histories(low, high)
        exclude = self._get_histories_to_exclude(low, high)

        # Calculate set of histories to delete.
        to_delete = set(histories) - exclude
        if not to_delete:
            logging.info(f"No histories to delete in the id range {low} - {high}")
            return

        self._create_tmp_table()
        try:
            self._populate_tmp_table(to_delete)
            self._delete_associations()
            self._set_references_to_null()
            self._delete_histories()
        finally:
            # Always drop the temp table, even if a previous step failed.
            self._drop_tmp_table()

    def _get_min_max_ids(self):
        """Return (lowest, highest) id currently present in the history table."""
        stmt = text("SELECT min(id), max(id) FROM history")
        with self.engine.begin() as conn:
            minmax = conn.execute(stmt).all()
        return minmax[0][0], minmax[0][1]

    def _create_engine(self):
        """Build a SQLAlchemy engine from the configured database URL."""
        db_url = get_config(sys.argv)["db_url"]
        return create_engine(db_url)

    def _mark_histories_as_deleted_and_purged(self, low, high):
        """Mark target histories as deleted and purged to prevent their further usage."""
        logging.info(f"STEP 1 OF 10: Marking histories {low}-{high} as deleted and purged")
        stmt = text("""
            UPDATE history
            SET deleted = TRUE, purged = TRUE
            WHERE user_id IS NULL AND hid_counter = 1 AND update_time < :update_time AND id >= :low AND id < :high
        """)
        params = {
            "update_time": self.max_update_time,
            "low": low,
            "high": high,
        }
        with self.engine.begin() as conn:
            return conn.execute(stmt, params)

    def _get_histories(self, low, high):
        """Return ids of histories to delete."""
        logging.info(f"STEP 2 OF 10: Collecting history ids between {low}-{high}")
        stmt = text("SELECT id FROM history WHERE user_id IS NULL AND hid_counter = 1 AND update_time < :update_time AND id >= :low AND id < :high")
        params = {
            "update_time": self.max_update_time,
            "low": low,
            "high": high,
        }
        with self.engine.begin() as conn:
            return conn.scalars(stmt, params).all()

    def _get_histories_to_exclude(self, low, high):
        """Retrieve histories that should NOT be deleted due to existence of associated records that should be preserved."""
        logging.info(f"STEP 3 OF 10: Collecting ids of histories to exclude based on {len(EXCLUDED_ASSOC_TABLES)} associated tables:")
        statements = []
        for table in EXCLUDED_ASSOC_TABLES:
            # BUG FIX: the range filter must apply to history_id, not the
            # association table's own id column.
            statements.append((table, text(f"SELECT history_id FROM {table} WHERE history_id >= :low AND history_id < :high")))

        params = {
            "low": low,
            "high": high,
        }

        ids = []
        with self.engine.begin() as conn:  # TODO or maybe 1 transaction per table?
            for table, stmt in statements:
                logging.info(f"\tCollecting history_id from {table}")
                ids += conn.scalars(stmt, params).all()

        # Some of these tables allow a NULL history_id; such rows exclude nothing.
        excluded = set(ids)
        excluded.discard(None)
        return excluded

    def _create_tmp_table(self):
        """Create temporary table to hold history ids."""
        stmt = text(f"CREATE TEMPORARY TABLE {TMP_TABLE} (id INT PRIMARY KEY)")
        with self.engine.begin() as conn:
            conn.execute(stmt)

    def _drop_tmp_table(self):
        """Drop the temporary table (BUG FIX: removed leftover CREATE statement)."""
        stmt = text(f"DROP TABLE {TMP_TABLE}")
        with self.engine.begin() as conn:
            conn.execute(stmt)

    def _populate_tmp_table(self, to_delete):
        """Load ids of histories to delete into temporary table."""
        if not to_delete:
            # Explicit error instead of assert: asserts are stripped under -O.
            raise ValueError("to_delete must not be empty")
        logging.info("STEP 4 OF 10: Populating temporary table")
        # The ids come straight from the database as integers, so inlining
        # them into the VALUES list is safe.
        sql_values = ",".join(f"({history_id})" for history_id in to_delete)
        stmt = text(f"INSERT INTO {TMP_TABLE} VALUES {sql_values}")
        with self.engine.begin() as conn:
            conn.execute(stmt)

    def _delete_associations(self):
        """Delete records associated with histories to be deleted."""
        logging.info("STEP 5 OF 10: Deleting associated records from ...")

        for table in ASSOC_TABLES:
            stmt = text(f"DELETE FROM {table} WHERE history_id IN (SELECT id FROM {TMP_TABLE})")
            with self.engine.begin() as conn:
                conn.execute(stmt)

    def _set_references_to_null(self):
        """Set history_id to null in galaxy_session table for records referring to histories to be deleted."""
        logging.info("STEP 6 OF 10: Set history_id to null in galaxy_session")
        # BUG FIX: the f-prefix was missing, so the literal text "{TMP_TABLE}"
        # was sent to the database instead of the table name.
        stmt = text(f"UPDATE galaxy_session SET current_history_id = NULL WHERE current_history_id IN (SELECT id FROM {TMP_TABLE})")
        with self.engine.begin() as conn:
            conn.execute(stmt)

    def _delete_histories(self):
        """Last step: delete histories that are safe to delete."""
        # BUG FIX: the statement was missing its f-prefix, and the log message
        # interpolated names (low/high) that do not exist in this scope.
        logging.info("STEP 7 OF 10: Deleting histories staged in the temporary table")
        stmt = text(f"DELETE FROM history WHERE id IN (SELECT id FROM {TMP_TABLE})")
        with self.engine.begin() as conn:
            conn.execute(stmt)
|
||
|
||
if __name__ == "__main__":
    HistoryTablePruner().run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
#!/bin/sh

#######
# Wrapper for scripts/cleanup_histories.py: runs the history-cleanup
# script from the repository root so its relative paths resolve.
#######


# Move to this script's own directory, regardless of where it was invoked from.
cd "$(dirname "$0")" || exit

# Step up to the repository root.
cd ..

# TODO add params
# TODO add help
python ./scripts/cleanup_histories.py
# Alternative: append output to a log file instead of stdout.
#python ./scripts/cleanup_histories.py >> ./scripts/cleanup_histories.log
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
from collections import ( | ||
Counter, | ||
namedtuple, | ||
) | ||
|
||
# Object-store id that MockObjectStore.is_private treats as private.
PRIVATE_OBJECT_STORE_ID = "my_private_data"

# Lightweight stand-in for a transaction object; exposes only a `user` field.
MockTransaction = namedtuple("MockTransaction", "user")
|
||
|
||
class MockObjectStore:
    """Minimal object-store test double; implements only the privacy check."""

    def is_private(self, object):
        """Return True iff *object* lives in the private object store."""
        return object.object_store_id == PRIVATE_OBJECT_STORE_ID
|
||
|
||
def verify_items(items, expected_items):
    """
    Assert that items and expected_items contain the same elements.
    """
    actual_counts = Counter(items)
    expected_counts = Counter(expected_items)
    assert actual_counts == expected_counts
Oops, something went wrong.