Skip to content

Commit 3bab775

Browse files
committed
feat: list_query: Filterable list
1 parent 8a65bf2 commit 3bab775

File tree

4 files changed

+476
-0
lines changed

4 files changed

+476
-0
lines changed

docs/contributing/internals.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,8 @@ stability policy.
1818
.. autoapimodule:: libvcs.types
1919
:members:
2020
```
21+
22+
```{eval-rst}
23+
.. autoapimodule:: libvcs.utils.list_query
24+
:members:
25+
```

libvcs/utils/__init__.py

Whitespace-only changes.

libvcs/utils/list_query.py

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
import dataclasses
2+
import re
3+
import traceback
4+
from typing import Any, Callable, Generic, Optional, Protocol, Sequence, TypeVar, Union
5+
6+
T = TypeVar("T", Any, Any)
7+
8+
9+
def keygetter(obj, path):
10+
"""obj, "foods__breakfast", obj['foods']['breakfast']
11+
12+
>>> keygetter({ "foods": { "breakfast": "cereal" } }, "foods__breakfast")
13+
'cereal'
14+
>>> keygetter({ "foods": { "breakfast": "cereal" } }, "foods")
15+
{'breakfast': 'cereal'}
16+
17+
"""
18+
try:
19+
sub_fields = path.split("__")
20+
dct = obj
21+
for sub_field in sub_fields:
22+
dct = dct[sub_field]
23+
return dct
24+
except Exception as e:
25+
traceback.print_exception(e)
26+
return None
27+
28+
29+
def parse_lookup(obj, path, lookup):
30+
"""Check if field lookup key, e.g. "my__path__contains" has comparator, return val.
31+
32+
If comparator not used or value not found, return None.
33+
34+
mykey__endswith("mykey") -> "mykey" else None
35+
36+
>>> parse_lookup({ "food": "red apple" }, "food__istartswith", "__istartswith")
37+
'red apple'
38+
"""
39+
try:
40+
if path.endswith(lookup):
41+
if field_name := path.rsplit(lookup)[0]:
42+
return keygetter(obj, field_name)
43+
except Exception as e:
44+
traceback.print_exception(e)
45+
return None
46+
47+
48+
class LookupProtocol(Protocol):
49+
def __call__(self, data: Union[list[str], str], rhs: Union[list[str], str]):
50+
pass
51+
52+
53+
def lookup_exact(data, rhs):
54+
return rhs == data
55+
56+
57+
def lookup_iexact(data, rhs):
58+
return rhs.lower() == data.lower()
59+
60+
61+
def lookup_contains(data, rhs):
62+
return rhs in data
63+
64+
65+
def lookup_icontains(data, rhs):
66+
return rhs.lower() in data.lower()
67+
68+
69+
def lookup_startswith(data, rhs):
70+
return data.startswith(rhs)
71+
72+
73+
def lookup_istartswith(data, rhs):
74+
return data.lower().startswith(rhs.lower())
75+
76+
77+
def lookup_endswith(data, rhs):
78+
return data.endswith(rhs)
79+
80+
81+
def lookup_iendswith(data, rhs):
82+
return data.lower().endswith(rhs.lower())
83+
84+
85+
def lookup_in(data, rhs):
86+
if isinstance(rhs, list):
87+
return data in rhs
88+
return rhs in data
89+
90+
91+
def lookup_nin(data, rhs):
92+
if isinstance(rhs, list):
93+
return data not in rhs
94+
return rhs not in data
95+
96+
97+
def lookup_regex(data, rhs):
98+
return re.search(rhs, data)
99+
100+
101+
def lookup_iregex(data, rhs):
102+
return re.search(rhs, data, re.IGNORECASE)
103+
104+
105+
LOOKUP_NAME_MAP: dict[str, LookupProtocol] = {
106+
"eq": lookup_exact,
107+
"exact": lookup_exact,
108+
"iexact": lookup_iexact,
109+
"contains": lookup_contains,
110+
"icontains": lookup_icontains,
111+
"startswith": lookup_startswith,
112+
"istartswith": lookup_istartswith,
113+
"endswith": lookup_endswith,
114+
"iendswith": lookup_iendswith,
115+
"in": lookup_in,
116+
"nin": lookup_nin,
117+
"regex": lookup_regex,
118+
"iregex": lookup_iregex,
119+
}
120+
121+
122+
@dataclasses.dataclass(eq=False)
123+
class ListQuery(Generic[T]):
124+
"""Filter a list of dicts. *Experimental and unstable*.
125+
126+
:py:func:`dataclasses.dataclass` is only used for ``__repr__`` and pytest comparison
127+
details.
128+
129+
>>> query = ListQuery(
130+
... [
131+
... {
132+
... "place": "Largo",
133+
... "city": "Tampa",
134+
... "state": "Florida",
135+
... "foods": {"fruit": ["banana", "orange"], "breakfast": "cereal"},
136+
... },
137+
... {
138+
... "place": "Chicago suburbs",
139+
... "city": "Elmhurst",
140+
... "state": "Illinois",
141+
... "foods": {"fruit": ["apple", "cantelope"], "breakfast": "waffles"},
142+
... },
143+
... ]
144+
... )
145+
>>> query.filter(place="Chicago suburbs").data[0]['city']
146+
'Elmhurst'
147+
>>> query.filter(place__icontains="chicago").data[0]['city']
148+
'Elmhurst'
149+
>>> query.filter(foods__breakfast="waffles").data[0]['city']
150+
'Elmhurst'
151+
>>> query.filter(foods__fruit__in="cantelope").data[0]['city']
152+
'Elmhurst'
153+
>>> query.filter(foods__fruit__in="orange").data[0]['city']
154+
'Tampa'
155+
"""
156+
157+
__slots__ = ("data", "pk_key")
158+
data: Sequence[T]
159+
160+
# def __init__(self, data, pk_key: Optional[str] = None):
161+
# self.data: Sequence[T] = data
162+
# #: Primary key for objects, optional.
163+
# #: Use for .get(), .items()
164+
# self.pk_key: Optional[Any] = pk_key
165+
166+
def items(self):
167+
data: Sequence[T]
168+
169+
if self.pk_key is None:
170+
raise Exception("items() require a pk_key exists")
171+
return [(getattr(item, self.pk_key), item) for item in self.data]
172+
173+
def __eq__(self, other):
174+
data = other
175+
if hasattr(data, "data"):
176+
data = getattr(data, "data")
177+
178+
if not isinstance(self.data, list) or not isinstance(data, list):
179+
return False
180+
181+
if len(self.data) == len(data):
182+
for (a, b) in zip(self.data, data):
183+
if isinstance(a, dict):
184+
a_keys = a.keys()
185+
if a.keys == b.keys():
186+
for key in a_keys:
187+
if abs(a[key] - b[key]) > 1:
188+
return False
189+
else:
190+
if a != b:
191+
return False
192+
193+
return True
194+
return False
195+
196+
def filter(self, matcher: Optional[Union[Callable[[T], bool], T]] = None, **kwargs):
197+
def filter_lookup(obj) -> bool:
198+
for path, v in kwargs.items():
199+
try:
200+
lhs, op = path.rsplit("__", 1)
201+
202+
if op not in LOOKUP_NAME_MAP:
203+
raise ValueError(f"{op} not in LOOKUP_NAME_MAP")
204+
except ValueError:
205+
lhs = path
206+
op = "exact"
207+
208+
assert op in LOOKUP_NAME_MAP
209+
path = lhs
210+
data = keygetter(obj, path)
211+
212+
if not LOOKUP_NAME_MAP[op](data, v):
213+
return False
214+
215+
return True
216+
217+
if callable(matcher):
218+
_filter = matcher
219+
elif matcher is not None:
220+
221+
def val_match(obj):
222+
if isinstance(matcher, list):
223+
return obj in matcher
224+
else:
225+
return obj == matcher
226+
227+
_filter = val_match
228+
else:
229+
_filter = filter_lookup
230+
231+
return self.__class__(data=[k for k in self.data if _filter(k)])

0 commit comments

Comments
 (0)