Skip to content

Commit 5235aaa

Browse files
Add optional Arrow deserialization support (#2632) (#2634)
(cherry picked from commit ef3da6e) Co-authored-by: Quentin Pradet <quentin.pradet@elastic.co>
1 parent 0868d57 commit 5235aaa

File tree

7 files changed

+69
-3
lines changed

7 files changed

+69
-3
lines changed

docs/guide/configuration.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ The calculation is equal to `min(dead_node_backoff_factor * (2 ** (consecutive_f
359359
[[serializer]]
360360
=== Serializers
361361

362-
Serializers transform bytes on the wire into native Python objects and vice-versa. By default the client ships with serializers for `application/json`, `application/x-ndjson`, `text/*`, and `application/mapbox-vector-tile`.
362+
Serializers transform bytes on the wire into native Python objects and vice-versa. By default the client ships with serializers for `application/json`, `application/x-ndjson`, `text/*`, `application/vnd.apache.arrow.stream` and `application/mapbox-vector-tile`.
363363

364364
You can define custom serializers via the `serializers` parameter:
365365

elasticsearch/serializer.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@
4949
_OrjsonSerializer = None # type: ignore[assignment,misc]
5050

5151

52+
try:
53+
import pyarrow as pa
54+
55+
__all__.append("PyArrowSerializer")
56+
except ImportError:
57+
pa = None
58+
59+
5260
class JsonSerializer(_JsonSerializer):
5361
mimetype: ClassVar[str] = "application/json"
5462

@@ -114,6 +122,29 @@ def dumps(self, data: bytes) -> bytes:
114122
raise SerializationError(f"Cannot serialize {data!r} into a MapBox vector tile")
115123

116124

125+
if pa is not None:
126+
127+
class PyArrowSerializer(Serializer):
128+
"""PyArrow serializer for deserializing Arrow Stream data."""
129+
130+
mimetype: ClassVar[str] = "application/vnd.apache.arrow.stream"
131+
132+
def loads(self, data: bytes) -> pa.Table:
133+
try:
134+
with pa.ipc.open_stream(data) as reader:
135+
return reader.read_all()
136+
except pa.ArrowException as e:
137+
raise SerializationError(
138+
message=f"Unable to deserialize as Arrow stream: {data!r}",
139+
errors=(e,),
140+
)
141+
142+
def dumps(self, data: Any) -> bytes:
143+
raise SerializationError(
144+
message="Elasticsearch does not accept Arrow input data"
145+
)
146+
147+
117148
DEFAULT_SERIALIZERS: Dict[str, Serializer] = {
118149
JsonSerializer.mimetype: JsonSerializer(),
119150
MapboxVectorTileSerializer.mimetype: MapboxVectorTileSerializer(),
@@ -122,6 +153,9 @@ def dumps(self, data: bytes) -> bytes:
122153
CompatibilityModeNdjsonSerializer.mimetype: CompatibilityModeNdjsonSerializer(),
123154
}
124155

156+
if pa is not None:
157+
DEFAULT_SERIALIZERS[PyArrowSerializer.mimetype] = PyArrowSerializer()
158+
125159
# Alias for backwards compatibility
126160
JSONSerializer = JsonSerializer
127161

noxfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def lint(session):
9494
session.run("flake8", *SOURCE_FILES)
9595
session.run("python", "utils/license-headers.py", "check", *SOURCE_FILES)
9696

97-
session.install(".[async,requests,orjson,vectorstore_mmr]", env=INSTALL_ENV)
97+
session.install(".[async,requests,orjson,pyarrow,vectorstore_mmr]", env=INSTALL_ENV)
9898

9999
# Run mypy on the package and then the type examples separately for
100100
# the two different mypy use-cases, ourselves and our users.

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ dependencies = [
4848
async = ["aiohttp>=3,<4"]
4949
requests = ["requests>=2.4.0, !=2.32.2, <3.0.0"]
5050
orjson = ["orjson>=3"]
51+
pyarrow = ["pyarrow>=1"]
5152
# Maximal Marginal Relevance (MMR) for search results
5253
vectorstore_mmr = ["numpy>=1", "simsimd>=3"]
5354
dev = [
@@ -69,6 +70,7 @@ dev = [
6970
"orjson",
7071
"numpy",
7172
"simsimd",
73+
"pyarrow",
7274
"pandas",
7375
"mapbox-vector-tile",
7476
]

test_elasticsearch/test_client/test_deprecated_options.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ class CustomSerializer(JsonSerializer):
134134
"application/x-ndjson",
135135
"application/json",
136136
"text/*",
137+
"application/vnd.apache.arrow.stream",
137138
"application/vnd.elasticsearch+json",
138139
"application/vnd.elasticsearch+x-ndjson",
139140
}
@@ -154,6 +155,7 @@ class CustomSerializer(JsonSerializer):
154155
"application/x-ndjson",
155156
"application/json",
156157
"text/*",
158+
"application/vnd.apache.arrow.stream",
157159
"application/vnd.elasticsearch+json",
158160
"application/vnd.elasticsearch+x-ndjson",
159161
"application/cbor",

test_elasticsearch/test_client/test_serializers.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ class CustomSerializer:
9494
"application/json",
9595
"text/*",
9696
"application/x-ndjson",
97+
"application/vnd.apache.arrow.stream",
9798
"application/vnd.mapbox-vector-tile",
9899
"application/vnd.elasticsearch+json",
99100
"application/vnd.elasticsearch+x-ndjson",
@@ -121,6 +122,7 @@ class CustomSerializer:
121122
"application/json",
122123
"text/*",
123124
"application/x-ndjson",
125+
"application/vnd.apache.arrow.stream",
124126
"application/vnd.mapbox-vector-tile",
125127
"application/vnd.elasticsearch+json",
126128
"application/vnd.elasticsearch+x-ndjson",
@@ -140,6 +142,7 @@ class CustomSerializer:
140142
"application/json",
141143
"text/*",
142144
"application/x-ndjson",
145+
"application/vnd.apache.arrow.stream",
143146
"application/vnd.mapbox-vector-tile",
144147
"application/vnd.elasticsearch+json",
145148
"application/vnd.elasticsearch+x-ndjson",

test_elasticsearch/test_serializer.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from datetime import datetime
2020
from decimal import Decimal
2121

22+
import pyarrow as pa
2223
import pytest
2324

2425
try:
@@ -31,7 +32,12 @@
3132

3233
from elasticsearch import Elasticsearch
3334
from elasticsearch.exceptions import SerializationError
34-
from elasticsearch.serializer import JSONSerializer, OrjsonSerializer, TextSerializer
35+
from elasticsearch.serializer import (
36+
JSONSerializer,
37+
OrjsonSerializer,
38+
PyArrowSerializer,
39+
TextSerializer,
40+
)
3541

3642
requires_numpy_and_pandas = pytest.mark.skipif(
3743
np is None or pd is None, reason="Test requires numpy and pandas to be available"
@@ -157,6 +163,25 @@ def test_serializes_pandas_category(json_serializer):
157163
assert b'{"d":[1,2,3]}' == json_serializer.dumps({"d": cat})
158164

159165

166+
def test_pyarrow_loads():
167+
data = [
168+
pa.array([1, 2, 3, 4]),
169+
pa.array(["foo", "bar", "baz", None]),
170+
pa.array([True, None, False, True]),
171+
]
172+
batch = pa.record_batch(data, names=["f0", "f1", "f2"])
173+
sink = pa.BufferOutputStream()
174+
with pa.ipc.new_stream(sink, batch.schema) as writer:
175+
writer.write_batch(batch)
176+
177+
serializer = PyArrowSerializer()
178+
assert serializer.loads(sink.getvalue()).to_pydict() == {
179+
"f0": [1, 2, 3, 4],
180+
"f1": ["foo", "bar", "baz", None],
181+
"f2": [True, None, False, True],
182+
}
183+
184+
160185
def test_json_raises_serialization_error_on_dump_error(json_serializer):
161186
with pytest.raises(SerializationError):
162187
json_serializer.dumps(object())

0 commit comments

Comments
 (0)