Skip to content

Commit 326884c

Browse files
kevinjqliu and Copilot authored
fix manifest cache (#2951)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> <!-- Closes #${GITHUB_ISSUE_ID} --> # Rationale for this change Fix part of #2325 Context: #2325 (comment) Cache Manifest File object instead of Manifest List (tuple of Manifest Files). This PR fix the O(N²) cache inefficiency, into the expected O(N) linear growth pattern. ## Are these changes tested? Yes, with benchmark test (`tests/benchmark/test_memory_benchmark.py`) Result running from main branch: https://gist.github.com/kevinjqliu/970f4b51a12aaa0318a2671173430736 Result running from this branch: https://gist.github.com/kevinjqliu/24990d18d2cea2fa468597c16bfa27fd ### Benchmark Comparison: main vs kevinjqliu/fix-manifest-cache | Test | main | fix branch | |------|:----:|:----------:| | `test_manifest_cache_memory_growth` | ❌ FAILED | ✅ PASSED | | `test_memory_after_gc_with_cache_cleared` | ✅ PASSED | ✅ PASSED | | `test_manifest_cache_deduplication_efficiency` | ✅ PASSED | ✅ PASSED | #### Memory Growth Benchmark (50 append operations) | Metric | main | fix branch | Improvement | |--------|-----:|----------:|------------:| | Initial memory | 3,233.4 KB | 3,210.7 KB | -0.7% | | Final memory | 4,280.6 KB | 3,558.9 KB | **-16.9%** | | Total growth | 1,047.2 KB | 348.1 KB | **-66.8%** | | Growth per iteration | 26,809 bytes | 8,913 bytes | **-66.8%** | #### Memory at Each Iteration | Iteration | main | fix branch | Δ | |----------:|-----:|-----------:|--:| | 10 | 3,233.4 KB | 3,210.7 KB | -22.7 KB | | 20 | 3,471.0 KB | 3,371.4 KB | -99.6 KB | | 30 | 3,719.3 KB | 3,467.1 KB | -252.2 KB | | 40 | 3,943.9 KB | 3,483.2 KB | -460.7 KB | | 50 | 4,280.6 KB | 3,558.9 KB | **-721.7 KB** | This fix reduces memory growth by ~67%, bringing per-iteration growth from ~27 KB down to ~9 KB. 
The improvement comes from caching individual `ManifestFile` objects by their `manifest_path` instead of caching entire manifest list tuples. This deduplicates `ManifestFile` objects that appear in multiple manifest lists (common after appends). ## Are there any user-facing changes? <!-- In the case of user-facing changes, please add the changelog label. --> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 33bfbc8 commit 326884c

File tree

3 files changed

+614
-25
lines changed

3 files changed

+614
-25
lines changed

pyiceberg/manifest.py

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@
2828
Literal,
2929
)
3030

31-
from cachetools import LRUCache, cached
32-
from cachetools.keys import hashkey
31+
from cachetools import LRUCache
3332
from pydantic_core import to_json
3433

3534
from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec
@@ -892,15 +891,53 @@ def __hash__(self) -> int:
892891
return hash(self.manifest_path)
893892

894893

# Global cache for ManifestFile objects, keyed by manifest_path.
# This deduplicates ManifestFile objects across manifest lists, which commonly
# share manifests after append operations.
# NOTE: maxsize=128 bounds the cache; least-recently-used entries are evicted.
_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=128)

# Lock for thread-safe cache access. RLock (not Lock) so a thread already
# holding it can safely re-enter.
_manifest_cache_lock = threading.RLock()
897901

898902

899-
def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
    """Read manifests from a manifest list, deduplicating ManifestFile objects via cache.

    Individual ManifestFile objects are cached by their manifest_path. This is
    memory-efficient because consecutive manifest lists typically share most of
    their manifests (e.g. after a series of appends):

        ManifestList1: [ManifestFile1]
        ManifestList2: [ManifestFile1, ManifestFile2]
        ManifestList3: [ManifestFile1, ManifestFile2, ManifestFile3]

    With per-ManifestFile caching, each shared ManifestFile is stored once and
    reused across lists.

    Note: The manifest list file itself is re-read on every call. This is
    intentional: it keeps the implementation simple and avoids the O(N²)
    memory growth that came from caching overlapping manifest-list tuples.
    Manifest lists are small metadata files, so re-reading is cheap.

    Args:
        io: FileIO instance for reading the manifest list.
        manifest_list: Path to the manifest list file.

    Returns:
        A tuple of ManifestFile objects.
    """
    # Materialize the whole manifest list up front so no file I/O happens
    # while the cache lock is held.
    entries = list(read_manifest_list(io.new_input(manifest_list)))

    with _manifest_cache_lock:
        # setdefault returns the already-cached object when present (touching
        # its LRU slot exactly like a plain lookup would) and inserts the
        # freshly-read one otherwise — same semantics as an explicit
        # "if path in cache" check, in a single call.
        return tuple(_manifest_cache.setdefault(entry.manifest_path, entry) for entry in entries)
904941

905942

906943
def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
"""Memory benchmarks for manifest cache efficiency.
18+
19+
These benchmarks reproduce the manifest cache memory issue described in:
20+
https://github.com/apache/iceberg-python/issues/2325
21+
22+
The issue: When caching manifest lists as tuples, overlapping ManifestFile objects
23+
are duplicated across cache entries, causing O(N²) memory growth instead of O(N).
24+
25+
Run with: uv run pytest tests/benchmark/test_memory_benchmark.py -v -s -m benchmark
26+
"""
27+
28+
import gc
29+
import tracemalloc
30+
from datetime import datetime, timezone
31+
32+
import pyarrow as pa
33+
import pytest
34+
35+
from pyiceberg.catalog.memory import InMemoryCatalog
36+
from pyiceberg.manifest import _manifest_cache
37+
38+
39+
def generate_test_dataframe() -> pa.Table:
    """Build a small PyArrow table mirroring the schema from the issue's example."""
    n_rows = 100  # Smaller for faster tests, increase for more realistic benchmarks

    row_ids = list(range(n_rows))
    columns = {
        "event_type": ["playback"] * n_rows,
        "event_origin": ["origin1"] * n_rows,
        "event_send_at": [datetime.now(timezone.utc)] * n_rows,
        "event_saved_at": [datetime.now(timezone.utc)] * n_rows,
        "id": row_ids,
        "reference_id": [f"ref-{i}" for i in row_ids],
    }
    return pa.table(columns)
53+
54+
55+
@pytest.fixture
def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
    """Provide a fresh in-memory catalog (with a `default` namespace) for memory tests."""
    warehouse_uri = f"file://{tmp_path_factory.mktemp('warehouse')}"
    catalog = InMemoryCatalog("memory_test", warehouse=warehouse_uri)
    catalog.create_namespace("default")
    return catalog
62+
63+
64+
@pytest.fixture(autouse=True)
def clear_caches() -> None:
    """Clear caches before each test."""
    # Start every test from an empty manifest cache and a collected heap so
    # memory measurements are not polluted by entries from earlier tests.
    _manifest_cache.clear()
    gc.collect()
69+
70+
71+
@pytest.mark.benchmark
def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None:
    """Benchmark memory growth of manifest cache during repeated appends.

    This test reproduces the issue from GitHub #2325 where each append creates
    a new manifest list entry in the cache, causing memory to grow.

    With the old caching strategy (tuple per manifest list), memory grew as O(N²).
    With the new strategy (individual ManifestFile objects), memory grows as O(N).
    """
    df = generate_test_dataframe()
    table = memory_catalog.create_table("default.memory_test", schema=df.schema)

    # tracemalloc tracks Python-level allocations only; started after table
    # creation so setup cost is excluded from the measurement.
    tracemalloc.start()

    num_iterations = 50
    memory_samples: list[tuple[int, int, int]] = []  # (iteration, current_memory, cache_size)

    print("\n--- Manifest Cache Memory Growth Benchmark ---")
    print(f"Running {num_iterations} append operations...")

    for i in range(num_iterations):
        table.append(df)

        # Sample memory at intervals
        if (i + 1) % 10 == 0:
            current, _ = tracemalloc.get_traced_memory()
            cache_size = len(_manifest_cache)

            memory_samples.append((i + 1, current, cache_size))
            print(f" Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache entries={cache_size}")

    tracemalloc.stop()

    # Analyze memory growth
    # Growth is computed between the first and last samples (iterations 10
    # and 50), then normalized per iteration.
    if len(memory_samples) >= 2:
        first_memory = memory_samples[0][1]
        last_memory = memory_samples[-1][1]
        memory_growth = last_memory - first_memory
        growth_per_iteration = memory_growth / (memory_samples[-1][0] - memory_samples[0][0])

        print("\nResults:")
        print(f" Initial memory: {first_memory / 1024:.1f} KB")
        print(f" Final memory: {last_memory / 1024:.1f} KB")
        print(f" Total growth: {memory_growth / 1024:.1f} KB")
        print(f" Growth per iteration: {growth_per_iteration:.1f} bytes")
        print(f" Final cache size: {memory_samples[-1][2]} entries")

        # With efficient caching, growth should be roughly linear (O(N))
        # rather than quadratic (O(N²)) as it was before
        # Memory growth includes ManifestFile objects, metadata, and other overhead
        # We expect about 5-10 KB per iteration for typical workloads
        # The key improvement is that growth is O(N) not O(N²)
        # Threshold of 15KB/iteration based on observed behavior - O(N²) would show ~50KB+/iteration
        max_memory_growth_per_iteration_bytes = 15000
        assert growth_per_iteration < max_memory_growth_per_iteration_bytes, (
            f"Memory growth per iteration ({growth_per_iteration:.0f} bytes) is too high. "
            "This may indicate the O(N²) cache inefficiency is present."
        )
130+
131+
132+
@pytest.mark.benchmark
def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) -> None:
    """Test that clearing the cache allows memory to be reclaimed.

    This test verifies that when we clear the manifest cache, the associated
    memory can be garbage collected.
    """
    df = generate_test_dataframe()
    table = memory_catalog.create_table("default.gc_test", schema=df.schema)

    tracemalloc.start()

    print("\n--- Memory After GC Benchmark ---")

    # Phase 1: Fill the cache
    print("Phase 1: Filling cache with 20 appends...")
    for _ in range(20):
        table.append(df)

    # Collect before the baseline reading so transient garbage from the
    # appends does not inflate the "before" number.
    gc.collect()
    before_clear_memory, _ = tracemalloc.get_traced_memory()
    cache_size_before = len(_manifest_cache)
    print(f" Memory before clear: {before_clear_memory / 1024:.1f} KB")
    print(f" Cache size: {cache_size_before}")

    # Phase 2: Clear cache and GC
    print("\nPhase 2: Clearing cache and running GC...")
    _manifest_cache.clear()
    gc.collect()
    gc.collect()  # Multiple GC passes for thorough cleanup

    after_clear_memory, _ = tracemalloc.get_traced_memory()
    print(f" Memory after clear: {after_clear_memory / 1024:.1f} KB")
    print(f" Memory reclaimed: {(before_clear_memory - after_clear_memory) / 1024:.1f} KB")

    tracemalloc.stop()

    memory_reclaimed = before_clear_memory - after_clear_memory
    print("\nResults:")
    print(f" Memory reclaimed by clearing cache: {memory_reclaimed / 1024:.1f} KB")

    # Verify that clearing the cache actually freed some memory
    # Note: This may be flaky in some environments due to GC behavior
    assert memory_reclaimed >= 0, "Memory should not increase after clearing cache"
176+
177+
178+
@pytest.mark.benchmark
def test_manifest_cache_deduplication_efficiency() -> None:
    """Benchmark the efficiency of the per-ManifestFile caching strategy.

    This test verifies that when multiple manifest lists share the same
    ManifestFile objects, they are properly deduplicated in the cache.
    """
    # Imports are local so the (heavier) manifest-writing machinery is only
    # loaded when this benchmark actually runs.
    from tempfile import TemporaryDirectory

    from pyiceberg.io.pyarrow import PyArrowFileIO
    from pyiceberg.manifest import (
        DataFile,
        DataFileContent,
        FileFormat,
        ManifestEntry,
        ManifestEntryStatus,
        _manifests,
        write_manifest,
        write_manifest_list,
    )
    from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
    from pyiceberg.schema import Schema
    from pyiceberg.typedef import Record
    from pyiceberg.types import IntegerType, NestedField

    io = PyArrowFileIO()

    print("\n--- Manifest Cache Deduplication Benchmark ---")

    with TemporaryDirectory() as tmp_dir:
        schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
        spec = UNPARTITIONED_PARTITION_SPEC

        # Create N manifest files
        num_manifests = 20
        manifest_files = []

        print(f"Creating {num_manifests} manifest files...")
        for i in range(num_manifests):
            manifest_path = f"{tmp_dir}/manifest_{i}.avro"
            with write_manifest(
                format_version=2,
                spec=spec,
                schema=schema,
                output_file=io.new_output(manifest_path),
                snapshot_id=i + 1,
                avro_compression="null",
            ) as writer:
                data_file = DataFile.from_args(
                    content=DataFileContent.DATA,
                    file_path=f"{tmp_dir}/data_{i}.parquet",
                    file_format=FileFormat.PARQUET,
                    partition=Record(),
                    record_count=100,
                    file_size_in_bytes=1000,
                )
                writer.add_entry(
                    ManifestEntry.from_args(
                        status=ManifestEntryStatus.ADDED,
                        snapshot_id=i + 1,
                        data_file=data_file,
                    )
                )
            # to_manifest_file() is called after the `with` block so the
            # writer has been closed and the manifest fully flushed.
            manifest_files.append(writer.to_manifest_file())

        # Create multiple manifest lists with overlapping manifest files
        # List i contains manifest files 0 through i
        num_lists = 10
        print(f"Creating {num_lists} manifest lists with overlapping manifests...")

        # Start from an empty cache so the final count reflects only this test.
        _manifest_cache.clear()

        for i in range(num_lists):
            list_path = f"{tmp_dir}/manifest-list_{i}.avro"
            manifests_to_include = manifest_files[: i + 1]

            with write_manifest_list(
                format_version=2,
                output_file=io.new_output(list_path),
                snapshot_id=i + 1,
                parent_snapshot_id=i if i > 0 else None,
                sequence_number=i + 1,
                avro_compression="null",
            ) as list_writer:
                list_writer.add_manifests(manifests_to_include)

            # Read the manifest list using _manifests (this populates the cache)
            _manifests(io, list_path)

        # Analyze cache efficiency
        cache_entries = len(_manifest_cache)
        # List i contains manifests 0..i, so only the first num_lists manifests are actually used
        manifests_actually_used = num_lists

        print("\nResults:")
        print(f" Manifest lists created: {num_lists}")
        print(f" Manifest files created: {num_manifests}")
        print(f" Manifest files actually used: {manifests_actually_used}")
        print(f" Cache entries: {cache_entries}")

        # With efficient per-ManifestFile caching, we should have exactly
        # manifests_actually_used entries (one per unique manifest path)
        print(f"\n Expected cache entries (efficient): {manifests_actually_used}")
        print(f" Actual cache entries: {cache_entries}")

        # The cache should be efficient - one entry per unique manifest path
        assert cache_entries == manifests_actually_used, (
            f"Cache has {cache_entries} entries, expected exactly {manifests_actually_used}. "
            "The cache may not be deduplicating properly."
        )

0 commit comments

Comments
 (0)