|
5 | 5 | import dataclasses
|
6 | 6 | import datetime
|
7 | 7 | import logging
|
8 |
| -from collections.abc import Iterable, Mapping, Sequence, Set |
| 8 | +from collections.abc import Callable, Iterable, Mapping, Sequence, Set |
| 9 | +from contextlib import AbstractContextManager |
9 | 10 | from typing import TYPE_CHECKING, Any, ClassVar
|
10 | 11 |
|
11 | 12 | import astropy.time
|
|
24 | 25 | from ...._timespan import Timespan
|
25 | 26 | from ....dimensions import DataCoordinate, DimensionGroup, DimensionUniverse
|
26 | 27 | from ....direct_query_driver import SqlJoinsBuilder, SqlSelectBuilder # new query system, server+direct only
|
| 28 | +from ....queries import Query |
27 | 29 | from ....queries import tree as qt # new query system, both clients + server
|
28 | 30 | from ..._caching_context import CachingContext
|
29 | 31 | from ..._collection_summary import CollectionSummary
|
@@ -966,7 +968,7 @@ def certify(
|
966 | 968 | collection: CollectionRecord,
|
967 | 969 | datasets: Iterable[DatasetRef],
|
968 | 970 | timespan: Timespan,
|
969 |
| - context: SqlQueryContext, |
| 971 | + query_func: Callable[[], AbstractContextManager[Query]], |
970 | 972 | ) -> None:
|
971 | 973 | # Docstring inherited from DatasetRecordStorageManager.
|
972 | 974 | if (storage := self._find_storage(dataset_type.name)) is None:
|
@@ -1019,23 +1021,25 @@ def certify(
|
1019 | 1021 | ) from err
|
1020 | 1022 | else:
|
1021 | 1023 | # Have to implement exclusion constraint ourselves.
|
1022 |
| - # Start by building a SELECT query for any rows that would overlap |
1023 |
| - # this one. |
1024 |
| - relation = self._build_calib_overlap_query(dataset_type, collection, data_ids, timespan, context) |
1025 | 1024 | # Acquire a table lock to ensure there are no concurrent writes
|
1026 | 1025 | # could invalidate our checking before we finish the inserts. We
|
1027 | 1026 | # use a SAVEPOINT in case there is an outer transaction that a
|
1028 | 1027 | # failure here should not roll back.
|
1029 |
| - with self._db.transaction(lock=[calibs_table], savepoint=True): |
1030 |
| - # Enter SqlQueryContext in case we need to use a temporary |
1031 |
| - # table to include the give data IDs in the query. Note that |
1032 |
| - # by doing this inside the transaction, we make sure it doesn't |
1033 |
| - # attempt to close the session when its done, since it just |
1034 |
| - # sees an already-open session that it knows it shouldn't |
1035 |
| - # manage. |
1036 |
| - with context: |
1037 |
| - # Run the check SELECT query. |
1038 |
| - conflicting = context.count(context.process(relation)) |
| 1028 | + with self._db.transaction( |
| 1029 | + lock=[calibs_table], |
| 1030 | + savepoint=True, |
| 1031 | + # join_data_coordinates sometimes requires a temp table |
| 1032 | + for_temp_tables=True, |
| 1033 | + ): |
| 1034 | + # Query for any rows that would overlap this one. |
| 1035 | + with query_func() as query: |
| 1036 | + if data_ids is not None: |
| 1037 | + query = query.join_data_coordinates(data_ids) |
| 1038 | + timespan_column = query.expression_factory[dataset_type.name].timespan |
| 1039 | + result = query.datasets(dataset_type, collection.name).where( |
| 1040 | + timespan_column.overlaps(timespan) |
| 1041 | + ) |
| 1042 | + conflicting = result.count() |
1039 | 1043 | if conflicting > 0:
|
1040 | 1044 | raise ConflictingDefinitionError(
|
1041 | 1045 | f"{conflicting} validity range conflicts certifying datasets of type "
|
|
0 commit comments