Description
Environment
Delta-rs version: 0.18.2
Binding: Python
Environment:
- Cloud provider: AWS
- OS: Debian GNU/Linux 12 (bookworm)
- Other: python 3.11
Bug
What happened:
- delta table with ±8 million rows, partitioned; the largest partition has 800k rows, and total size is ±550 GiB according to describe detail.
- table is optimized with .compact(), with a checkpoint every 100 commits.
- merge operation with 1k source rows and a single partition in the predicate: part_col IN ('123456789')
- memory spikes from ±200 MiB to ±14 GiB on .execute(), see memory_profiler output:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    35    200.7 MiB    200.7 MiB           1   @profile
    36                                         def _perform_merge(self, detections: List[Detection], class_blacklist: List[str] = None):
    37                                             # upsert detections to datastore
    38    202.2 MiB      1.5 MiB           1       batch = build_pyarrow_batch(detections=detections, class_blacklist=class_blacklist)
    39    202.2 MiB      0.0 MiB           1       logging.info(f'upserting batch of [{batch.num_rows}] detections to delta table')
    40    202.2 MiB      0.0 MiB           1       if batch.num_rows > 0:
    41    209.2 MiB      7.0 MiB           2           dt = get_delta_table(table_path=self.delta_table_path,
    42    202.2 MiB      0.0 MiB           1                                dynamo_table_name=self.dynamo_table_name)
    43    209.2 MiB      0.0 MiB           1           h3_indices_column = batch.column('h3_indices')
    44    209.2 MiB      0.0 MiB           1           partition_key_values = self._get_partition_keys(h3_indices=h3_indices_column.to_pylist())
    45    209.2 MiB      0.0 MiB           1           merge_predicate = f'{TARGET_ALIAS}.{HD_MAP_FRAME_DETECTION_PARTITION_BY_COLUMN} in ({partition_key_values}) and {TARGET_ALIAS}.{HD_MAP_FRAME_DETECTION_KEY_COLUMN} = {SOURCE_ALIAS}.{HD_MAP_FRAME_DETECTION_KEY_COLUMN}'
    46    209.2 MiB      0.0 MiB           1           update_predicate = f'{SOURCE_ALIAS}.{HD_MAP_FRAME_DETECTION_INFERENCE_AT_MS_COLUMN} >= {TARGET_ALIAS}.{HD_MAP_FRAME_DETECTION_INFERENCE_AT_MS_COLUMN}'
    47  14466.7 MiB      0.0 MiB           1           metrics = (
    48    209.2 MiB      0.0 MiB           2               dt.merge(
    49    209.2 MiB      0.0 MiB           1                   source=batch,
    50    209.2 MiB      0.0 MiB           1                   predicate=merge_predicate,
    51    209.2 MiB      0.0 MiB           1                   source_alias=SOURCE_ALIAS,
    52    209.2 MiB      0.0 MiB           1                   target_alias=TARGET_ALIAS,
    53    209.2 MiB      0.0 MiB           1                   large_dtypes=False
    54                                                     )
    55    209.2 MiB      0.0 MiB           1               .when_matched_update_all(predicate=update_predicate)
    56    209.2 MiB      0.0 MiB           1               .when_not_matched_insert_all()
    57  14466.7 MiB  14257.5 MiB           1               .execute()
    58                                                 )
    59  14466.7 MiB      0.0 MiB           1           logging.info(f'merged with metrics {metrics}...')
    60  14466.7 MiB      0.0 MiB           1           if dt.version() % OPTIMIZE_FREQUENCY == 0:
    61                                                     try:
    62                                                         self._optimize(dt)
    63                                                     except Exception as e:
    64                                                         logging.warning(f'error optimizing [{dt.table_uri}], will SKIP... [{e}]')
- Running on a pod with a 16 GiB memory limit, these spikes result in an OOM kill.

What you expected to happen:
Memory usage to stay roughly proportional to the 1k-row source batch and the single targeted partition, rather than spiking to ±14 GiB.
How to reproduce it:
More details: