Skip to content

Commit f9fce99

Browse files
authored
Merge pull request #222 from averbis/feature/218-Support-for-streaming-export-and-import-APIs
Issue #218: Support for streaming export and import APIs
2 parents e1365a1 + f7cf3de commit f9fce99

File tree

4 files changed

+643
-1
lines changed

4 files changed

+643
-1
lines changed

averbis/core/_rest_client.py

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
# limitations under the License.
1818
#
1919
#
20+
from __future__ import annotations
21+
22+
import ijson # type: ignore
2023
import copy
2124
from enum import Enum, auto
2225
from urllib.parse import quote
@@ -54,6 +57,7 @@
5457
from pathlib import Path
5558
import requests
5659
import mimetypes
60+
import weakref
5761

5862
from requests import RequestException
5963

@@ -106,6 +110,88 @@
106110
MEDIA_TYPE_BINARY_TSI,
107111
]
108112

113+
114+
class ExportDocumentStream:
    """Iterator wrapper around a streaming export HTTP response.

    Wraps a ``requests.Response`` that was opened with ``stream=True`` and
    iterates over the exported document objects as parsed by
    ``ijson.items(..., "payload.documents.item")``.

    Behavior and guarantees:
    - The response is closed deterministically when the iterator is exhausted.
    - Calling ``close()`` closes the response immediately and cancels the
      weakref finalizer.
    - The context manager protocol is implemented: use ``with`` to ensure the
      response is closed on early exit.
    - A weakref finalizer is registered as a last-resort safety net to close
      the response if the wrapper is garbage-collected without explicit
      closing. Relying on this finalizer is discouraged; prefer deterministic
      closing via exhaust/close/with.
    - ``__or__`` provides a small convenience so callers can write
      ``export_stream | target.import_json_document_stream``.
    """

    def __init__(self, response: requests.Response):
        # _resp is set to None once the stream has been closed.
        self._resp: Optional[requests.Response] = response
        self._iter = ijson.items(response.raw, "payload.documents.item")
        # IMPORTANT: pass the response object itself to the finalizer, not a
        # bound method of self. weakref.finalize keeps a strong reference to
        # its callback and arguments, so a bound method (which references
        # self) would keep this wrapper alive forever and the finalizer
        # would never run before interpreter shutdown.
        self._finalizer = weakref.finalize(self, self._close_quietly, response)

    @staticmethod
    def _close_quietly(response: requests.Response) -> None:
        """Best-effort close used as the GC finalizer callback."""
        try:
            response.close()
        except Exception:
            pass

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._iter)
        except Exception:
            # Covers both exhaustion (StopIteration) and parsing/IO errors:
            # close the response deterministically, then propagate.
            self.close()
            raise

    def close(self) -> None:
        """Idempotently close the underlying response and detach the finalizer."""
        resp = self._resp
        if resp is None:
            return
        # Clear the stored reference first so close() is idempotent even if
        # resp.close() raises.
        self._resp = None
        try:
            resp.close()
        finally:
            try:
                self._finalizer.detach()
            except Exception:
                pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        # Never suppress exceptions raised inside the with-block.
        return False

    def __or__(self, other):
        """Support convenience piping: export_stream | target.import_json_document_stream

        Returns the result of calling ``other(self)`` which matches the
        conventional import method signature accepting an iterator/generator.
        """
        return other(self)
194+
109195
DOCUMENT_IMPORTER_CAS = "CAS Importer"
110196
DOCUMENT_IMPORTER_SOLR = "Solr XML Importer"
111197
DOCUMENT_IMPORTER_TEXT = "Text Importer"
@@ -1841,6 +1927,75 @@ def list_processes(self) -> List[Process]:
18411927
if process.document_source_name == self.name
18421928
]
18431929

1930+
@experimental_api
def export_json_document_stream(
    self, document_names: Optional[List[str]] = None
) -> ExportDocumentStream:
    """
    HIGHLY EXPERIMENTAL API - may soon change or disappear.

    Export documents from a collection as a streaming iterator.

    Args:
        document_names: Optional list of document names to export (None or an
            empty list exports all documents)

    Returns:
        An ExportDocumentStream yielding document dictionaries from the
        export response. It closes the underlying connection when exhausted,
        on ``close()``, or when used as a context manager.
    """
    if document_names is None:
        document_names = []

    # Open the streaming response; ExportDocumentStream ensures the
    # underlying response is closed when fully consumed, when closed
    # explicitly, or when GC'd as a last-resort.
    resp = self.project.client._export_document_stream(
        self.project.name, self.name, document_names
    )
    try:
        resp.raise_for_status()
    except Exception:
        # An HTTP error response would otherwise leak the open streaming
        # connection: close it before propagating the error.
        resp.close()
        raise

    return ExportDocumentStream(resp)
1959+
1960+
@experimental_api
def import_json_document_stream(
    self, document_generator: Iterator[Dict[str, Any]]
) -> dict:
    """
    HIGHLY EXPERIMENTAL API - may soon change or disappear.

    Import documents to a collection from a document generator.

    Args:
        document_generator: Iterator yielding document dictionaries

    Returns:
        The parsed JSON payload of the server response. Falls back to the
        raw response text (or None) if the body is not valid JSON.
    """
    # POST the streamed documents to the import endpoint. The response is
    # closed deterministically before returning.
    resp = self.project.client._import_document_stream(
        self.project.name, self.name, document_generator
    )
    try:
        resp.raise_for_status()
        try:
            return resp.json()
        except Exception:
            # Server did not return JSON - fall back to the raw text, then
            # to None if even the text cannot be read.
            try:
                return resp.text
            except Exception:
                return None
    finally:
        try:
            resp.close()
        except Exception:
            pass
1998+
18441999
def _get_document_identifier(self, document_name: str) -> str:
18452000
documents = [
18462001
document
@@ -4110,6 +4265,74 @@ def _list_processes(self, project: "Project") -> List[Process]:
41104265
processes.append(document_collection.get_process(item["processName"]))
41114266
return processes
41124267

4268+
def _export_document_stream(
    self,
    project_name: str,
    document_collection_name: str,
    document_names: List[str],
) -> requests.Response:
    """Open a streaming export request and return the raw response.

    The response is returned unread (``stream=True``) so that callers can
    parse the body incrementally.

    Args:
        project_name: Name of the project containing the collection
        document_collection_name: Name of the document collection
        document_names: Document names to export (empty list exports all)

    Raises:
        OperationNotSupported: if the platform version is older than 9.3.0.
    """
    build_version = self.get_build_info()
    if not self._is_higher_equal_version(build_version["platformVersion"], 9, 3):
        # NOTE: message previously referred to 'export_documents'; use the
        # actual public API name.
        raise OperationNotSupported(
            f"The method 'export_json_document_stream' is only supported for platform versions >= 9.3.0 (available from Health Discovery version 7.5.0), but current platform is {build_version['platformVersion']}."
        )

    return requests.post(
        self._build_url(
            f"/experimental/projects/{project_name}/documentCollections/{document_collection_name}/export"
        ),
        headers=self._default_headers(
            {
                HEADER_ACCEPT: MEDIA_TYPE_ANY,
                HEADER_CONTENT_TYPE: MEDIA_TYPE_APPLICATION_JSON,
            }
        ),
        json={"documentNames": document_names},
        stream=True,
    )
4293+
4294+
def _import_document_stream(
    self,
    project_name: str,
    document_collection_name: str,
    document_generator: Iterator[Dict[str, Any]],
) -> requests.Response:
    """POST a stream of documents to the import endpoint.

    The request body is generated lazily as
    ``{"documents": [<doc>, <doc>, ...]}`` so that arbitrarily many
    documents can be imported without materializing them in memory.

    Args:
        project_name: Name of the project containing the collection
        document_collection_name: Name of the document collection
        document_generator: Iterator yielding document dictionaries

    Raises:
        OperationNotSupported: if the platform version is older than 9.3.0.
    """
    build_version = self.get_build_info()
    if not self._is_higher_equal_version(build_version["platformVersion"], 9, 3):
        # NOTE: message previously referred to 'import_documents'; use the
        # actual public API name.
        raise OperationNotSupported(
            f"The method 'import_json_document_stream' is only supported for platform versions >= 9.3.0 (available from Health Discovery version 7.5.0), but current platform is {build_version['platformVersion']}."
        )

    def generate_import_stream():
        """Yield the import JSON body as UTF-8 encoded byte chunks."""
        yield b'{"documents":['
        for index, doc in enumerate(document_generator):
            # Separate documents with commas (not before the first one).
            if index:
                yield b","
            yield json.dumps(doc).encode("utf-8")
        yield b"]}"

    return requests.post(
        self._build_url(
            f"/experimental/projects/{project_name}/documentCollections/{document_collection_name}/import"
        ),
        headers=self._default_headers(
            {
                HEADER_ACCEPT: MEDIA_TYPE_ANY,
                HEADER_CONTENT_TYPE: MEDIA_TYPE_APPLICATION_JSON,
            }
        ),
        # requests accepts an iterable of bytes as a chunked request body;
        # call the generator function to obtain that iterator.
        data=generate_import_stream(),
        stream=True,
    )
4335+
41134336
def _delete_document(
41144337
self, project_name: str, document_collection_name: str, document_name: str
41154338
) -> dict:

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ requires-python = ">=3.9.0"
77
dependencies = [
88
"requests",
99
"types-requests",
10-
"dkpro-cassis"
10+
"dkpro-cassis",
11+
"ijson"
1112
]
1213
authors = [
1314
{ name = "Averbis GmbH" }

0 commit comments

Comments
 (0)