Skip to content

Commit debc402

Browse files
committed
Remove daf_relation from registry.certify()
Modify registry.certify() to use the new query system instead of daf_relation, because we plan to retire daf_relation.
1 parent 35ec7c2 commit debc402

File tree

3 files changed

+35
-31
lines changed

3 files changed

+35
-31
lines changed

python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
import dataclasses
66
import datetime
77
import logging
8-
from collections.abc import Iterable, Mapping, Sequence, Set
8+
from collections.abc import Callable, Iterable, Mapping, Sequence, Set
9+
from contextlib import AbstractContextManager
910
from typing import TYPE_CHECKING, Any, ClassVar
1011

1112
import astropy.time
@@ -24,6 +25,7 @@
2425
from ...._timespan import Timespan
2526
from ....dimensions import DataCoordinate, DimensionGroup, DimensionUniverse
2627
from ....direct_query_driver import SqlJoinsBuilder, SqlSelectBuilder # new query system, server+direct only
28+
from ....queries import Query
2729
from ....queries import tree as qt # new query system, both clients + server
2830
from ..._caching_context import CachingContext
2931
from ..._collection_summary import CollectionSummary
@@ -966,7 +968,7 @@ def certify(
966968
collection: CollectionRecord,
967969
datasets: Iterable[DatasetRef],
968970
timespan: Timespan,
969-
context: SqlQueryContext,
971+
query_func: Callable[[], AbstractContextManager[Query]],
970972
) -> None:
971973
# Docstring inherited from DatasetRecordStorageManager.
972974
if (storage := self._find_storage(dataset_type.name)) is None:
@@ -1019,23 +1021,25 @@ def certify(
10191021
) from err
10201022
else:
10211023
# Have to implement exclusion constraint ourselves.
1022-
# Start by building a SELECT query for any rows that would overlap
1023-
# this one.
1024-
relation = self._build_calib_overlap_query(dataset_type, collection, data_ids, timespan, context)
10251024
# Acquire a table lock to ensure there are no concurrent writes
10261025
# could invalidate our checking before we finish the inserts. We
10271026
# use a SAVEPOINT in case there is an outer transaction that a
10281027
# failure here should not roll back.
1029-
with self._db.transaction(lock=[calibs_table], savepoint=True):
1030-
# Enter SqlQueryContext in case we need to use a temporary
1031-
# table to include the given data IDs in the query. Note that
1032-
# by doing this inside the transaction, we make sure it doesn't
1033-
# attempt to close the session when it's done, since it just
1034-
# sees an already-open session that it knows it shouldn't
1035-
# manage.
1036-
with context:
1037-
# Run the check SELECT query.
1038-
conflicting = context.count(context.process(relation))
1028+
with self._db.transaction(
1029+
lock=[calibs_table],
1030+
savepoint=True,
1031+
# join_data_coordinates sometimes requires a temp table
1032+
for_temp_tables=True,
1033+
):
1034+
# Query for any rows that would overlap this one.
1035+
with query_func() as query:
1036+
if data_ids is not None:
1037+
query = query.join_data_coordinates(data_ids)
1038+
timespan_column = query.expression_factory[dataset_type.name].timespan
1039+
result = query.datasets(dataset_type, collection.name).where(
1040+
timespan_column.overlaps(timespan)
1041+
)
1042+
conflicting = result.count()
10391043
if conflicting > 0:
10401044
raise ConflictingDefinitionError(
10411045
f"{conflicting} validity range conflicts certifying datasets of type "

python/lsst/daf/butler/registry/interfaces/_datasets.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
__all__ = ("DatasetRecordStorageManager",)
3333

3434
from abc import abstractmethod
35-
from collections.abc import Iterable, Mapping, Sequence, Set
35+
from collections.abc import Callable, Iterable, Mapping, Sequence, Set
36+
from contextlib import AbstractContextManager
3637
from typing import TYPE_CHECKING, Any
3738

3839
from lsst.daf.relation import Relation
@@ -42,6 +43,7 @@
4243
from ..._exceptions import DatasetTypeError, DatasetTypeNotSupportedError
4344
from ..._timespan import Timespan
4445
from ...dimensions import DataCoordinate
46+
from ...queries import Query
4547
from ._versioning import VersionedExtension, VersionTuple
4648

4749
if TYPE_CHECKING:
@@ -504,7 +506,7 @@ def certify(
504506
collection: CollectionRecord,
505507
datasets: Iterable[DatasetRef],
506508
timespan: Timespan,
507-
context: SqlQueryContext,
509+
query_func: Callable[[], AbstractContextManager[Query]],
508510
) -> None:
509511
"""Associate one or more datasets with a calibration collection and a
510512
validity range within it.
@@ -521,9 +523,10 @@ def certify(
521523
`DatasetType` as ``dataset_type``, but this is not checked.
522524
timespan : `Timespan`
523525
The validity range for these datasets within the collection.
524-
context : `SqlQueryContext`
525-
The object that manages database connections, temporary tables and
526-
relation engines for this query.
526+
query_func : `Callable` [[], `AbstractContextManager` [ `Query` ]]
527+
Function returning a context manager that sets up a `Query` object
528+
for querying the registry. (That is, a function equivalent to
529+
`Butler.query()`).
527530
528531
Raises
529532
------

python/lsst/daf/butler/registry/sql_registry.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,17 +1161,14 @@ def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespa
11611161
``collection.type is not CollectionType.CALIBRATION``.
11621162
"""
11631163
progress = Progress("lsst.daf.butler.Registry.certify", level=logging.DEBUG)
1164-
collectionRecord = self._managers.collections.find(collection)
1165-
for datasetType, refsForType in progress.iter_item_chunks(
1166-
DatasetRef.iter_by_type(refs), desc="Certifying datasets by type"
1167-
):
1168-
self._managers.datasets.certify(
1169-
datasetType,
1170-
collectionRecord,
1171-
refsForType,
1172-
timespan,
1173-
context=queries.SqlQueryContext(self._db, self._managers.column_types),
1174-
)
1164+
with self._managers.caching_context.enable_collection_record_cache():
1165+
collectionRecord = self._managers.collections.find(collection)
1166+
for datasetType, refsForType in progress.iter_item_chunks(
1167+
DatasetRef.iter_by_type(refs), desc="Certifying datasets by type"
1168+
):
1169+
self._managers.datasets.certify(
1170+
datasetType, collectionRecord, refsForType, timespan, self._query
1171+
)
11751172

11761173
@transactional
11771174
def decertify(

0 commit comments

Comments
 (0)