
Commit 2543af7

Make maximum table size configurable in Cassandra
1 parent 5477a8d

File tree

2 files changed (+30, -11 lines)

python/lsst/dax/apdb/cassandra/apdbCassandra.py

Lines changed: 29 additions & 11 deletions
@@ -813,6 +813,7 @@ def store(
         objects: pandas.DataFrame,
         sources: pandas.DataFrame | None = None,
         forced_sources: pandas.DataFrame | None = None,
+        maximum_table_length: int = 0,
     ) -> None:
         # docstring is inherited from a base class
         objects = self._fix_input_timestamps(objects)
@@ -828,17 +829,18 @@ def store(
 
         # fill region partition column for DiaObjects
         objects = self._add_apdb_part(objects)
-        self._storeDiaObjects(objects, visit_time, replica_chunk)
+        self._storeDiaObjects(objects, visit_time, replica_chunk, maximum_table_length)
 
         if sources is not None:
             # copy apdb_part column from DiaObjects to DiaSources
             sources = self._add_apdb_part(sources)
-            self._storeDiaSources(ApdbTables.DiaSource, sources, replica_chunk)
+            self._storeDiaSources(ApdbTables.DiaSource, sources, replica_chunk, maximum_table_length)
             self._storeDiaSourcesPartitions(sources, visit_time, replica_chunk)
 
         if forced_sources is not None:
             forced_sources = self._add_apdb_part(forced_sources)
-            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, replica_chunk)
+            self._storeDiaSources(ApdbTables.DiaForcedSource, forced_sources, replica_chunk,
+                                  maximum_table_length)
 
     def storeSSObjects(self, objects: pandas.DataFrame) -> None:
         # docstring is inherited from a base class
@@ -1183,7 +1185,11 @@ def _deleteMovingObjects(self, objs: pandas.DataFrame) -> None:
             timer.add_values(row_count=len(batch))
 
     def _storeDiaObjects(
-        self, objs: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
+        self,
+        objs: pandas.DataFrame,
+        visit_time: astropy.time.Time,
+        replica_chunk: ReplicaChunk | None,
+        maximum_table_length: int,
     ) -> None:
         """Store catalog of DiaObjects from current visit.
@@ -1195,6 +1201,8 @@ def _storeDiaObjects(
             Time of the current visit.
         replica_chunk : `ReplicaChunk` or `None`
             Replica chunk identifier if replication is configured.
+        maximum_table_length : `int`
+            Maximum table length to write in a single operation.
         """
         if len(objs) == 0:
            _LOG.debug("No objects to write to database.")
@@ -1229,6 +1237,7 @@ def _storeDiaSources(
         table_name: ApdbTables,
         sources: pandas.DataFrame,
         replica_chunk: ReplicaChunk | None,
+        maximum_table_length: int,
     ) -> None:
         """Store catalog of DIASources or DIAForcedSources from current visit.
@@ -1238,38 +1247,41 @@ def _storeDiaSources(
             Table where to store the data.
         sources : `pandas.DataFrame`
             Catalog containing DiaSource records
-        visit_time : `astropy.time.Time`
-            Time of the current visit.
         replica_chunk : `ReplicaChunk` or `None`
             Replica chunk identifier if replication is configured.
+        maximum_table_length : `int`
+            Maximum table length to write in a single operation.
         """
         # Time partitioning has to be based on midpointMjdTai, not visit_time
         # as visit_time is not really a visit time.
         tp_sources = sources.copy(deep=False)
         tp_sources["apdb_time_part"] = tp_sources["midpointMjdTai"].apply(self._time_partition)
         extra_columns: dict[str, Any] = {}
         if not self.config.partitioning.time_partition_tables:
-            self._storeObjectsPandas(tp_sources, table_name)
+            self._storeObjectsPandas(tp_sources, table_name, maximum_table_length=maximum_table_length)
         else:
             # Group by time partition
             partitions = set(tp_sources["apdb_time_part"])
             if len(partitions) == 1:
                 # Single partition - just save the whole thing.
                 time_part = partitions.pop()
-                self._storeObjectsPandas(sources, table_name, time_part=time_part)
+                self._storeObjectsPandas(sources, table_name, time_part=time_part,
+                                         maximum_table_length=maximum_table_length)
             else:
                 # group by time partition.
                 for time_part, sub_frame in tp_sources.groupby(by="apdb_time_part"):
                     sub_frame.drop(columns="apdb_time_part", inplace=True)
-                    self._storeObjectsPandas(sub_frame, table_name, time_part=time_part)
+                    self._storeObjectsPandas(sub_frame, table_name, time_part=time_part,
+                                             maximum_table_length=maximum_table_length)
 
         if replica_chunk is not None:
             extra_columns = dict(apdb_replica_chunk=replica_chunk.id)
             if table_name is ApdbTables.DiaSource:
                 extra_table = ExtraTables.DiaSourceChunks
             else:
                 extra_table = ExtraTables.DiaForcedSourceChunks
-            self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns)
+            self._storeObjectsPandas(sources, extra_table, extra_columns=extra_columns,
+                                     maximum_table_length=maximum_table_length)
 
     def _storeDiaSourcesPartitions(
         self, sources: pandas.DataFrame, visit_time: astropy.time.Time, replica_chunk: ReplicaChunk | None
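As an aside on the grouping branch in the hunk above, here is a minimal, standalone sketch (with a hypothetical time_partition function standing in for the class's _time_partition) of how a source catalog is split into per-partition frames, each of which then gets its own write:

import pandas

# Hypothetical stand-in for self._time_partition: 30-day partitions by MJD.
def time_partition(mjd: float) -> int:
    return int(mjd // 30)

sources = pandas.DataFrame({"midpointMjdTai": [60000.1, 60000.2, 60031.5]})
sources["apdb_time_part"] = sources["midpointMjdTai"].apply(time_partition)

# One write per time partition, mirroring the groupby branch above.
for time_part, sub_frame in sources.groupby(by="apdb_time_part"):
    sub_frame = sub_frame.drop(columns="apdb_time_part")
    print(time_part, len(sub_frame))  # e.g. 2000 -> 2 rows, 2001 -> 1 row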
@@ -1299,6 +1311,7 @@ def _storeObjectsPandas(
         table_name: ApdbTables | ExtraTables,
         extra_columns: Mapping | None = None,
         time_part: int | None = None,
+        maximum_table_length: int = 0,
     ) -> None:
         """Store generic objects.
@@ -1316,6 +1329,8 @@ def _storeObjectsPandas(
             columns exist there.
         time_part : `int`, optional
             If not `None` then insert into a per-partition table.
+        maximum_table_length : `int`, optional
+            Maximum table length to write in a single operation.
 
         Notes
         -----
@@ -1329,6 +1344,9 @@ def _storeObjectsPandas(
             extra_columns = {}
         extra_fields = list(extra_columns.keys())
 
+        if maximum_table_length == 0:
+            maximum_table_length = 50_000_000
+
         # Fields that will come from dataframe.
         df_fields = [column for column in records.columns if column not in extra_fields]
 
@@ -1359,7 +1377,7 @@ def _storeObjectsPandas(
         # Cassandra has 64k limit on batch size, normally that should be
         # enough but some tests generate too many forced sources.
         queries = []
-        for rec_chunk in chunk_iterable(records.itertuples(index=False), 50_000_000):
+        for rec_chunk in chunk_iterable(records.itertuples(index=False), maximum_table_length):
            batch = cassandra.query.BatchStatement()
            for rec in rec_chunk:
                values = []
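Taken together, the _storeObjectsPandas hunks amount to the following pattern. This is a self-contained sketch, not the project's code: write_batch stands in for building and executing a cassandra.query.BatchStatement, and chunk_rows mimics the chunk_iterable helper used in the diff.

from itertools import islice
from typing import Any, Iterable, Iterator


def chunk_rows(rows: Iterable[Any], chunk_size: int) -> Iterator[list[Any]]:
    # Yield successive lists of at most chunk_size rows.
    it = iter(rows)
    while chunk := list(islice(it, chunk_size)):
        yield chunk


def write_batch(batch: list[Any]) -> None:
    # Stand-in for one Cassandra batch write.
    print(f"writing batch of {len(batch)} rows")


def store_rows(rows: Iterable[Any], maximum_table_length: int = 0) -> None:
    if maximum_table_length == 0:
        # Zero means "use the default", matching the previously hard-coded value.
        maximum_table_length = 50_000_000
    for batch in chunk_rows(rows, maximum_table_length):
        write_batch(batch)


store_rows(range(10), maximum_table_length=4)  # batches of 4, 4 and 2 rows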

python/lsst/dax/apdb/sql/apdbSql.py

Lines changed: 1 addition & 0 deletions
@@ -640,6 +640,7 @@ def store(
         objects: pandas.DataFrame,
         sources: pandas.DataFrame | None = None,
         forced_sources: pandas.DataFrame | None = None,
+        maximum_table_length: int = 0,
     ) -> None:
         # docstring is inherited from a base class
         objects = self._fix_input_timestamps(objects)
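On the caller side, the new keyword is the only change to the store() signature in both backends. A hypothetical usage fragment, not a runnable script: apdb is assumed to be an already-configured ApdbCassandra or ApdbSql instance, with construction and the catalog DataFrames omitted.

# `apdb`, `objects`, `sources` and `forced_sources` are assumed to exist;
# `visit_time` is an astropy.time.Time for the current visit.
apdb.store(
    visit_time,
    objects,
    sources=sources,
    forced_sources=forced_sources,
    maximum_table_length=100_000,  # rows per write; 0 keeps the backend default
)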
