1 change: 1 addition & 0 deletions doc/changes/DM-52398.bugfix.md
@@ -0,0 +1 @@
Query generation no longer raises `sqlalchemy.exc.ArgumentError` when a calibration timespan constraint is applied to a dataset search whose collection list contains no calibration collections.
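For illustration, here is a minimal sketch of the kind of search that previously hit this error; the repository path, collection name, and dataset type are assumptions for the example, not taken from this PR:

```python
import astropy.time

from lsst.daf.butler import Butler, Timespan

butler = Butler("repo")  # hypothetical repository
begin = astropy.time.Time("2020-01-01T00:00:00", scale="tai")
end = astropy.time.Time("2020-01-02T00:00:00", scale="tai")

with butler.query() as query:
    x = query.expression_factory
    # Constraining on the dataset's validity-range timespan while searching
    # only non-calibration (e.g. RUN) collections used to fail during SQL
    # generation with sqlalchemy.exc.ArgumentError; it now simply runs.
    refs = list(
        query.datasets("bias", collections=["HSC/raw/all"]).where(
            x["bias"].timespan.overlaps(Timespan(begin, end))
        )
    )
```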
1 change: 1 addition & 0 deletions doc/changes/DM-52398.misc.md
@@ -0,0 +1 @@
`Butler.find_dataset()`/`Butler.registry.findDataset()` have been re-implemented on top of the same query framework that backs `Butler.query()`. These methods now always return `DatasetRef` instances whose `dataId.hasFull()` is `True`.
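A short usage sketch of the new guarantee; the repository, collection, and data ID values below are illustrative:

```python
from lsst.daf.butler import Butler

butler = Butler("repo", collections=["HSC/runs/example"])  # hypothetical
ref = butler.find_dataset("calexp", instrument="HSC", visit=903334, detector=16)
if ref is not None:
    # The returned data ID now carries implied dimension values too
    # (e.g. band and physical_filter for a visit-level dataset).
    assert ref.dataId.hasFull()
```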
73 changes: 71 additions & 2 deletions python/lsst/daf/butler/_registry_shim.py
@@ -36,6 +36,7 @@
from ._collection_type import CollectionType
from ._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef
from ._dataset_type import DatasetType
from ._exceptions import CalibrationLookupError
from ._storage_class import StorageClassFactory
from ._timespan import Timespan
from .dimensions import (
@@ -48,7 +49,9 @@
)
from .registry._collection_summary import CollectionSummary
from .registry._defaults import RegistryDefaults
from .registry._exceptions import NoDefaultCollectionError
from .registry._registry_base import RegistryBase
from .registry.queries._query_common import resolve_collections

if TYPE_CHECKING:
from .direct_butler import DirectButler
@@ -182,13 +185,79 @@ def findDataset(
*,
collections: CollectionArgType | None = None,
timespan: Timespan | None = None,
datastore_records: bool = False,
**kwargs: Any,
) -> DatasetRef | None:
# Docstring inherited from a base class.
return self._registry.findDataset(
datasetType, dataId, collections=collections, timespan=timespan, **kwargs
if not isinstance(datasetType, DatasetType):
datasetType = self.getDatasetType(datasetType)

dataId = DataCoordinate.standardize(
dataId,
dimensions=datasetType.dimensions,
universe=self.dimensions,
defaults=self.defaults.dataId,
**kwargs,
)

with self._butler.query() as query:
resolved_collections = resolve_collections(self._butler, collections)
if not resolved_collections:
if collections is None:
raise NoDefaultCollectionError("No collections provided, and no default collections set")
else:
return None

if datasetType.isCalibration() and timespan is None:
# Filter out calibration collections, because with no timespan
# we have no way of selecting a dataset from them.
collection_info = self._butler.collections.query_info(
resolved_collections, flatten_chains=True
)
resolved_collections = [
info.name for info in collection_info if info.type != CollectionType.CALIBRATION
]
if not resolved_collections:
return None

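# Fetch at most two rows: one is enough to return a match, and a second
# is needed only to detect an ambiguous calibration lookup below.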
result = query.datasets(datasetType, resolved_collections, find_first=True).limit(2)
dataset_type_name = result.dataset_type.name
# Search only on the 'required' dimensions for the dataset type.
# Any extra values provided by the user are ignored.
minimal_data_id = DataCoordinate.standardize(
dataId.subset(datasetType.dimensions.required).required, universe=self.dimensions
)
result = result.where(minimal_data_id)
if (
datasetType.isCalibration()
and timespan is not None
and (timespan.begin is not None or timespan.end is not None)
):
timespan_column = query.expression_factory[dataset_type_name].timespan
# The 'logical_or(timespan.is_null)' allows non-calibration
# collections to participate in the search, with the assumption
# that they are valid for any time range.
result = result.where(timespan_column.overlaps(timespan).logical_or(timespan_column.is_null))

datasets = list(result)
if len(datasets) == 1:
ref = datasets[0]
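# If the caller supplied an expanded data ID, transfer its dimension
# records to the returned ref.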
if dataId.hasRecords():
ref = ref.expanded(dataId)
# Propagate storage class from user-provided DatasetType, which
# may not match the definition in the database.
ref = ref.overrideStorageClass(datasetType.storageClass_name)
if datastore_records:
ref = self._registry.get_datastore_records(ref)
return ref
elif len(datasets) == 0:
return None
else:
raise CalibrationLookupError(
f"Ambiguous calibration lookup for {datasetType} with timespan {timespan}"
f" in collections {resolved_collections}."
)

def insertDatasets(
self,
datasetType: DatasetType | str,
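Taken together, the new shim resolves the dataset type and collections up front, drops calibration collections when no timespan is given, and runs a find-first query limited to two rows so a unique match can be distinguished from an ambiguous one. A sketch of the two caller-visible outcomes, assuming `CalibrationLookupError` is importable from the package top level and using hypothetical names and timespan endpoints:

```python
import astropy.time

from lsst.daf.butler import Butler, CalibrationLookupError, Timespan

butler = Butler("repo")  # hypothetical repository
ts = Timespan(
    astropy.time.Time("2023-06-01T00:00:00", scale="tai"),
    astropy.time.Time("2023-06-02T00:00:00", scale="tai"),
)
try:
    # Returns None when nothing matches; otherwise the unique best match.
    ref = butler.registry.findDataset(
        "bias", instrument="HSC", detector=16, collections=["HSC/calib"], timespan=ts
    )
except CalibrationLookupError:
    # The timespan overlapped more than one validity range, so there is
    # no single best dataset.
    ...
```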
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/direct_butler/_direct_butler.py
@@ -1302,7 +1302,7 @@ def find_dataset(

data_id, kwargs = self._rewrite_data_id(data_id, parent_type, **kwargs)

ref = self._registry.findDataset(
ref = self.registry.findDataset(
parent_type,
data_id,
collections=collections,
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/queries/_expression_strings.py
@@ -241,7 +241,7 @@ def visitIdentifier(self, name: str, node: Node) -> _VisitorResult:

def visitBind(self, name: str, node: Node) -> _VisitorResult:
if name not in self.context.bind:
raise InvalidQueryError("Name {name!r} is not in the bind map.")
raise InvalidQueryError(f"Name {name!r} is not in the bind map.")
# Logic in visitIdentifier handles binds.
return self.visitIdentifier(name, node)

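For context on the corrected message: the bind map comes from string-expression queries, and referencing an identifier missing from it is what raises this error. A hedged sketch, with illustrative repository, collection, and bind names:

```python
from lsst.daf.butler import Butler

butler = Butler("repo")  # hypothetical repository
with butler.query() as query:
    # "my_det" is resolved through the bind map; omitting the bind argument
    # would raise InvalidQueryError, now with the name correctly formatted.
    refs = list(
        query.datasets("calexp", collections=["HSC/runs/example"]).where(
            "detector = my_det", bind={"my_det": 16}
        )
    )
```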
174 changes: 2 additions & 172 deletions python/lsst/daf/butler/registry/sql_registry.py
@@ -36,24 +36,18 @@
import warnings
from collections import defaultdict
from collections.abc import Iterable, Iterator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any

import sqlalchemy

from lsst.resources import ResourcePathExpression
from lsst.utils.iteration import ensure_iterable

from .._collection_type import CollectionType
from .._column_tags import DatasetColumnTag
from .._config import Config
from .._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef
from .._dataset_type import DatasetType
from .._exceptions import (
CalibrationLookupError,
DataIdValueError,
DimensionNameError,
InconsistentDataIdError,
)
from .._exceptions import DataIdValueError, DimensionNameError, InconsistentDataIdError
from .._storage_class import StorageClassFactory
from .._timespan import Timespan
from ..dimensions import (
@@ -90,7 +84,6 @@
from .._butler_config import ButlerConfig
from ..datastore._datastore import DatastoreOpaqueTable
from ..datastore.stored_file_info import StoredDatastoreItemInfo
from ..registry._registry import CollectionArgType
from ..registry.interfaces import (
CollectionRecord,
Database,
@@ -830,169 +823,6 @@ def supportsIdGenerationMode(self, mode: DatasetIdGenEnum) -> bool:
"""
return True

def findDataset(
self,
datasetType: DatasetType | str,
dataId: DataId | None = None,
*,
collections: CollectionArgType | None = None,
timespan: Timespan | None = None,
datastore_records: bool = False,
**kwargs: Any,
) -> DatasetRef | None:
"""Find a dataset given its `DatasetType` and data ID.

This can be used to obtain a `DatasetRef` that permits the dataset to
be read from a `Datastore`. If the dataset is a component and can not
be found using the provided dataset type, a dataset ref for the parent
will be returned instead but with the correct dataset type.

Parameters
----------
datasetType : `DatasetType` or `str`
A `DatasetType` or the name of one. If this is a `DatasetType`
instance, its storage class will be respected and propagated to
the output, even if it differs from the dataset type definition
in the registry, as long as the storage classes are convertible.
dataId : `dict` or `DataCoordinate`, optional
A `dict`-like object containing the `Dimension` links that identify
the dataset within a collection.
collections : collection expression, optional
An expression that fully or partially identifies the collections to
search for the dataset; see
:ref:`daf_butler_collection_expressions` for more information.
Defaults to ``self.defaults.collections``.
timespan : `Timespan`, optional
A timespan that the validity range of the dataset must overlap.
If not provided, any `~CollectionType.CALIBRATION` collections
matched by the ``collections`` argument will not be searched.
datastore_records : `bool`, optional
Whether to attach datastore records to the `DatasetRef`.
**kwargs
Additional keyword arguments passed to
`DataCoordinate.standardize` to convert ``dataId`` to a true
`DataCoordinate` or augment an existing one.

Returns
-------
ref : `DatasetRef`
A reference to the dataset, or `None` if no matching Dataset
was found.

Raises
------
lsst.daf.butler.registry.NoDefaultCollectionError
Raised if ``collections`` is `None` and
``self.defaults.collections`` is `None`.
LookupError
Raised if one or more data ID keys are missing.
lsst.daf.butler.registry.MissingDatasetTypeError
Raised if the dataset type does not exist.
lsst.daf.butler.registry.MissingCollectionError
Raised if any of ``collections`` does not exist in the registry.

Notes
-----
This method simply returns `None` and does not raise an exception even
when the set of collections searched is intrinsically incompatible with
the dataset type, e.g. if ``datasetType.isCalibration() is False``, but
only `~CollectionType.CALIBRATION` collections are being searched.
This may make it harder to debug some lookup failures, but the behavior
is intentional; we consider it more important that failed searches are
reported consistently, regardless of the reason, and that adding
additional collections that do not contain a match to the search path
never changes the behavior.

This method handles component dataset types automatically, though most
other registry operations do not.
"""
if collections is None:
if not self.defaults.collections:
raise NoDefaultCollectionError(
"No collections provided to findDataset, and no defaults from registry construction."
)
collections = self.defaults.collections
backend = queries.SqlQueryBackend(self._db, self._managers, self.dimension_record_cache)
with backend.caching_context():
collection_wildcard = CollectionWildcard.from_expression(collections, require_ordered=True)
if collection_wildcard.empty():
return None
matched_collections = backend.resolve_collection_wildcard(collection_wildcard)
resolved_dataset_type = backend.resolve_single_dataset_type_wildcard(datasetType)
dataId = DataCoordinate.standardize(
dataId,
dimensions=resolved_dataset_type.dimensions,
universe=self.dimensions,
defaults=self.defaults.dataId,
**kwargs,
)
governor_constraints = {name: {cast(str, dataId[name])} for name in dataId.dimensions.governors}
(filtered_collections,) = backend.filter_dataset_collections(
[resolved_dataset_type],
matched_collections,
governor_constraints=governor_constraints,
).values()
if not filtered_collections:
return None
if timespan is None:
filtered_collections = [
collection_record
for collection_record in filtered_collections
if collection_record.type is not CollectionType.CALIBRATION
]
if filtered_collections:
requested_columns = {"dataset_id", "run", "collection"}
with backend.context() as context:
predicate = context.make_data_coordinate_predicate(
dataId.subset(resolved_dataset_type.dimensions), full=False
)
if timespan is not None:
requested_columns.add("timespan")
predicate = predicate.logical_and(
context.make_timespan_overlap_predicate(
DatasetColumnTag(resolved_dataset_type.name, "timespan"), timespan
)
)
relation = backend.make_dataset_query_relation(
resolved_dataset_type, filtered_collections, requested_columns, context
).with_rows_satisfying(predicate)
rows = list(context.fetch_iterable(relation))
else:
rows = []
if not rows:
return None
elif len(rows) == 1:
best_row = rows[0]
else:
rank_by_collection_key = {record.key: n for n, record in enumerate(filtered_collections)}
collection_tag = DatasetColumnTag(resolved_dataset_type.name, "collection")
row_iter = iter(rows)
best_row = next(row_iter)
best_rank = rank_by_collection_key[best_row[collection_tag]]
have_tie = False
for row in row_iter:
if (rank := rank_by_collection_key[row[collection_tag]]) < best_rank:
best_row = row
best_rank = rank
have_tie = False
elif rank == best_rank:
have_tie = True
assert timespan is not None, "Rank ties should be impossible given DB constraints."
if have_tie:
raise CalibrationLookupError(
f"Ambiguous calibration lookup for {resolved_dataset_type.name} in collections "
f"{collection_wildcard.strings} with timespan {timespan}."
)
reader = queries.DatasetRefReader(
resolved_dataset_type,
translate_collection=lambda k: self._managers.collections[k].name,
)
ref = reader.read(best_row, data_id=dataId)
if datastore_records:
ref = self.get_datastore_records(ref)

return ref

@transactional
def insertDatasets(
self,