Skip to content

Commit b1e974b

Browse files
committed
Add the 'X-Elastic-Client-Meta' header
1 parent 81a08b0 commit b1e974b

File tree

13 files changed

+271
-14
lines changed

13 files changed

+271
-14
lines changed

elasticsearch/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,5 +98,5 @@
9898
"AsyncTransport",
9999
"AsyncElasticsearch",
100100
]
101-
except (ImportError, SyntaxError):
102-
pass
101+
except (ImportError, SyntaxError) as e:
102+
print(e, type(e).__name__)

elasticsearch/_async/http_aiohttp.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
import warnings
2323
from ._extra_imports import aiohttp_exceptions, aiohttp, yarl
2424
from .compat import get_running_loop
25-
from ..connection import Connection
25+
from ..connection.base import (
26+
Connection,
27+
_get_client_meta_header,
28+
_python_to_meta_version,
29+
)
2630
from ..compat import urlencode
2731
from ..exceptions import (
2832
ConnectionError,
@@ -219,8 +223,11 @@ async def perform_request(
219223
orig_body = body
220224
url_path = self.url_prefix + url
221225
if params:
226+
# Pop client metadata from parameters, if any.
227+
client_meta = tuple(params.pop("_client_meta", ()))
222228
query_string = urlencode(params)
223229
else:
230+
client_meta = ()
224231
query_string = ""
225232

226233
# There is a bug in aiohttp that disables the re-use
@@ -268,6 +275,13 @@ async def perform_request(
268275
body = self._gzip_compress(body)
269276
req_headers["content-encoding"] = "gzip"
270277

278+
# Create meta header for aiohttp
279+
if self.meta_header:
280+
client_meta = (
281+
("ai", _python_to_meta_version(aiohttp.__version__)),
282+
) + client_meta
283+
req_headers["x-elastic-client-meta"] = _get_client_meta_header(client_meta)
284+
271285
start = self.loop.time()
272286
try:
273287
async with self.session.request(

elasticsearch/_async/http_aiohttp.pyi

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,19 @@
1616
# under the License.
1717

1818
from ._extra_imports import aiohttp # type: ignore
19-
from typing import Optional, Mapping, Collection, Union, Any, Tuple
19+
from typing import Optional, Mapping, MutableMapping, Collection, Union, Any, Tuple
2020
from ..connection import Connection
2121

2222
class AsyncConnection(Connection):
2323
async def perform_request( # type: ignore
2424
self,
2525
method: str,
2626
url: str,
27-
params: Optional[Mapping[str, Any]] = ...,
27+
params: Optional[MutableMapping[str, Any]] = ...,
2828
body: Optional[bytes] = ...,
2929
timeout: Optional[Union[int, float]] = ...,
3030
ignore: Collection[int] = ...,
31-
headers: Optional[Mapping[str, str]] = ...,
31+
headers: Optional[MutableMapping[str, str]] = ...,
3232
) -> Tuple[int, Mapping[str, str], str]: ...
3333
async def close(self) -> None: ...
3434

@@ -55,6 +55,7 @@ class AIOHttpConnection(AsyncConnection):
5555
cloud_id: Optional[str] = ...,
5656
api_key: Optional[Any] = ...,
5757
opaque_id: Optional[str] = ...,
58+
meta_header: bool = ...,
5859
loop: Any = ...,
5960
**kwargs: Any,
6061
) -> None: ...

elasticsearch/connection/base.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
import logging
1919
import binascii
20+
from functools import lru_cache
2021
import gzip
2122
import io
2223
import re
2324
from platform import python_version
25+
import sys
2426
import warnings
2527

2628
try:
@@ -65,6 +67,8 @@ class Connection(object):
6567
:arg cloud_id: The Cloud ID from ElasticCloud. Convenient way to connect to cloud instances.
6668
:arg opaque_id: Send this value in the 'X-Opaque-Id' HTTP header
6769
For tracing all requests made by this transport.
70+
:arg meta_header: If True will send the 'X-Elastic-Client-Meta' HTTP header containing
71+
simple client metadata. Setting to False will disable the header. Defaults to True.
6872
"""
6973

7074
def __init__(
@@ -79,6 +83,7 @@ def __init__(
7983
cloud_id=None,
8084
api_key=None,
8185
opaque_id=None,
86+
meta_header=True,
8287
**kwargs
8388
):
8489

@@ -148,6 +153,10 @@ def __init__(
148153
self.url_prefix = url_prefix
149154
self.timeout = timeout
150155

156+
if not isinstance(meta_header, bool):
157+
raise TypeError("meta_header must be of type bool")
158+
self.meta_header = meta_header
159+
151160
def __repr__(self):
152161
return "<%s: %s>" % (self.__class__.__name__, self.host)
153162

@@ -329,3 +338,26 @@ def _get_api_key_header_val(self, api_key):
329338
s = "{0}:{1}".format(api_key[0], api_key[1]).encode("utf-8")
330339
return "ApiKey " + binascii.b2a_base64(s).rstrip(b"\r\n").decode("utf-8")
331340
return "ApiKey " + api_key
341+
342+
343+
def _python_to_meta_version(version):
344+
"""Transforms a Python package version to one
345+
compatible with 'X-Elastic-Client-Meta'. Essentially
346+
replaces any pre-release information with a 'p' suffix.
347+
"""
348+
version, version_pre = re.match(r"^([0-9.]+)(.*)$", version).groups()
349+
if version_pre:
350+
version += "p"
351+
return version
352+
353+
354+
@lru_cache()
355+
def _get_client_meta_header(client_meta=()):
356+
"""Builds an 'X-Elastic-Client-Meta' HTTP header"""
357+
es_version = _python_to_meta_version(__versionstr__)
358+
py_version = python_version() + ("p" if sys.version_info[3] != "final" else "")
359+
# First three values have to be 'service', 'language', 'transport'
360+
client_meta = (("es", es_version), ("py", py_version), ("t", es_version)) + tuple(
361+
client_meta
362+
)
363+
return ",".join("%s=%s" % (k, v) for k, v in client_meta)

elasticsearch/connection/base.pyi

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ from typing import (
2121
Union,
2222
Optional,
2323
Mapping,
24+
MutableMapping,
2425
Tuple,
2526
List,
2627
NoReturn,
2728
Dict,
2829
Sequence,
2930
Any,
30-
AnyStr,
3131
Collection,
3232
)
3333

@@ -44,6 +44,7 @@ class Connection(object):
4444
host: str
4545
url_prefix: str
4646
timeout: Optional[Union[float, int]]
47+
meta_header: bool
4748
def __init__(
4849
self,
4950
host: str = ...,
@@ -56,6 +57,7 @@ class Connection(object):
5657
cloud_id: Optional[str] = ...,
5758
api_key: Optional[Union[Tuple[str, str], List[str], str]] = ...,
5859
opaque_id: Optional[str] = ...,
60+
meta_header: bool = ...,
5961
**kwargs: Any
6062
) -> None: ...
6163
def __repr__(self) -> str: ...
@@ -77,11 +79,11 @@ class Connection(object):
7779
self,
7880
method: str,
7981
url: str,
80-
params: Optional[Mapping[str, Any]] = ...,
82+
params: Optional[MutableMapping[str, Any]] = ...,
8183
body: Optional[bytes] = ...,
8284
timeout: Optional[Union[int, float]] = ...,
8385
ignore: Collection[int] = ...,
84-
headers: Optional[Mapping[str, str]] = ...,
86+
headers: Optional[MutableMapping[str, str]] = ...,
8587
) -> Tuple[int, Mapping[str, str], str]: ...
8688
def log_request_success(
8789
self,

elasticsearch/connection/http_requests.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
except ImportError:
2626
REQUESTS_AVAILABLE = False
2727

28-
from .base import Connection
28+
from .base import Connection, _get_client_meta_header, _python_to_meta_version
2929
from ..exceptions import (
3030
ConnectionError,
3131
ImproperlyConfigured,
@@ -142,13 +142,24 @@ def perform_request(
142142
url = self.base_url + url
143143
headers = headers or {}
144144
if params:
145-
url = "%s?%s" % (url, urlencode(params or {}))
145+
# Pop client metadata from parameters, if any.
146+
client_meta = params.pop("_client_meta", ())
147+
url = "%s?%s" % (url, urlencode(params))
148+
else:
149+
client_meta = ()
146150

147151
orig_body = body
148152
if self.http_compress and body:
149153
body = self._gzip_compress(body)
150154
headers["content-encoding"] = "gzip"
151155

156+
# Create meta header for requests
157+
if self.meta_header:
158+
client_meta = (
159+
("rq", _python_to_meta_version(requests.__version__)),
160+
) + client_meta
161+
headers["x-elastic-client-meta"] = _get_client_meta_header(client_meta)
162+
152163
start = time.time()
153164
request = requests.Request(method=method, headers=headers, url=url, data=body)
154165
prepared_request = self.session.prepare_request(request)

elasticsearch/connection/http_requests.pyi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,6 @@ class RequestsHttpConnection(Connection):
3737
cloud_id: Optional[str] = ...,
3838
api_key: Optional[Any] = ...,
3939
opaque_id: Optional[str] = ...,
40+
meta_header: bool = ...,
4041
**kwargs: Any
4142
) -> None: ...

elasticsearch/connection/http_urllib3.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from urllib3.util.retry import Retry # type: ignore
2323
import warnings
2424

25-
from .base import Connection
25+
from .base import Connection, _get_client_meta_header, _python_to_meta_version
2626
from ..exceptions import (
2727
ConnectionError,
2828
ImproperlyConfigured,
@@ -216,8 +216,14 @@ def perform_request(
216216
self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None
217217
):
218218
url = self.url_prefix + url
219+
# Pop client metadata from parameters, if any.
220+
if params:
221+
client_meta = tuple(params.pop("_client_meta", ()))
222+
else:
223+
client_meta = ()
219224
if params:
220225
url = "%s?%s" % (url, urlencode(params))
226+
221227
full_url = self.host + url
222228

223229
start = time.time()
@@ -242,6 +248,15 @@ def perform_request(
242248
body = self._gzip_compress(body)
243249
request_headers["content-encoding"] = "gzip"
244250

251+
# Create meta header for urllib3
252+
if self.meta_header:
253+
client_meta = (
254+
("ur", _python_to_meta_version(urllib3.__version__)),
255+
) + client_meta
256+
request_headers["x-elastic-client-meta"] = _get_client_meta_header(
257+
client_meta
258+
)
259+
245260
response = self.pool.urlopen(
246261
method, url, body, retries=Retry(False), headers=request_headers, **kw
247262
)

elasticsearch/connection/http_urllib3.pyi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,6 @@ class Urllib3HttpConnection(Connection):
5151
cloud_id: Optional[str] = ...,
5252
api_key: Optional[Any] = ...,
5353
opaque_id: Optional[str] = ...,
54+
meta_header: bool = ...,
5455
**kwargs: Any
5556
) -> None: ...

elasticsearch/helpers/actions.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,8 @@ def _process_bulk_chunk(
230230
"""
231231
Send a bulk request to elasticsearch and process the output.
232232
"""
233+
kwargs = _add_helper_meta_to_kwargs(kwargs, "bp")
234+
233235
try:
234236
# send the actual request
235237
resp = client.bulk("\n".join(bulk_actions) + "\n", *args, **kwargs)
@@ -248,6 +250,13 @@ def _process_bulk_chunk(
248250
yield item
249251

250252

253+
def _add_helper_meta_to_kwargs(kwargs, helper_meta):
254+
params = (kwargs or {}).pop("params", {})
255+
params["_client_meta"] = (("h", helper_meta),)
256+
kwargs["params"] = params
257+
return kwargs
258+
259+
251260
def streaming_bulk(
252261
client,
253262
actions,
@@ -515,6 +524,7 @@ def scan(
515524
516525
"""
517526
scroll_kwargs = scroll_kwargs or {}
527+
_add_helper_meta_to_kwargs(scroll_kwargs, "s")
518528

519529
if not preserve_order:
520530
query = query.copy() if query else {}
@@ -562,7 +572,11 @@ def scan(
562572

563573
finally:
564574
if scroll_id and clear_scroll:
565-
client.clear_scroll(body={"scroll_id": [scroll_id]}, ignore=(404,))
575+
client.clear_scroll(
576+
body={"scroll_id": [scroll_id]},
577+
ignore=(404,),
578+
params={"_client_meta": (("h", "s"),)},
579+
)
566580

567581

568582
def reindex(

test_elasticsearch/test_async/test_connection.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
# under the License.
1818

1919
import ssl
20+
import re
2021
import gzip
2122
import io
2223
from mock import patch
@@ -316,3 +317,43 @@ async def test_surrogatepass_into_bytes(self):
316317
con = await self._get_mock_connection(response_body=buf)
317318
status, headers, data = await con.perform_request("GET", "/")
318319
assert u"你好\uda6a" == data
320+
321+
async def test_meta_header_value(self):
322+
con = await self._get_mock_connection()
323+
assert con.meta_header is True
324+
325+
await con.perform_request("GET", "/", body=b"{}")
326+
327+
_, kwargs = con.session.request.call_args
328+
headers = kwargs["headers"]
329+
assert re.match(
330+
r"^es=[0-9]+\.[0-9]+\.[0-9]+p?,py=[0-9]+\.[0-9]+\.[0-9]+p?,"
331+
r"t=[0-9]+\.[0-9]+\.[0-9]+p?,ai=[0-9]+\.[0-9]+\.[0-9]+p?$",
332+
headers["x-elastic-client-meta"],
333+
)
334+
335+
con = await self._get_mock_connection()
336+
assert con.meta_header is True
337+
338+
await con.perform_request(
339+
"GET", "/", body=b"{}", params={"_client_meta": (("h", "bp"),)}
340+
)
341+
342+
(method, url), kwargs = con.session.request.call_args
343+
headers = kwargs["headers"]
344+
assert method == "GET"
345+
assert str(url) == "http://localhost:9200/"
346+
assert re.match(
347+
r"^es=[0-9]+\.[0-9]+\.[0-9]+p?,py=[0-9]+\.[0-9]+\.[0-9]+p?,"
348+
r"t=[0-9]+\.[0-9]+\.[0-9]+p?,ai=[0-9]+\.[0-9]+\.[0-9]+p?,h=bp$",
349+
headers["x-elastic-client-meta"],
350+
)
351+
352+
con = await self._get_mock_connection(connection_params={"meta_header": False})
353+
assert con.meta_header is False
354+
355+
await con.perform_request("GET", "/", body=b"{}")
356+
357+
_, kwargs = con.session.request.call_args
358+
headers = kwargs["headers"]
359+
assert "x-elastic-client-meta" not in (x.lower() for x in headers)

0 commit comments

Comments
 (0)