Skip to content

Commit

Permalink
Merge pull request #16380 from mvdbeek/object_store_fix_quota_label
Browse files Browse the repository at this point in the history
[23.1] Fix disk usage recalculation for distributed object stores
  • Loading branch information
mvdbeek authored Jul 7, 2023
2 parents c189230 + 53dc916 commit de40c46
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 28 deletions.
35 changes: 22 additions & 13 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,9 +566,7 @@ def stderr(self, stderr):
LEFT OUTER JOIN library_dataset_dataset_association ON dataset.id = library_dataset_dataset_association.dataset_id
WHERE dataset.id IN (SELECT dataset_id FROM per_hist_hdas)
AND library_dataset_dataset_association.id IS NULL
AND (
{dataset_condition}
)
{and_dataset_condition}
"""


Expand All @@ -577,15 +575,17 @@ def calculate_user_disk_usage_statements(user_id, quota_source_map, for_sqlite=F
statements = []
default_quota_enabled = quota_source_map.default_quota_enabled
default_exclude_ids = quota_source_map.default_usage_excluded_ids()
default_cond = "dataset.object_store_id IS NULL" if default_quota_enabled else ""
default_cond = "dataset.object_store_id IS NULL" if default_quota_enabled and default_exclude_ids else ""
exclude_cond = "dataset.object_store_id NOT IN :exclude_object_store_ids" if default_exclude_ids else ""
use_or = " OR " if (default_cond != "" and exclude_cond != "") else ""
default_usage_dataset_condition = "{default_cond} {use_or} {exclude_cond}".format(
default_cond=default_cond,
exclude_cond=exclude_cond,
use_or=use_or,
)
default_usage = UNIQUE_DATASET_USER_USAGE.format(dataset_condition=default_usage_dataset_condition)
if default_usage_dataset_condition.strip():
default_usage_dataset_condition = f"AND ( {default_usage_dataset_condition} )"
default_usage = UNIQUE_DATASET_USER_USAGE.format(and_dataset_condition=default_usage_dataset_condition)
default_usage = (
"""
UPDATE galaxy_user SET disk_usage = (%s)
Expand All @@ -602,7 +602,7 @@ def calculate_user_disk_usage_statements(user_id, quota_source_map, for_sqlite=F
# the object_store_id to quota_source_label into a temp table of values
for quota_source_label, object_store_ids in source.items():
label_usage = UNIQUE_DATASET_USER_USAGE.format(
dataset_condition="dataset.object_store_id IN :include_object_store_ids"
and_dataset_condition="AND ( dataset.object_store_id IN :include_object_store_ids )"
)
if for_sqlite:
# hacky alternative for older sqlite
Expand All @@ -622,7 +622,7 @@ def calculate_user_disk_usage_statements(user_id, quota_source_map, for_sqlite=F
else:
statement = """
INSERT INTO user_quota_source_usage(user_id, quota_source_label, disk_usage)
VALUES(:user_id, :label, ({label_usage}))
VALUES(:id, :label, ({label_usage}))
ON CONFLICT
ON constraint uqsu_unique_label_per_user
DO UPDATE SET disk_usage = excluded.disk_usage
Expand Down Expand Up @@ -997,16 +997,25 @@ def calculate_disk_usage_default_source(self, object_store):
assert object_store is not None
quota_source_map = object_store.get_quota_source_map()
default_quota_enabled = quota_source_map.default_quota_enabled
default_cond = "dataset.object_store_id IS NULL OR" if default_quota_enabled else ""
exclude_objectstore_ids = quota_source_map.default_usage_excluded_ids()
default_cond = "dataset.object_store_id IS NULL OR" if default_quota_enabled and exclude_objectstore_ids else ""
default_usage_dataset_condition = (
"{default_cond} dataset.object_store_id NOT IN :exclude_object_store_ids".format(
default_cond=default_cond,
(
"AND ( {default_cond} dataset.object_store_id NOT IN :exclude_object_store_ids )".format(
default_cond=default_cond,
)
)
if exclude_objectstore_ids
else ""
)
default_usage = UNIQUE_DATASET_USER_USAGE.format(dataset_condition=default_usage_dataset_condition)
default_usage = UNIQUE_DATASET_USER_USAGE.format(and_dataset_condition=default_usage_dataset_condition)
sql_calc = text(default_usage)
sql_calc = sql_calc.bindparams(bindparam("id"), bindparam("exclude_object_store_ids", expanding=True))
params = {"id": self.id, "exclude_object_store_ids": quota_source_map.default_usage_excluded_ids()}
params = {"id": self.id}
bindparams = [bindparam("id")]
if exclude_objectstore_ids:
params["exclude_object_store_ids"] = exclude_objectstore_ids
bindparams.append(bindparam("exclude_object_store_ids", expanding=True))
sql_calc = sql_calc.bindparams(*bindparams)
sa_session = object_session(self)
usage = sa_session.scalar(sql_calc, params)
return usage
Expand Down
122 changes: 107 additions & 15 deletions test/integration/test_recalculate_user_disk_usage.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,35 @@
import string
import time

from galaxy_test.base.populators import DatasetPopulator
from galaxy_test.driver.integration_util import IntegrationTestCase
from .objectstore._base import BaseObjectStoreIntegrationTestCase
from .objectstore.test_selection_with_resource_parameters import DISTRIBUTED_OBJECT_STORE_CONFIG_TEMPLATE

SIMPLE_DISTRIBUTED_OBJECT_STORE_CONFIG_TEMPLATE = string.Template(
"""<?xml version="1.0"?>
<object_store type="distributed" id="primary" order="0">
<backends>
<backend id="default" type="disk" weight="1" name="Default Store">
<description>This is my description of the default store with *markdown*.</description>
<files_dir path="${temp_directory}/files_default"/>
<extra_dir type="temp" path="${temp_directory}/tmp_default"/>
<extra_dir type="job_work" path="${temp_directory}/job_working_directory_default"/>
</backend>
<backend id="static" type="disk" weight="0">
<files_dir path="${temp_directory}/files_static"/>
<extra_dir type="temp" path="${temp_directory}/tmp_static"/>
<extra_dir type="job_work" path="${temp_directory}/job_working_directory_static"/>
</backend>
</backends>
</object_store>
"""
)

class TestRecalculateUserDiskUsageIntegration(IntegrationTestCase):
task_based = True
dataset_populator: DatasetPopulator

def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)

@classmethod
def handle_galaxy_config_kwds(cls, config):
super().handle_galaxy_config_kwds(config)
config["allow_user_dataset_purge"] = True
class RecalculateDiskUsage:
task_based: bool
dataset_populator: DatasetPopulator

def test_recalculate_user_disk_usage(self):
# The initial disk usage is 0
Expand All @@ -26,17 +42,93 @@ def test_recalculate_user_disk_usage(self):
# The usage should be the total of the datasets
current_usage = self.dataset_populator.get_usage_for(None)
assert current_usage["total_disk_usage"] == expected_usage

self.recalculate_disk_usage()
# The disk usage should still be the expected usage
current_usage = self.dataset_populator.get_usage_for(None)
assert current_usage["total_disk_usage"] == expected_usage

self.dataset_populator.delete_dataset(history_id, hda_id, purge=True, wait_for_purge=True)

# Purging that dataset should result in usage dropping back
# down to zero.
current_usage = self.dataset_populator.get_usage_for(None)
assert current_usage["total_disk_usage"] == 0

recalculate_response = self._put("users/current/recalculate_disk_usage")
task_ok = self.dataset_populator.wait_on_task(recalculate_response)
assert task_ok

self.recalculate_disk_usage()
# The disk usage should be 0 again
current_usage = self.dataset_populator.get_usage_for(None)
assert current_usage["total_disk_usage"] == 0

def recalculate_disk_usage(self):
recalculate_response = self.dataset_populator._put("users/current/recalculate_disk_usage")
if self.task_based:
task_ok = self.dataset_populator.wait_on_task(recalculate_response)
assert task_ok
else:
time.sleep(2)


class TestRecalculateUserDiskUsageIntegration(IntegrationTestCase, RecalculateDiskUsage):
task_based = True
dataset_populator: DatasetPopulator

def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)

@classmethod
def handle_galaxy_config_kwds(cls, config):
super().handle_galaxy_config_kwds(config)
config["allow_user_dataset_purge"] = True


class TestRecalculateUserDiskUsageHierarchicalIntegration(BaseObjectStoreIntegrationTestCase, RecalculateDiskUsage):
task_based = True
dataset_populator: DatasetPopulator

def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)

@classmethod
def handle_galaxy_config_kwds(cls, config):
cls._configure_object_store(DISTRIBUTED_OBJECT_STORE_CONFIG_TEMPLATE, config)
super().handle_galaxy_config_kwds(config)
config["allow_user_dataset_purge"] = True


class TestRecalculateUserDiskUsageSimpleHierarchicalIntegration(
BaseObjectStoreIntegrationTestCase, RecalculateDiskUsage
):
task_based = True
dataset_populator: DatasetPopulator

def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)

@classmethod
def handle_galaxy_config_kwds(cls, config):
cls._configure_object_store(SIMPLE_DISTRIBUTED_OBJECT_STORE_CONFIG_TEMPLATE, config)
super().handle_galaxy_config_kwds(config)
config["allow_user_dataset_purge"] = True


class TestRecalculateUserDiskUsageHierarchicalNoTaskIntegration(
BaseObjectStoreIntegrationTestCase, RecalculateDiskUsage
):
task_based = False
dataset_populator: DatasetPopulator

def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)

@classmethod
def handle_galaxy_config_kwds(cls, config):
cls._configure_object_store(DISTRIBUTED_OBJECT_STORE_CONFIG_TEMPLATE, config)
super().handle_galaxy_config_kwds(config)
config["allow_user_dataset_purge"] = True
config["enable_celery_tasks"] = False
config["metadata_strategy"] = "extended"

0 comments on commit de40c46

Please sign in to comment.