|
31 | 31 | #
|
32 | 32 |
|
33 | 33 | """ Identify specific nodes in a JSON document (RFC 6901) """
|
| 34 | +from .jsonpointer import JsonPointerException, JsonPointer, resolve_pointer, set_pointer, EndOfList |
34 | 35 |
|
35 |
| -# Will be parsed by setup.py to determine package metadata |
36 |
| -__author__ = 'Stefan Kögl <stefan@skoegl.net>' |
37 |
| -__version__ = '3.0.0' |
38 |
| -__website__ = 'https://github.com/stefankoegl/python-json-pointer' |
39 |
| -__license__ = 'Modified BSD License' |
40 |
| - |
41 |
| -import copy |
42 |
| -import re |
43 |
| -from collections.abc import Mapping, Sequence |
44 |
| -from itertools import tee, chain |
45 |
| - |
46 |
| -_nothing = object() |
47 |
| - |
48 |
| - |
49 |
| -def set_pointer(doc, pointer, value, inplace=True): |
50 |
| - """Resolves a pointer against doc and sets the value of the target within doc. |
51 |
| -
|
52 |
| - With inplace set to true, doc is modified as long as pointer is not the |
53 |
| - root. |
54 |
| -
|
55 |
| - >>> obj = {'foo': {'anArray': [ {'prop': 44}], 'another prop': {'baz': 'A string' }}} |
56 |
| -
|
57 |
| - >>> set_pointer(obj, '/foo/anArray/0/prop', 55) == \ |
58 |
| - {'foo': {'another prop': {'baz': 'A string'}, 'anArray': [{'prop': 55}]}} |
59 |
| - True |
60 |
| -
|
61 |
| - >>> set_pointer(obj, '/foo/yet another prop', 'added prop') == \ |
62 |
| - {'foo': {'another prop': {'baz': 'A string'}, 'yet another prop': 'added prop', 'anArray': [{'prop': 55}]}} |
63 |
| - True |
64 |
| -
|
65 |
| - >>> obj = {'foo': {}} |
66 |
| - >>> set_pointer(obj, '/foo/a%20b', 'x') == \ |
67 |
| - {'foo': {'a%20b': 'x' }} |
68 |
| - True |
69 |
| - """ |
70 |
| - |
71 |
| - pointer = JsonPointer(pointer) |
72 |
| - return pointer.set(doc, value, inplace) |
73 |
| - |
74 |
| - |
75 |
| -def resolve_pointer(doc, pointer, default=_nothing): |
76 |
| - """ Resolves pointer against doc and returns the referenced object |
77 |
| -
|
78 |
| - >>> obj = {'foo': {'anArray': [ {'prop': 44}], 'another prop': {'baz': 'A string' }}, 'a%20b': 1, 'c d': 2} |
79 |
| -
|
80 |
| - >>> resolve_pointer(obj, '') == obj |
81 |
| - True |
82 |
| -
|
83 |
| - >>> resolve_pointer(obj, '/foo') == obj['foo'] |
84 |
| - True |
85 |
| -
|
86 |
| - >>> resolve_pointer(obj, '/foo/another prop') == obj['foo']['another prop'] |
87 |
| - True |
88 |
| -
|
89 |
| - >>> resolve_pointer(obj, '/foo/another prop/baz') == obj['foo']['another prop']['baz'] |
90 |
| - True |
91 |
| -
|
92 |
| - >>> resolve_pointer(obj, '/foo/anArray/0') == obj['foo']['anArray'][0] |
93 |
| - True |
94 |
| -
|
95 |
| - >>> resolve_pointer(obj, '/some/path', None) == None |
96 |
| - True |
97 |
| -
|
98 |
| - >>> resolve_pointer(obj, '/a b', None) == None |
99 |
| - True |
100 |
| -
|
101 |
| - >>> resolve_pointer(obj, '/a%20b') == 1 |
102 |
| - True |
103 |
| -
|
104 |
| - >>> resolve_pointer(obj, '/c d') == 2 |
105 |
| - True |
106 |
| -
|
107 |
| - >>> resolve_pointer(obj, '/c%20d', None) == None |
108 |
| - True |
109 |
| - """ |
110 |
| - |
111 |
| - pointer = JsonPointer(pointer) |
112 |
| - return pointer.resolve(doc, default) |
113 |
| - |
114 |
| - |
115 |
| -def pairwise(iterable): |
116 |
| - """ Transforms a list to a list of tuples of adjacent items |
117 |
| -
|
118 |
| - s -> (s0,s1), (s1,s2), (s2, s3), ... |
119 |
| -
|
120 |
| - >>> list(pairwise([])) |
121 |
| - [] |
122 |
| -
|
123 |
| - >>> list(pairwise([1])) |
124 |
| - [] |
125 |
| -
|
126 |
| - >>> list(pairwise([1, 2, 3, 4])) |
127 |
| - [(1, 2), (2, 3), (3, 4)] |
128 |
| - """ |
129 |
| - a, b = tee(iterable) |
130 |
| - for _ in b: |
131 |
| - break |
132 |
| - return zip(a, b) |
133 |
| - |
134 |
| - |
135 |
| -class JsonPointerException(Exception): |
136 |
| - pass |
137 |
| - |
138 |
| - |
139 |
| -class EndOfList(object): |
140 |
| - """Result of accessing element "-" of a list""" |
141 |
| - |
142 |
| - def __init__(self, list_): |
143 |
| - self.list_ = list_ |
144 |
| - |
145 |
| - def __repr__(self): |
146 |
| - return '{cls}({lst})'.format(cls=self.__class__.__name__, |
147 |
| - lst=repr(self.list_)) |
148 |
| - |
149 |
| - |
150 |
| -class JsonPointer(object): |
151 |
| - """A JSON Pointer that can reference parts of a JSON document""" |
152 |
| - |
153 |
| - # Array indices must not contain: |
154 |
| - # leading zeros, signs, spaces, decimals, etc |
155 |
| - _RE_ARRAY_INDEX = re.compile('0|[1-9][0-9]*$') |
156 |
| - _RE_INVALID_ESCAPE = re.compile('(~[^01]|~$)') |
157 |
| - |
158 |
| - def __init__(self, pointer): |
159 |
| - |
160 |
| - # validate escapes |
161 |
| - invalid_escape = self._RE_INVALID_ESCAPE.search(pointer) |
162 |
| - if invalid_escape: |
163 |
| - raise JsonPointerException('Found invalid escape {}'.format( |
164 |
| - invalid_escape.group())) |
165 |
| - |
166 |
| - parts = pointer.split('/') |
167 |
| - if parts.pop(0) != '': |
168 |
| - raise JsonPointerException('Location must start with /') |
169 |
| - |
170 |
| - parts = [unescape(part) for part in parts] |
171 |
| - self.parts = parts |
172 |
| - |
173 |
| - def to_last(self, doc): |
174 |
| - """Resolves ptr until the last step, returns (sub-doc, last-step)""" |
175 |
| - |
176 |
| - if not self.parts: |
177 |
| - return doc, None |
178 |
| - |
179 |
| - for part in self.parts[:-1]: |
180 |
| - doc = self.walk(doc, part) |
181 |
| - |
182 |
| - return doc, JsonPointer.get_part(doc, self.parts[-1]) |
| 36 | +try: |
| 37 | + from importlib import metadata |
| 38 | +except ImportError: # for Python < 3.8 |
| 39 | + import importlib_metadata as metadata # type: ignore |
183 | 40 |
|
184 |
| - def resolve(self, doc, default=_nothing): |
185 |
| - """Resolves the pointer against doc and returns the referenced object""" |
186 |
| - |
187 |
| - for part in self.parts: |
188 |
| - |
189 |
| - try: |
190 |
| - doc = self.walk(doc, part) |
191 |
| - except JsonPointerException: |
192 |
| - if default is _nothing: |
193 |
| - raise |
194 |
| - else: |
195 |
| - return default |
196 |
| - |
197 |
| - return doc |
198 |
| - |
199 |
| - get = resolve |
200 |
| - |
201 |
| - def set(self, doc, value, inplace=True): |
202 |
| - """Resolve the pointer against the doc and replace the target with value.""" |
203 |
| - |
204 |
| - if len(self.parts) == 0: |
205 |
| - if inplace: |
206 |
| - raise JsonPointerException('Cannot set root in place') |
207 |
| - return value |
208 |
| - |
209 |
| - if not inplace: |
210 |
| - doc = copy.deepcopy(doc) |
211 |
| - |
212 |
| - (parent, part) = self.to_last(doc) |
213 |
| - |
214 |
| - if isinstance(parent, Sequence) and part == '-': |
215 |
| - parent.append(value) |
216 |
| - else: |
217 |
| - parent[part] = value |
218 |
| - |
219 |
| - return doc |
220 |
| - |
221 |
| - @classmethod |
222 |
| - def get_part(cls, doc, part): |
223 |
| - """Returns the next step in the correct type""" |
224 |
| - |
225 |
| - if isinstance(doc, Mapping): |
226 |
| - return part |
227 |
| - |
228 |
| - elif isinstance(doc, Sequence): |
229 |
| - |
230 |
| - if part == '-': |
231 |
| - return part |
232 |
| - |
233 |
| - if not JsonPointer._RE_ARRAY_INDEX.match(str(part)): |
234 |
| - raise JsonPointerException("'%s' is not a valid sequence index" % part) |
235 |
| - |
236 |
| - return int(part) |
237 |
| - |
238 |
| - elif hasattr(doc, '__getitem__'): |
239 |
| - # Allow indexing via ducktyping |
240 |
| - # if the target has defined __getitem__ |
241 |
| - return part |
242 |
| - |
243 |
| - else: |
244 |
| - raise JsonPointerException("Document '%s' does not support indexing, " |
245 |
| - "must be mapping/sequence or support __getitem__" % type(doc)) |
246 |
| - |
247 |
| - def get_parts(self): |
248 |
| - """Returns the list of the parts. For example, JsonPointer('/a/b').get_parts() == ['a', 'b']""" |
249 |
| - |
250 |
| - return self.parts |
251 |
| - |
252 |
| - def walk(self, doc, part): |
253 |
| - """ Walks one step in doc and returns the referenced part """ |
254 |
| - |
255 |
| - part = JsonPointer.get_part(doc, part) |
256 |
| - |
257 |
| - assert hasattr(doc, '__getitem__'), "invalid document type %s" % (type(doc),) |
258 |
| - |
259 |
| - if isinstance(doc, Sequence): |
260 |
| - if part == '-': |
261 |
| - return EndOfList(doc) |
262 |
| - |
263 |
| - try: |
264 |
| - return doc[part] |
265 |
| - |
266 |
| - except IndexError: |
267 |
| - raise JsonPointerException("index '%s' is out of bounds" % (part,)) |
268 |
| - |
269 |
| - # Else the object is a mapping or supports __getitem__(so assume custom indexing) |
270 |
| - try: |
271 |
| - return doc[part] |
272 |
| - |
273 |
| - except KeyError: |
274 |
| - raise JsonPointerException("member '%s' not found in %s" % (part, doc)) |
275 |
| - |
276 |
| - def contains(self, ptr): |
277 |
| - """ Returns True if self contains the given ptr """ |
278 |
| - return self.parts[:len(ptr.parts)] == ptr.parts |
279 |
| - |
280 |
| - def __contains__(self, item): |
281 |
| - """ Returns True if self contains the given ptr """ |
282 |
| - return self.contains(item) |
283 |
| - |
284 |
| - def join(self, suffix): |
285 |
| - """ Returns a new JsonPointer with the given suffix append to this ptr """ |
286 |
| - if isinstance(suffix, JsonPointer): |
287 |
| - suffix_parts = suffix.parts |
288 |
| - elif isinstance(suffix, str): |
289 |
| - suffix_parts = JsonPointer(suffix).parts |
290 |
| - else: |
291 |
| - suffix_parts = suffix |
292 |
| - try: |
293 |
| - return JsonPointer.from_parts(chain(self.parts, suffix_parts)) |
294 |
| - except: # noqa E722 |
295 |
| - raise JsonPointerException("Invalid suffix") |
296 |
| - |
297 |
| - def __truediv__(self, suffix): # Python 3 |
298 |
| - return self.join(suffix) |
299 |
| - |
300 |
| - @property |
301 |
| - def path(self): |
302 |
| - """Returns the string representation of the pointer |
303 |
| -
|
304 |
| - >>> ptr = JsonPointer('/~0/0/~1').path == '/~0/0/~1' |
305 |
| - """ |
306 |
| - parts = [escape(part) for part in self.parts] |
307 |
| - return ''.join('/' + part for part in parts) |
308 |
| - |
309 |
| - def __eq__(self, other): |
310 |
| - """Compares a pointer to another object |
311 |
| -
|
312 |
| - Pointers can be compared by comparing their strings (or splitted |
313 |
| - strings), because no two different parts can point to the same |
314 |
| - structure in an object (eg no different number representations) |
315 |
| - """ |
316 |
| - |
317 |
| - if not isinstance(other, JsonPointer): |
318 |
| - return False |
319 |
| - |
320 |
| - return self.parts == other.parts |
321 |
| - |
322 |
| - def __hash__(self): |
323 |
| - return hash(tuple(self.parts)) |
324 |
| - |
325 |
| - def __str__(self): |
326 |
| - return self.path |
327 |
| - |
328 |
| - def __repr__(self): |
329 |
| - return type(self).__name__ + "(" + repr(self.path) + ")" |
330 |
| - |
331 |
| - @classmethod |
332 |
| - def from_parts(cls, parts): |
333 |
| - """Constructs a JsonPointer from a list of (unescaped) paths |
334 |
| -
|
335 |
| - >>> JsonPointer.from_parts(['a', '~', '/', 0]).path == '/a/~0/~1/0' |
336 |
| - True |
337 |
| - """ |
338 |
| - parts = [escape(str(part)) for part in parts] |
339 |
| - ptr = cls(''.join('/' + part for part in parts)) |
340 |
| - return ptr |
341 |
| - |
342 |
| - |
343 |
| -def escape(s): |
344 |
| - return s.replace('~', '~0').replace('/', '~1') |
345 |
| - |
346 |
| - |
347 |
| -def unescape(s): |
348 |
| - return s.replace('~1', '/').replace('~0', '~') |
| 41 | +# Will be parsed by setup.py to determine package metadata |
| 42 | +jsonpointer_metadata = metadata.metadata("jsonpointer") |
| 43 | +__author__ = jsonpointer_metadata["Author"] |
| 44 | +__version__ = metadata.version("jsonpointer") |
| 45 | +__website__ = jsonpointer_metadata["Home-page"] |
| 46 | +__license__ = jsonpointer_metadata["License"] |
| 47 | + |
| 48 | +__all__ = [ |
| 49 | + "resolve_pointer", |
| 50 | + "set_pointer", |
| 51 | + "JsonPointerException", |
| 52 | + "JsonPointer", |
| 53 | + "EndOfList", |
| 54 | + "__author__", |
| 55 | + "__version__", |
| 56 | + "__website__", |
| 57 | + "__license__", |
| 58 | +] |
0 commit comments