Skip to content

Add the 'X-Elastic-Client-Meta' header #1473

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion elasticsearch/_async/http_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
import warnings
from ._extra_imports import aiohttp_exceptions, aiohttp, yarl
from .compat import get_running_loop
from ..connection import Connection
from ..connection.base import (
Connection,
_get_client_meta_header,
_python_to_meta_version,
)
from ..compat import urlencode
from ..exceptions import (
ConnectionError,
Expand Down Expand Up @@ -218,6 +222,11 @@ async def perform_request(

orig_body = body
url_path = self.url_prefix + url
if params:
# Pop client metadata from parameters, if any.
client_meta = tuple(params.pop("_client_meta", ()))
else:
client_meta = ()
if params:
query_string = urlencode(params)
else:
Expand Down Expand Up @@ -268,6 +277,13 @@ async def perform_request(
body = self._gzip_compress(body)
req_headers["content-encoding"] = "gzip"

# Create meta header for aiohttp
if self.meta_header:
client_meta = (
("ai", _python_to_meta_version(aiohttp.__version__)),
) + client_meta
req_headers["x-elastic-client-meta"] = _get_client_meta_header(client_meta)

start = self.loop.time()
try:
async with self.session.request(
Expand Down
7 changes: 4 additions & 3 deletions elasticsearch/_async/http_aiohttp.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@
# under the License.

from ._extra_imports import aiohttp # type: ignore
from typing import Optional, Mapping, Collection, Union, Any, Tuple
from typing import Optional, Mapping, MutableMapping, Collection, Union, Any, Tuple
from ..connection import Connection

class AsyncConnection(Connection):
async def perform_request( # type: ignore
self,
method: str,
url: str,
params: Optional[Mapping[str, Any]] = ...,
params: Optional[MutableMapping[str, Any]] = ...,
body: Optional[bytes] = ...,
timeout: Optional[Union[int, float]] = ...,
ignore: Collection[int] = ...,
headers: Optional[Mapping[str, str]] = ...,
headers: Optional[MutableMapping[str, str]] = ...,
) -> Tuple[int, Mapping[str, str], str]: ...
async def close(self) -> None: ...

Expand All @@ -55,6 +55,7 @@ class AIOHttpConnection(AsyncConnection):
cloud_id: Optional[str] = ...,
api_key: Optional[Any] = ...,
opaque_id: Optional[str] = ...,
meta_header: bool = ...,
loop: Any = ...,
**kwargs: Any,
) -> None: ...
30 changes: 30 additions & 0 deletions elasticsearch/connection/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io
import re
from platform import python_version
import sys
import warnings

try:
Expand Down Expand Up @@ -65,6 +66,8 @@ class Connection(object):
:arg cloud_id: The Cloud ID from ElasticCloud. Convenient way to connect to cloud instances.
:arg opaque_id: Send this value in the 'X-Opaque-Id' HTTP header
For tracing all requests made by this transport.
:arg meta_header: If True will send the 'X-Elastic-Client-Meta' HTTP header containing
simple client metadata. Setting to False will disable the header. Defaults to True.
"""

def __init__(
Expand All @@ -79,6 +82,7 @@ def __init__(
cloud_id=None,
api_key=None,
opaque_id=None,
meta_header=True,
**kwargs
):

Expand Down Expand Up @@ -148,6 +152,10 @@ def __init__(
self.url_prefix = url_prefix
self.timeout = timeout

if not isinstance(meta_header, bool):
raise TypeError("meta_header must be of type bool")
self.meta_header = meta_header

def __repr__(self):
return "<%s: %s>" % (self.__class__.__name__, self.host)

Expand Down Expand Up @@ -329,3 +337,25 @@ def _get_api_key_header_val(self, api_key):
s = "{0}:{1}".format(api_key[0], api_key[1]).encode("utf-8")
return "ApiKey " + binascii.b2a_base64(s).rstrip(b"\r\n").decode("utf-8")
return "ApiKey " + api_key


def _python_to_meta_version(version):
"""Transforms a Python package version to one
compatible with 'X-Elastic-Client-Meta'. Essentially
replaces any pre-release information with a 'p' suffix.
"""
version, version_pre = re.match(r"^([0-9.]+)(.*)$", version).groups()
if version_pre:
version += "p"
return version


def _get_client_meta_header(client_meta=()):
"""Builds an 'X-Elastic-Client-Meta' HTTP header"""
es_version = _python_to_meta_version(__versionstr__)
py_version = python_version() + ("p" if sys.version_info[3] != "final" else "")
# First three values have to be 'service', 'language', 'transport'
client_meta = (("es", es_version), ("py", py_version), ("t", es_version)) + tuple(
client_meta
)
return ",".join("%s=%s" % (k, v) for k, v in client_meta)
8 changes: 5 additions & 3 deletions elasticsearch/connection/base.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ from typing import (
Union,
Optional,
Mapping,
MutableMapping,
Tuple,
List,
NoReturn,
Dict,
Sequence,
Any,
AnyStr,
Collection,
)

Expand All @@ -44,6 +44,7 @@ class Connection(object):
host: str
url_prefix: str
timeout: Optional[Union[float, int]]
meta_header: bool
def __init__(
self,
host: str = ...,
Expand All @@ -56,6 +57,7 @@ class Connection(object):
cloud_id: Optional[str] = ...,
api_key: Optional[Union[Tuple[str, str], List[str], str]] = ...,
opaque_id: Optional[str] = ...,
meta_header: bool = ...,
**kwargs: Any
) -> None: ...
def __repr__(self) -> str: ...
Expand All @@ -77,11 +79,11 @@ class Connection(object):
self,
method: str,
url: str,
params: Optional[Mapping[str, Any]] = ...,
params: Optional[MutableMapping[str, Any]] = ...,
body: Optional[bytes] = ...,
timeout: Optional[Union[int, float]] = ...,
ignore: Collection[int] = ...,
headers: Optional[Mapping[str, str]] = ...,
headers: Optional[MutableMapping[str, str]] = ...,
) -> Tuple[int, Mapping[str, str], str]: ...
def log_request_success(
self,
Expand Down
16 changes: 14 additions & 2 deletions elasticsearch/connection/http_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
except ImportError:
REQUESTS_AVAILABLE = False

from .base import Connection
from .base import Connection, _get_client_meta_header, _python_to_meta_version
from ..exceptions import (
ConnectionError,
ImproperlyConfigured,
Expand Down Expand Up @@ -142,13 +142,25 @@ def perform_request(
url = self.base_url + url
headers = headers or {}
if params:
url = "%s?%s" % (url, urlencode(params or {}))
# Pop client metadata from parameters, if any.
client_meta = params.pop("_client_meta", ())
else:
client_meta = ()
if params:
url = "%s?%s" % (url, urlencode(params))

orig_body = body
if self.http_compress and body:
body = self._gzip_compress(body)
headers["content-encoding"] = "gzip"

# Create meta header for requests
if self.meta_header:
client_meta = (
("rq", _python_to_meta_version(requests.__version__)),
) + client_meta
headers["x-elastic-client-meta"] = _get_client_meta_header(client_meta)

start = time.time()
request = requests.Request(method=method, headers=headers, url=url, data=body)
prepared_request = self.session.prepare_request(request)
Expand Down
1 change: 1 addition & 0 deletions elasticsearch/connection/http_requests.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ class RequestsHttpConnection(Connection):
cloud_id: Optional[str] = ...,
api_key: Optional[Any] = ...,
opaque_id: Optional[str] = ...,
meta_header: bool = ...,
**kwargs: Any
) -> None: ...
17 changes: 16 additions & 1 deletion elasticsearch/connection/http_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from urllib3.util.retry import Retry # type: ignore
import warnings

from .base import Connection
from .base import Connection, _get_client_meta_header, _python_to_meta_version
from ..exceptions import (
ConnectionError,
ImproperlyConfigured,
Expand Down Expand Up @@ -216,8 +216,14 @@ def perform_request(
self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None
):
url = self.url_prefix + url
# Pop client metadata from parameters, if any.
if params:
client_meta = tuple(params.pop("_client_meta", ()))
else:
client_meta = ()
if params:
url = "%s?%s" % (url, urlencode(params))

full_url = self.host + url

start = time.time()
Expand All @@ -242,6 +248,15 @@ def perform_request(
body = self._gzip_compress(body)
request_headers["content-encoding"] = "gzip"

# Create meta header for urllib3
if self.meta_header:
client_meta = (
("ur", _python_to_meta_version(urllib3.__version__)),
) + client_meta
request_headers["x-elastic-client-meta"] = _get_client_meta_header(
client_meta
)

response = self.pool.urlopen(
method, url, body, retries=Retry(False), headers=request_headers, **kw
)
Expand Down
1 change: 1 addition & 0 deletions elasticsearch/connection/http_urllib3.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ class Urllib3HttpConnection(Connection):
cloud_id: Optional[str] = ...,
api_key: Optional[Any] = ...,
opaque_id: Optional[str] = ...,
meta_header: bool = ...,
**kwargs: Any
) -> None: ...
16 changes: 15 additions & 1 deletion elasticsearch/helpers/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ def _process_bulk_chunk(
"""
Send a bulk request to elasticsearch and process the output.
"""
kwargs = _add_helper_meta_to_kwargs(kwargs, "bp")

try:
# send the actual request
resp = client.bulk("\n".join(bulk_actions) + "\n", *args, **kwargs)
Expand All @@ -248,6 +250,13 @@ def _process_bulk_chunk(
yield item


def _add_helper_meta_to_kwargs(kwargs, helper_meta):
params = (kwargs or {}).pop("params", {})
params["_client_meta"] = (("h", helper_meta),)
kwargs["params"] = params
return kwargs


def streaming_bulk(
client,
actions,
Expand Down Expand Up @@ -515,6 +524,7 @@ def scan(

"""
scroll_kwargs = scroll_kwargs or {}
_add_helper_meta_to_kwargs(scroll_kwargs, "s")

if not preserve_order:
query = query.copy() if query else {}
Expand Down Expand Up @@ -562,7 +572,11 @@ def scan(

finally:
if scroll_id and clear_scroll:
client.clear_scroll(body={"scroll_id": [scroll_id]}, ignore=(404,))
client.clear_scroll(
body={"scroll_id": [scroll_id]},
ignore=(404,),
params={"_client_meta": (("h", "s"),)},
)


def reindex(
Expand Down
41 changes: 41 additions & 0 deletions test_elasticsearch/test_async/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.

import ssl
import re
import gzip
import io
from mock import patch
Expand Down Expand Up @@ -316,3 +317,43 @@ async def test_surrogatepass_into_bytes(self):
con = await self._get_mock_connection(response_body=buf)
status, headers, data = await con.perform_request("GET", "/")
assert u"你好\uda6a" == data

async def test_meta_header_value(self):
con = await self._get_mock_connection()
assert con.meta_header is True

await con.perform_request("GET", "/", body=b"{}")

_, kwargs = con.session.request.call_args
headers = kwargs["headers"]
assert re.match(
r"^es=[0-9]+\.[0-9]+\.[0-9]+p?,py=[0-9]+\.[0-9]+\.[0-9]+p?,"
r"t=[0-9]+\.[0-9]+\.[0-9]+p?,ai=[0-9]+\.[0-9]+\.[0-9]+p?$",
headers["x-elastic-client-meta"],
)

con = await self._get_mock_connection()
assert con.meta_header is True

await con.perform_request(
"GET", "/", body=b"{}", params={"_client_meta": (("h", "bp"),)}
)

(method, url), kwargs = con.session.request.call_args
headers = kwargs["headers"]
assert method == "GET"
assert str(url) == "http://localhost:9200/"
assert re.match(
r"^es=[0-9]+\.[0-9]+\.[0-9]+p?,py=[0-9]+\.[0-9]+\.[0-9]+p?,"
r"t=[0-9]+\.[0-9]+\.[0-9]+p?,ai=[0-9]+\.[0-9]+\.[0-9]+p?,h=bp$",
headers["x-elastic-client-meta"],
)

con = await self._get_mock_connection(connection_params={"meta_header": False})
assert con.meta_header is False

await con.perform_request("GET", "/", body=b"{}")

_, kwargs = con.session.request.call_args
headers = kwargs["headers"]
assert "x-elastic-client-meta" not in (x.lower() for x in headers)
Loading