diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8a249543..63baabdf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,7 +9,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 - Added overload decorators to helpers-actions.pyi-"bulk" ([#239](https://github.com/opensearch-project/opensearch-py/pull/239))
 - Document Keberos authenticaion ([214](https://github.com/opensearch-project/opensearch-py/pull/214))
 - Add release workflows ([#240](https://github.com/opensearch-project/opensearch-py/pull/240))
-- Added new OpenSearch versions and updated compatibility matrix ([#257](https://github.com/opensearch-project/opensearch-py/pull/257))
+- Added SigV4 support for Async OpenSearch Client ([#254](https://github.com/opensearch-project/opensearch-py/pull/254))
+- Compatibility with OpenSearch 2.1.0 - 2.4.1 ([#257](https://github.com/opensearch-project/opensearch-py/pull/257))
 ### Changed
 - Updated getting started to user guide ([#233](https://github.com/opensearch-project/opensearch-py/pull/233))
 - Updated CA certificate handling to check OpenSSL environment variables before defaulting to certifi ([#196](https://github.com/opensearch-project/opensearch-py/pull/196))
diff --git a/USER_GUIDE.md b/USER_GUIDE.md
index 902ed7f9..5dc177ba 100644
--- a/USER_GUIDE.md
+++ b/USER_GUIDE.md
@@ -1,4 +1,4 @@
-- [User guide of OpenSearch Python Client](#user-guide-of-opensearch-python-client)
+- [User guide of OpenSearch Python client](#user-guide-of-opensearch-python-client)
   - [Setup](#setup)
   - [Example](#example)
     - [Creating a client](#creating-a-client)
@@ -9,8 +9,8 @@
     - [Searching for a document](#searching-for-a-document)
     - [Deleting a document](#deleting-a-document)
     - [Deleting an index](#deleting-an-index)
-  - [Making API Calls](#making-api-calls)
-    - [Point in Time API](#point-in-time-api)
+  - [Making API calls](#making-api-calls)
+    - [Point in time API](#point-in-time-api)
   - [Using plugins](#using-plugins)
     - [Alerting plugin](#alerting-plugin)
       - [**Searching for monitors**](#searching-for-monitors)
@@ -22,6 +22,7 @@
   - [Using different authentication methods](#using-different-authentication-methods)
     - [Using IAM credentials](#using-iam-credentials)
       - [Pre-requisites to use `AWSV4SignerAuth`](#pre-requisites-to-use-awsv4signerauth)
+  - [Using IAM authentication with an async client](#using-iam-authentication-with-an-async-client)
     - [Using Kerberos](#using-kerberos)
 
 # User guide of OpenSearch Python client
@@ -439,6 +440,55 @@ print('\nSearch results:')
 print(response)
 ```
 
+## Using IAM authentication with an async client
+
+Make sure to use the `AsyncOpenSearch` client and the `AsyncHttpConnection` connection class with the async `AWSV4SignerAsyncAuth` signer.
+
+```python
+import asyncio
+
+from opensearchpy import AsyncOpenSearch, AsyncHttpConnection, AWSV4SignerAsyncAuth
+import boto3
+
+host = '' # cluster endpoint, for example: my-test-domain.us-east-1.es.amazonaws.com
+region = 'us-west-2'
+credentials = boto3.Session().get_credentials()
+auth = AWSV4SignerAsyncAuth(credentials, region)
+index_name = 'python-test-index3'
+
+client = AsyncOpenSearch(
+    hosts = [{'host': host, 'port': 443}],
+    http_auth = auth,
+    use_ssl = True,
+    verify_certs = True,
+    connection_class = AsyncHttpConnection
+)
+
+async def search():
+    q = 'miller'
+    query = {
+        'size': 5,
+        'query': {
+            'multi_match': {
+                'query': q,
+                'fields': ['title^2', 'director']
+            }
+        }
+    }
+
+    response = await client.search(
+        body = query,
+        index = index_name
+    )
+
+    print('\nSearch results:')
+    print(response)
+
+    await client.close()
+
+asyncio.run(search())
+```
+
 ### Using Kerberos
 
 There are several python packages that provide Kerberos support over HTTP connections, such as [requests-kerberos](http://pypi.org/project/requests-kerberos) and [requests-gssapi](https://pypi.org/project/requests-gssapi). The following example shows how to setup the authentication. Note that some of the parameters, such as `mutual_authentication` might depend on the server settings.
diff --git a/opensearchpy/__init__.py b/opensearchpy/__init__.py
index bd7f24b5..c66c4613 100644
--- a/opensearchpy/__init__.py
+++ b/opensearchpy/__init__.py
@@ -62,7 +62,7 @@
     SSLError,
     TransportError,
 )
-from .helpers import AWSV4SignerAuth
+from .helpers import AWSV4SignerAsyncAuth, AWSV4SignerAuth
 from .serializer import JSONSerializer
 from .transport import Transport
 
@@ -95,6 +95,7 @@
     "OpenSearchWarning",
     "OpenSearchDeprecationWarning",
     "AWSV4SignerAuth",
+    "AWSV4SignerAsyncAuth",
 ]
 
 try:
@@ -105,12 +106,16 @@
     from ._async.client import AsyncOpenSearch
     from ._async.http_aiohttp import AIOHttpConnection, AsyncConnection
     from ._async.transport import AsyncTransport
+    from .connection import AsyncHttpConnection
 
+    # AsyncHttpConnection is only importable together with the async extras,
+    # so it is exported here and not in the unconditional __all__ above.
     __all__ += [
         "AIOHttpConnection",
         "AsyncConnection",
         "AsyncTransport",
         "AsyncOpenSearch",
+        "AsyncHttpConnection",
     ]
 except (ImportError, SyntaxError):
     pass
diff --git a/opensearchpy/__init__.pyi b/opensearchpy/__init__.pyi
index cb927952..a53caac1 100644
--- a/opensearchpy/__init__.pyi
+++ b/opensearchpy/__init__.pyi
@@ -28,6 +28,7 @@ import sys
 from typing import Tuple
 
 from .client import OpenSearch as OpenSearch
+from .connection import AsyncHttpConnection as AsyncHttpConnection
 from .connection import Connection as Connection
 from .connection import RequestsHttpConnection as RequestsHttpConnection
 from .connection import Urllib3HttpConnection as Urllib3HttpConnection
@@ -57,6 +58,7 @@ try:
     from ._async.client import AsyncOpenSearch as AsyncOpenSearch
     from ._async.http_aiohttp import AIOHttpConnection as AIOHttpConnection
     from ._async.transport import AsyncTransport as AsyncTransport
+    from .helpers import AWSV4SignerAsyncAuth as AWSV4SignerAsyncAuth
     from .helpers import AWSV4SignerAuth as AWSV4SignerAuth
 except (ImportError, SyntaxError):
     pass
diff --git a/opensearchpy/connection/__init__.py b/opensearchpy/connection/__init__.py
index 61342241..1b9ad2cd 100644
--- a/opensearchpy/connection/__init__.py
+++ b/opensearchpy/connection/__init__.py
@@ -25,6 +25,8 @@
 # under the License.
 
 
+import sys
+
 from .base import Connection
 from .http_requests import RequestsHttpConnection
 from .http_urllib3 import Urllib3HttpConnection, create_ssl_context
@@ -35,3 +37,16 @@
     "Urllib3HttpConnection",
     "create_ssl_context",
 ]
+
+try:
+    # Asyncio only supported on Python 3.6+
+    if sys.version_info < (3, 6):
+        raise ImportError
+
+    from .http_async import AsyncHttpConnection
+
+    __all__ += [
+        "AsyncHttpConnection",
+    ]
+except (ImportError, SyntaxError):
+    pass
diff --git a/opensearchpy/connection/__init__.pyi b/opensearchpy/connection/__init__.pyi
index 3e5f3b52..ad1d9e62 100644
--- a/opensearchpy/connection/__init__.pyi
+++ b/opensearchpy/connection/__init__.pyi
@@ -25,6 +25,7 @@
 # under the License.
 
 from .base import Connection as Connection
+from .http_async import AsyncHttpConnection as AsyncHttpConnection
 from .http_requests import RequestsHttpConnection as RequestsHttpConnection
 from .http_urllib3 import Urllib3HttpConnection as Urllib3HttpConnection
 from .http_urllib3 import create_ssl_context as create_ssl_context
diff --git a/opensearchpy/connection/http_async.py b/opensearchpy/connection/http_async.py
new file mode 100644
index 00000000..8c2c94d2
--- /dev/null
+++ b/opensearchpy/connection/http_async.py
@@ -0,0 +1,290 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+# Modifications Copyright OpenSearch Contributors. See
+# GitHub history for details.
+#
+
+import asyncio
+import os
+import ssl
+import warnings
+
+from .._async._extra_imports import aiohttp, aiohttp_exceptions
+from .._async.compat import get_running_loop
+from .._async.http_aiohttp import AIOHttpConnection
+from ..compat import reraise_exceptions, string_types, urlencode
+from ..exceptions import (
+    ConnectionError,
+    ConnectionTimeout,
+    ImproperlyConfigured,
+    SSLError,
+)
+
+VERIFY_CERTS_DEFAULT = object()
+SSL_SHOW_WARN_DEFAULT = object()
+
+
+class AsyncHttpConnection(AIOHttpConnection):
+    """aiohttp connection whose ``http_auth`` may be a callable request signer."""
+
+    def __init__(
+        self,
+        host="localhost",
+        port=None,
+        http_auth=None,
+        use_ssl=False,
+        verify_certs=VERIFY_CERTS_DEFAULT,
+        ssl_show_warn=SSL_SHOW_WARN_DEFAULT,
+        ca_certs=None,
+        client_cert=None,
+        client_key=None,
+        ssl_version=None,
+        ssl_assert_fingerprint=None,
+        maxsize=10,
+        headers=None,
+        ssl_context=None,
+        http_compress=None,
+        opaque_id=None,
+        loop=None,
+        **kwargs
+    ):
+        self.headers = {}
+
+        super().__init__(
+            host=host,
+            port=port,
+            use_ssl=use_ssl,
+            headers=headers,
+            http_compress=http_compress,
+            opaque_id=opaque_id,
+            **kwargs
+        )
+
+        # Basic-auth credentials become an ``aiohttp.BasicAuth``; any other
+        # value (e.g. a callable signer) is kept as-is for perform_request().
+        if http_auth is not None:
+            if isinstance(http_auth, (tuple, list)):
+                http_auth = aiohttp.BasicAuth(login=http_auth[0], password=http_auth[1])
+            elif isinstance(http_auth, string_types):
+                login, password = http_auth.split(":", 1)
+                http_auth = aiohttp.BasicAuth(login=login, password=password)
+
+        # if providing an SSL context, warn if any other SSL related flag is used
+        if ssl_context and (
+            (verify_certs is not VERIFY_CERTS_DEFAULT)
+            or (ssl_show_warn is not SSL_SHOW_WARN_DEFAULT)
+            or ca_certs
+            or client_cert
+            or client_key
+            or ssl_version
+        ):
+            warnings.warn(
+                "When using `ssl_context`, all other SSL related kwargs are ignored"
+            )
+
+        self.ssl_assert_fingerprint = ssl_assert_fingerprint
+        if self.use_ssl and ssl_context is None:
+            if ssl_version is None:
+                ssl_context = ssl.create_default_context()
+            else:
+                ssl_context = ssl.SSLContext(ssl_version)
+
+            # Convert all sentinel values to their actual default
+            # values if not using an SSLContext.
+            if verify_certs is VERIFY_CERTS_DEFAULT:
+                verify_certs = True
+            if ssl_show_warn is SSL_SHOW_WARN_DEFAULT:
+                ssl_show_warn = True
+
+            if verify_certs:
+                ssl_context.verify_mode = ssl.CERT_REQUIRED
+                ssl_context.check_hostname = True
+            else:
+                ssl_context.check_hostname = False
+                ssl_context.verify_mode = ssl.CERT_NONE
+
+            ca_certs = self.default_ca_certs() if ca_certs is None else ca_certs
+            if verify_certs:
+                if not ca_certs:
+                    raise ImproperlyConfigured(
+                        "Root certificates are missing for certificate "
+                        "validation. Either pass them in using the ca_certs parameter or "
+                        "install certifi to use it automatically."
+                    )
+                if os.path.isfile(ca_certs):
+                    ssl_context.load_verify_locations(cafile=ca_certs)
+                elif os.path.isdir(ca_certs):
+                    ssl_context.load_verify_locations(capath=ca_certs)
+                else:
+                    raise ImproperlyConfigured("ca_certs parameter is not a path")
+            else:
+                if ssl_show_warn:
+                    warnings.warn(
+                        "Connecting to %s using SSL with verify_certs=False is insecure."
+                        % self.host
+                    )
+
+            # Use client_cert and client_key variables for SSL certificate configuration.
+            if client_cert and not os.path.isfile(client_cert):
+                raise ImproperlyConfigured("client_cert is not a path to a file")
+            if client_key and not os.path.isfile(client_key):
+                raise ImproperlyConfigured("client_key is not a path to a file")
+            if client_cert and client_key:
+                ssl_context.load_cert_chain(client_cert, client_key)
+            elif client_cert:
+                ssl_context.load_cert_chain(client_cert)
+
+        self.headers.setdefault("connection", "keep-alive")
+        self.loop = loop
+        self.session = None
+
+        # Parameters for creating an aiohttp.ClientSession later.
+        self._limit = maxsize
+        self._http_auth = http_auth
+        self._ssl_context = ssl_context
+
+    async def perform_request(
+        self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None
+    ):
+        if self.session is None:
+            await self._create_aiohttp_session()
+        assert self.session is not None
+        orig_body = body
+        url_path = self.url_prefix + url
+        if params:
+            query_string = urlencode(params)
+        else:
+            query_string = ""
+
+        # There is a bug in aiohttp that disables the re-use
+        # of the connection in the pool when method=HEAD.
+        # See: https://github.com/aio-libs/aiohttp/issues/1769
+        is_head = False
+        if method == "HEAD":
+            method = "GET"
+            is_head = True
+
+        # Build the absolute request URL as a plain string.
+        url = self.url_prefix + url
+        if query_string:
+            url = "%s?%s" % (url, query_string)
+        url = self.host + url
+
+        timeout = aiohttp.ClientTimeout(
+            total=timeout if timeout is not None else self.timeout
+        )
+
+        req_headers = self.headers.copy()
+        if headers:
+            req_headers.update(headers)
+
+        if self.http_compress and body:
+            body = self._gzip_compress(body)
+            req_headers["content-encoding"] = "gzip"
+
+        # A callable ``http_auth`` (e.g. a SigV4 signer) returns extra headers;
+        # ``aiohttp.BasicAuth`` goes to aiohttp; ``None`` adds nothing.
+        auth = None
+        if isinstance(self._http_auth, aiohttp.BasicAuth):
+            auth = self._http_auth
+        elif callable(self._http_auth):
+            req_headers.update(self._http_auth(method, url, query_string, body))
+
+        start = self.loop.time()
+        try:
+            async with self.session.request(
+                method,
+                url,
+                data=body,
+                auth=auth,
+                headers=req_headers,
+                timeout=timeout,
+                fingerprint=self.ssl_assert_fingerprint,
+            ) as response:
+                if is_head:  # We actually called 'GET' so throw away the data.
+                    await response.release()
+                    raw_data = ""
+                else:
+                    raw_data = await response.text()
+                duration = self.loop.time() - start
+
+        # We want to reraise a cancellation or recursion error.
+        except reraise_exceptions:
+            raise
+        except Exception as e:
+            self.log_request_fail(
+                method,
+                str(url),
+                url_path,
+                orig_body,
+                self.loop.time() - start,
+                exception=e,
+            )
+            if isinstance(e, aiohttp_exceptions.ServerFingerprintMismatch):
+                raise SSLError("N/A", str(e), e)
+            if isinstance(
+                e, (asyncio.TimeoutError, aiohttp_exceptions.ServerTimeoutError)
+            ):
+                raise ConnectionTimeout("TIMEOUT", str(e), e)
+            raise ConnectionError("N/A", str(e), e)
+
+        # raise warnings if any from the 'Warnings' header.
+        warning_headers = response.headers.getall("warning", ())
+        self._raise_warnings(warning_headers)
+
+        # raise errors based on http status codes, let the client handle those if needed
+        if not (200 <= response.status < 300) and response.status not in ignore:
+            self.log_request_fail(
+                method,
+                str(url),
+                url_path,
+                orig_body,
+                duration,
+                status_code=response.status,
+                response=raw_data,
+            )
+            self._raise_error(response.status, raw_data)
+
+        self.log_request_success(
+            method, str(url), url_path, orig_body, response.status, raw_data, duration
+        )
+
+        return response.status, response.headers, raw_data
+
+    async def close(self):
+        """
+        Explicitly closes connection
+        """
+        if self.session:
+            await self.session.close()
+
+    async def _create_aiohttp_session(self):
+        """Creates an aiohttp.ClientSession(). This is delayed until
+        the first call to perform_request() so that AsyncTransport has
+        a chance to set AIOHttpConnection.loop
+        """
+        if self.loop is None:
+            self.loop = get_running_loop()
+        self.session = aiohttp.ClientSession(
+            headers=self.headers,
+            skip_auto_headers=("accept", "accept-encoding"),
+            auto_decompress=True,
+            loop=self.loop,
+            cookie_jar=aiohttp.DummyCookieJar(),
+            response_class=OpenSearchClientResponse,
+            connector=aiohttp.TCPConnector(
+                limit=self._limit, use_dns_cache=True, ssl=self._ssl_context
+            ),
+        )
+
+
+class OpenSearchClientResponse(aiohttp.ClientResponse):
+    async def text(self, encoding=None, errors="strict"):
+        if self._body is None:
+            await self.read()
+
+        return self._body.decode("utf-8", "surrogatepass")
diff --git a/opensearchpy/connection/http_async.pyi b/opensearchpy/connection/http_async.pyi
new file mode 100644
index 00000000..adde809b
--- /dev/null
+++ b/opensearchpy/connection/http_async.pyi
@@ -0,0 +1,37 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+# Modifications Copyright OpenSearch Contributors. See
+# GitHub history for details.
+
+from typing import Any, Mapping, Optional
+
+from .._async._extra_imports import aiohttp  # type: ignore
+from .._async.http_aiohttp import AIOHttpConnection
+
+class AsyncHttpConnection(AIOHttpConnection):
+    session: Optional[aiohttp.ClientSession]
+    def __init__(
+        self,
+        host: str = ...,
+        port: Optional[int] = ...,
+        http_auth: Optional[Any] = ...,
+        use_ssl: bool = ...,
+        verify_certs: bool = ...,
+        ssl_show_warn: bool = ...,
+        ca_certs: Optional[Any] = ...,
+        client_cert: Optional[Any] = ...,
+        client_key: Optional[Any] = ...,
+        ssl_version: Optional[Any] = ...,
+        ssl_assert_fingerprint: Optional[Any] = ...,
+        maxsize: Optional[int] = ...,
+        headers: Optional[Mapping[str, str]] = ...,
+        ssl_context: Optional[Any] = ...,
+        http_compress: Optional[bool] = ...,
+        opaque_id: Optional[str] = ...,
+        loop: Optional[Any] = ...,
+        **kwargs: Any
+    ) -> None: ...
diff --git a/opensearchpy/connection/http_requests.py b/opensearchpy/connection/http_requests.py
index 0ab835a7..316bf8ef 100644
--- a/opensearchpy/connection/http_requests.py
+++ b/opensearchpy/connection/http_requests.py
@@ -99,9 +99,7 @@ def __init__(
 
         # Mount http-adapter with custom connection-pool size. Default=10
         if pool_maxsize and isinstance(pool_maxsize, int):
-            pool_adapter = requests.adapters.HTTPAdapter(
-                pool_maxsize=pool_maxsize
-            )
+            pool_adapter = requests.adapters.HTTPAdapter(pool_maxsize=pool_maxsize)
             self.session.mount("http://", pool_adapter)
             self.session.mount("https://", pool_adapter)
 
diff --git a/opensearchpy/helpers/__init__.py b/opensearchpy/helpers/__init__.py
index 4b08807b..2aff0e3e 100644
--- a/opensearchpy/helpers/__init__.py
+++ b/opensearchpy/helpers/__init__.py
@@ -37,6 +37,7 @@
     scan,
     streaming_bulk,
 )
+from .asyncsigner import AWSV4SignerAsyncAuth
 from .errors import BulkIndexError, ScanError
 from .signer import AWSV4SignerAuth
 
@@ -52,6 +53,7 @@
     "_chunk_actions",
     "_process_bulk_chunk",
     "AWSV4SignerAuth",
+    "AWSV4SignerAsyncAuth",
 ]
 
 
diff --git a/opensearchpy/helpers/__init__.pyi b/opensearchpy/helpers/__init__.pyi
index d2747865..bc307c7b 100644
--- a/opensearchpy/helpers/__init__.pyi
+++ b/opensearchpy/helpers/__init__.pyi
@@ -46,6 +46,7 @@ try:
     from .._async.helpers import async_reindex as async_reindex
     from .._async.helpers import async_scan as async_scan
     from .._async.helpers import async_streaming_bulk as async_streaming_bulk
+    from .asyncsigner import AWSV4SignerAsyncAuth as AWSV4SignerAsyncAuth
     from .signer import AWSV4SignerAuth as AWSV4SignerAuth
 except (ImportError, SyntaxError):
     pass
diff --git a/opensearchpy/helpers/asyncsigner.py b/opensearchpy/helpers/asyncsigner.py
new file mode 100644
index 00000000..fbf88e25
--- /dev/null
+++ b/opensearchpy/helpers/asyncsigner.py
@@ -0,0 +1,68 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+# Modifications Copyright OpenSearch Contributors. See
+# GitHub history for details.
+
+import sys
+
+OPENSEARCH_SERVICE = "es"
+
+PY3 = sys.version_info[0] == 3
+
+
+class AWSV4SignerAsyncAuth:
+    """
+    AWS V4 Request Signer for Async Requests.
+
+    Instances are callables that return the SigV4 headers for one request.
+    """
+
+    def __init__(self, credentials, region):  # type: ignore
+        if not credentials:
+            raise ValueError("Credentials cannot be empty")
+        self.credentials = credentials
+
+        if not region:
+            raise ValueError("Region cannot be empty")
+        self.region = region
+
+    def __call__(self, method, url, query_string="", body=None):  # type: ignore
+        return self._sign_request(method, url, query_string, body)  # type: ignore
+
+    def _sign_request(self, method, url, query_string="", body=None):
+        """
+        Sign a request and return the headers that carry the signature.
+
+        :param method: HTTP method, for example ``"GET"``
+        :param url: absolute request URL, with or without its query string
+        :param query_string: encoded query string; appended to ``url`` only
+            when ``url`` does not already end with it
+        :param body: request payload that is included in the signature
+        :return: dict of the headers set on the signed ``AWSRequest``
+        """
+        from botocore.auth import SigV4Auth
+        from botocore.awsrequest import AWSRequest
+
+        # AsyncHttpConnection passes a URL that already ends with
+        # "?<query_string>". Appending the query again (and without a "?")
+        # would sign a different URL than the one that is sent.
+        url = str(url)
+        query = (query_string or "").lstrip("?")
+        if query and not url.endswith("?" + query):
+            url = "%s?%s" % (url, query)
+
+        # create an AWS request object and sign it using SigV4Auth
+        aws_request = AWSRequest(
+            method=method,
+            url=url,
+            data=body,
+        )
+        sig_v4_auth = SigV4Auth(self.credentials, OPENSEARCH_SERVICE, self.region)
+        sig_v4_auth.add_auth(aws_request)
+
+        # return the signed headers as a plain dict
+        return dict(aws_request.headers.items())
diff --git a/opensearchpy/helpers/asyncsigner.pyi b/opensearchpy/helpers/asyncsigner.pyi
new file mode 100644
index 00000000..2c701bb9
--- /dev/null
+++ b/opensearchpy/helpers/asyncsigner.pyi
@@ -0,0 +1,21 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+# Modifications Copyright OpenSearch Contributors. See
+# GitHub history for details.
+
+from typing import Any, Dict
+
+class AWSV4SignerAsyncAuth:
+    credentials: Any
+    region: str
+    def __init__(self, credentials: Any, region: str) -> None: ...
+    def __call__(
+        self, method: str, url: str, query_string: str = ..., body: Any = ...
+    ) -> Dict[str, str]: ...
+    def _sign_request(
+        self, method: str, url: str, query_string: str = ..., body: Any = ...
+    ) -> Dict[str, str]: ...
diff --git a/test_opensearchpy/test_async/test_asyncsigner.py b/test_opensearchpy/test_async/test_asyncsigner.py
new file mode 100644
index 00000000..4a977836
--- /dev/null
+++ b/test_opensearchpy/test_async/test_asyncsigner.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+#
+# Modifications Copyright OpenSearch Contributors. See
+# GitHub history for details.
+
+
+import sys
+
+import pytest
+
+from ..test_cases import TestCase
+
+pytestmark = pytest.mark.asyncio
+
+
+class TestAsyncSigner(TestCase):
+    """Unit tests for the SigV4 signer used with the async client."""
+
+    @pytest.mark.skipif(
+        sys.version_info < (3, 6), reason="AWSV4SignerAsyncAuth requires python3.6+"
+    )
+    async def test_aws_signer_async_as_http_auth(self):
+        region = "us-west-2"
+
+        from opensearchpy.helpers.asyncsigner import AWSV4SignerAsyncAuth
+
+        auth = AWSV4SignerAsyncAuth(self.mock_session(), region)
+        # The signer is called with (method, url, query_string, body).
+        headers = auth("GET", "http://localhost", "", None)
+        self.assertIn("Authorization", headers)
+        self.assertIn("X-Amz-Date", headers)
+        self.assertIn("X-Amz-Security-Token", headers)
+
+    @pytest.mark.skipif(
+        sys.version_info < (3, 6), reason="AWSV4SignerAsyncAuth requires python3.6+"
+    )
+    async def test_aws_signer_async_when_region_is_null(self):
+        session = self.mock_session()
+
+        from opensearchpy.helpers.asyncsigner import AWSV4SignerAsyncAuth
+
+        with pytest.raises(ValueError) as e:
+            AWSV4SignerAsyncAuth(session, None)
+        assert str(e.value) == "Region cannot be empty"
+
+        with pytest.raises(ValueError) as e:
+            AWSV4SignerAsyncAuth(session, "")
+        assert str(e.value) == "Region cannot be empty"
+
+    @pytest.mark.skipif(
+        sys.version_info < (3, 6), reason="AWSV4SignerAsyncAuth requires python3.6+"
+    )
+    async def test_aws_signer_async_when_credentials_is_null(self):
+        region = "us-west-1"
+
+        from opensearchpy.helpers.asyncsigner import AWSV4SignerAsyncAuth
+
+        with pytest.raises(ValueError) as e:
+            AWSV4SignerAsyncAuth(None, region)
+        assert str(e.value) == "Credentials cannot be empty"
+
+        with pytest.raises(ValueError) as e:
+            AWSV4SignerAsyncAuth("", region)
+        assert str(e.value) == "Credentials cannot be empty"