|
14 | 14 | # KIND, either express or implied. See the License for the |
15 | 15 | # specific language governing permissions and limitations |
16 | 16 | # under the License. |
17 | | -import hashlib |
18 | 17 | import math |
19 | 18 | import threading |
20 | 19 | from abc import ABC, abstractmethod |
| 20 | +from collections.abc import Hashable |
21 | 21 | from functools import singledispatch |
22 | 22 | from typing import ( |
23 | 23 | Any, |
|
31 | 31 | TypeVar, |
32 | 32 | ) |
33 | 33 |
|
| 34 | +from cachetools import LRUCache, cached |
| 35 | +from cachetools.keys import hashkey |
| 36 | + |
34 | 37 | from pyiceberg.conversions import from_bytes |
35 | 38 | from pyiceberg.expressions import ( |
36 | 39 | AlwaysFalse, |
@@ -1980,116 +1983,22 @@ def residual_for(self, partition_data: Record) -> BooleanExpression: |
_DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE = 128


class ResidualEvaluatorCache:
    """Thread-safe, bounded LRU cache for ResidualEvaluator instances.

    Caches ResidualEvaluators to avoid repeated instantiation and initialization
    overhead when scanning multiple data files with identical partition specs,
    expressions, schemas, and case sensitivity settings.

    Entries are kept in recency order: a successful ``get`` and every ``put``
    move the entry to the most-recently-used position, and the least recently
    used entry is evicted once ``maxsize`` entries are stored.
    """

    # Insertion-ordered mapping of cache key -> evaluator; the first key is the least recently used.
    _cache: Dict[str, ResidualEvaluator]
    # Upper bound on the number of stored evaluators; values <= 0 disable caching.
    _maxsize: int
    # Guards every read and write of ``_cache``.
    _lock: threading.RLock

    def __init__(self, maxsize: int = _DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE) -> None:
        """Initialize the cache.

        Args:
            maxsize: Maximum number of evaluators to cache. Defaults to 128.
                A value of zero or less turns ``put`` into a no-op.
        """
        self._cache = {}
        self._maxsize = maxsize
        self._lock = threading.RLock()

    @staticmethod
    def _make_key(
        spec_id: int,
        expr: BooleanExpression,
        case_sensitive: bool,
        schema_id: int | None = None,
    ) -> str:
        """Create deterministic cache key from evaluator parameters.

        NOTE(review): only the spec and schema *ids* take part in the key.
        Presumably these ids are unique only within a single table, in which
        case evaluators built for different tables can collide - confirm.

        Args:
            spec_id: Partition spec identifier.
            expr: Filter expression tree; its ``repr`` is part of the key.
            case_sensitive: Case-sensitive flag.
            schema_id: Optional schema identifier.

        Returns:
            32-character MD5 hex string cache key.
        """
        key_parts = f"{spec_id}#{repr(expr)}#{case_sensitive}#{schema_id}"
        # The digest is only a compact dictionary key, not a security measure; flagging it
        # as such keeps the call working on builds that restrict MD5 (e.g. FIPS mode).
        return hashlib.md5(key_parts.encode(), usedforsecurity=False).hexdigest()

    def get(
        self,
        spec: PartitionSpec,
        expr: BooleanExpression,
        case_sensitive: bool,
        schema: Schema,
    ) -> ResidualEvaluator | None:
        """Retrieve cached evaluator if it exists and mark it as recently used.

        Args:
            spec: Partition specification.
            expr: Filter expression.
            case_sensitive: Case sensitivity flag.
            schema: Table schema.

        Returns:
            Cached ResidualEvaluator or None.
        """
        cache_key = self._make_key(spec.spec_id, expr, case_sensitive, schema.schema_id)
        with self._lock:
            try:
                evaluator = self._cache.pop(cache_key)
            except KeyError:
                return None
            # Re-inserting moves the key to the end of the dict, i.e. the most-recent slot.
            self._cache[cache_key] = evaluator
            return evaluator

    def put(
        self,
        spec: PartitionSpec,
        expr: BooleanExpression,
        case_sensitive: bool,
        schema: Schema,
        evaluator: ResidualEvaluator,
    ) -> None:
        """Cache a ResidualEvaluator instance, evicting the least recently used entry if full.

        Args:
            spec: Partition specification.
            expr: Filter expression.
            case_sensitive: Case sensitivity flag.
            schema: Table schema.
            evaluator: ResidualEvaluator to cache.
        """
        cache_key = self._make_key(spec.spec_id, expr, case_sensitive, schema.schema_id)
        with self._lock:
            if self._maxsize <= 0:
                # Nothing may be stored; also avoids evicting from an empty dict.
                return
            # Remove a stale entry for the same key first so that an update refreshes
            # its recency instead of evicting an unrelated entry.
            self._cache.pop(cache_key, None)
            while len(self._cache) >= self._maxsize:
                least_recent_key = next(iter(self._cache))
                del self._cache[least_recent_key]
            self._cache[cache_key] = evaluator

    def clear(self) -> None:
        """Clear all cached evaluators."""
        with self._lock:
            self._cache.clear()


_residual_evaluator_cache = ResidualEvaluatorCache()
def _residual_evaluator_cache_key(
    spec: PartitionSpec, expr: BooleanExpression, case_sensitive: bool, schema: Schema
) -> Tuple[Hashable, ...]:
    """Build the cache key used to memoize ``residual_evaluator_of``.

    The key combines the spec and schema ids with the ``repr`` of the spec, the
    filter expression and the schema, plus the case-sensitivity flag.

    The ids alone are not enough: Iceberg assigns spec and schema ids per
    table, so two tables can share the same ids while having different
    partition fields or columns. Including the ``repr`` of both objects keeps
    such tables from sharing a cached evaluator.

    NOTE(review): ``repr`` is evaluated on every call, including cache hits;
    for very wide schemas this may be noticeable - measure if it matters.

    Args:
        spec: Partition specification the evaluator is built for.
        expr: Filter expression tree.
        case_sensitive: Case sensitivity flag.
        schema: Table schema.

    Returns:
        A hashable key produced by ``cachetools.keys.hashkey``.
    """
    return hashkey(
        spec.spec_id,
        repr(spec),
        repr(expr),
        case_sensitive,
        schema.schema_id,
        repr(schema),
    )
2079 | 1990 |
|
2080 | 1991 |
|
@cached(
    cache=LRUCache(maxsize=_DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE),
    key=_residual_evaluator_cache_key,
    lock=threading.RLock(),
)
def residual_evaluator_of(
    spec: PartitionSpec, expr: BooleanExpression, case_sensitive: bool, schema: Schema
) -> ResidualEvaluator:
    """Return a residual evaluator for the given partition spec and filter.

    Results are memoized in an LRU cache holding up to
    ``_DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE`` entries, keyed by
    ``_residual_evaluator_cache_key`` and guarded by a re-entrant lock, so
    repeated calls with an equal key return the same evaluator instance.

    NOTE(review): because instances are shared between callers, confirm that
    evaluators keep no per-call mutable state before using them concurrently.

    Args:
        spec: Partition specification of the files being evaluated.
        expr: Filter expression to compute residuals for.
        case_sensitive: Case sensitivity flag; only passed on for partitioned specs.
        schema: Table schema.

    Returns:
        An ``UnpartitionedResidualEvaluator`` when ``spec.is_unpartitioned()`` is
        true, otherwise a ``ResidualEvaluator``.
    """
    if spec.is_unpartitioned():
        return UnpartitionedResidualEvaluator(schema=schema, expr=expr)
    return ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive)
0 commit comments