Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add google.api.core.gapic_v1.method #4057

Merged
merged 7 commits into from
Sep 26, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 244 additions & 0 deletions core/google/api/core/gapic_v1/method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for wrapping low-level gRPC methods with common functionality.

This is used by gapic clients to provide common error mapping, retry, timeout,
pagination, and long-running operations to gRPC methods.
"""

import platform

import pkg_resources

from google.api.core import timeout
from google.api.core.helpers import grpc_helpers

_PY_VERSION = platform.python_version()
_GRPC_VERSION = pkg_resources.get_distribution('grpcio').version
_API_CORE_VERSION = pkg_resources.get_distribution('google-cloud-core').version
METRICS_METADATA_KEY = 'x-goog-api-client'
USE_DEFAULT_METADATA = object()


def _is_not_none_or_false(value):
return value is not None and value is not False


def _apply_decorators(func, decorators):
"""Apply a list of decorators to a given function.

``decorators`` may contain items that are ``None`` or ``False`` which will

This comment was marked as spam.

This comment was marked as spam.

be ignored.
"""
decorators = filter(_is_not_none_or_false, reversed(decorators))

for decorator in decorators:
func = decorator(func)

return func


def _prepare_metadata(metadata):
"""Transforms metadata to gRPC format and adds global metrics.

Args:
metadata (Optional[Mapping[str, str]]): Any current metadata.

Returns:
Sequence[Tuple(str, str)]: The gRPC-friendly metadata keys and values.
"""
if metadata is None:
metadata = {}

client_metadata = 'api-core/{} gl-python/{} grpc/{}'.format(
_API_CORE_VERSION, _PY_VERSION, _API_CORE_VERSION)

# Merge this with any existing metric metadata.
if METRICS_METADATA_KEY in metadata:
client_metadata = '{} {}'.format(
client_metadata, metadata[METRICS_METADATA_KEY])

metadata[METRICS_METADATA_KEY] = client_metadata

return list(metadata.items())


def _determine_timeout(default_timeout, specified_timeout, retry):
"""Determines how timeout should be applied to a wrapped method.

Args:
default_timeout (Optional[Timeout]): The default timeout specified
at method creation time.
specified_timeout (Optional[Timeout]): The timeout specified at
invocation time.
retry (Optional[Retry]): The retry specified at invocation time.

Returns:
Optional[Timeout]: The timeout to apply to the method or ``None``.
"""
if specified_timeout is default_timeout:
# If timeout is the default and the default timeout is exponential and
# a non-default retry is specified, make sure the timeout's deadline
# matches the retry's. This handles the case where the user leaves
# the timeout default but specifies a lower deadline via the retry.
if retry and isinstance(default_timeout, timeout.ExponentialTimeout):

This comment was marked as spam.

This comment was marked as spam.

return default_timeout.with_deadline(retry._deadline)

This comment was marked as spam.

This comment was marked as spam.

else:
return default_timeout

# If timeout is specified as a number instead of a Timeout instance,
# convert it to a ConstantTimeout.
if isinstance(specified_timeout, (int, float)):
return timeout.ConstantTimeout(specified_timeout)
else:
return specified_timeout


class _GapicCallable(object):
"""Callable that applies retry, timeout, and metadata logic.

Args:
target (Callable): The low-level RPC method.
retry (google.api.core.retry.Retry): The default retry for the
callable. If ``None``, this callable will not retry by default
timeout (google.api.core.timeout.Timeout): The default timeout
for the callable. If ``None``, this callable will not specify
a timeout argument to the low-level RPC method by default.
metadata (Optional[Sequence[Tuple[str, str]]]): gRPC call metadata
that's passed to the low-level RPC method. If falsy, no metadata
will be passed to the low-level RPC method.

This comment was marked as spam.

This comment was marked as spam.

"""

def __init__(self, target, retry, timeout, metadata):
self._target = target
self._retry = retry
self._timeout = timeout
self._metadata = metadata

def __call__(self, *args, **kwargs):
"""Invoke the low-level RPC with retry, timeout, and metadata."""
# Note: Due to Python 2 lacking keyword-only arguments we use kwargs to

This comment was marked as spam.

This comment was marked as spam.

# to extract the retry and timeout params.
timeout_ = _determine_timeout(
self._timeout,
kwargs.pop('timeout', self._timeout),
# Use only the invocation-specified retry only for this, as we only
# want to adjust the timeout deadline if the *user* specified
# a different retry.
kwargs.get('retry', None))

This comment was marked as spam.

This comment was marked as spam.


retry = kwargs.pop('retry', self._retry)

# Apply all applicable decorators.
wrapped_func = _apply_decorators(self._target, [retry, timeout_])

# Set the metadata for the call using the metadata calculated by
# _prepare_metadata.
if self._metadata:

This comment was marked as spam.

This comment was marked as spam.

kwargs['metadata'] = self._metadata

return wrapped_func(*args, **kwargs)


def wrap_method(
func, default_retry=None, default_timeout=None,
metadata=USE_DEFAULT_METADATA):
"""Wrap an RPC method with common behavior.

This applies common error wrapping, retry, and timeout behavior a function.
The wrapped function will take optional ``retry`` and ``timeout``
arguments.

For example::

import google.api.core.gapic_v1.method
from google.api.core import retry
from google.api.core import timeout

# The original RPC method.
def get_topic(name, timeout=None):
request = publisher_v2.GetTopicRequest(name=name)
return publisher_stub.GetTopic(request, timeout=timeout)

default_retry = retry.Retry(deadline=60)
default_timeout = timeout.Timeout(deadline=60)
wrapped_get_topic = google.api.core.gapic_v1.method.wrap_method(
get_topic, default_retry)

# Execute get_topic with default retry and timeout:
response = wrapped_get_topic()

# Execute get_topic without doing any retying but with the default
# timeout:
response = wrapped_get_topic(retry=None)

# Execute get_topic but only retry on 5xx errors:
my_retry = retry.Retry(retry.if_exception_type(
exceptions.InternalServerError))
response = wrapped_get_topic(retry=my_retry)

The way this works is by late-wrapping the given function with the retry
and timeout decorators. Essentially, when ``wrapped_get_topic()`` is
called:

* ``get_topic()`` is first wrapped with the ``timeout`` into
``get_topic_with_timeout``.
* ``get_topic_with_timeout`` is wrapped with the ``retry`` into
``get_topic_with_timeout_and_retry()``.
* The final ``get_topic_with_timeout_and_retry`` is called passing through
the ``args`` and ``kwargs``.

The callstack is therefore::

method.__call__() ->
Retry.__call__() ->
Timeout.__call__() ->
wrap_errors() ->
get_topic()

Note that if ``timeout`` or ``retry`` is ``None``, then they are not
applied to the function. For example,
``wrapped_get_topic(timeout=None, retry=None)`` is more or less
equivalent to just calling ``get_topic`` but with error re-mapping.

Args:
func (Callable[Any]): The function to wrap. It should accept an
optional ``timeout`` argument. If ``metadata`` is not ``None``, it
should accept a ``metadata`` argument.
default_retry (Optional[google.api.core.Retry]): The default retry
strategy. If ``None``, the method will not retry by default.
default_timeout (Optional[google.api.core.Timeout]): The default
timeout strategy. Can also be specified as an int or float. If
``None``, the method will not have timeout specified by default.
metadata (Optional(Mapping[str, str])): A dict of metadata keys and
values. This will be augmented with common ``x-google-api-client``
metadata. If ``None``, metadata will not be passed to the function
at all, if ``USE_DEFAULT_METADATA`` (the default) then only the

This comment was marked as spam.

This comment was marked as spam.

common metadata will be provided.

Returns:
Callable: A new callable that takes optional ``retry`` and ``timeout``
arguments and applies the common error mapping, retry, timeout,
and metadata behavior to the low-level RPC method.
"""
func = grpc_helpers.wrap_errors(func)

This comment was marked as spam.

This comment was marked as spam.


if metadata is USE_DEFAULT_METADATA:
metadata = {}

if metadata is not None:
metadata = _prepare_metadata(metadata)

This comment was marked as spam.

This comment was marked as spam.


return _GapicCallable(func, default_retry, default_timeout, metadata)
146 changes: 146 additions & 0 deletions core/tests/unit/api_core/gapic/test_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock

from google.api.core import exceptions
from google.api.core import retry
from google.api.core import timeout
import google.api.core.gapic_v1.method


def test_wrap_method_basic():
method = mock.Mock(spec=['__call__'], return_value=42)

wrapped_method = google.api.core.gapic_v1.method.wrap_method(
method, metadata=None)

result = wrapped_method(1, 2, meep='moop')

assert result == 42
method.assert_called_once_with(1, 2, meep='moop')


def test_wrap_method_with_default_metadata():
method = mock.Mock(spec=['__call__'])

wrapped_method = google.api.core.gapic_v1.method.wrap_method(method)

wrapped_method(1, 2, meep='moop')

method.assert_called_once_with(1, 2, meep='moop', metadata=mock.ANY)

metadata = method.call_args[1]['metadata']
assert len(metadata) == 1
assert metadata[0][0] == 'x-goog-api-client'
assert 'api-core' in metadata[0][1]


def test_wrap_method_with_custom_metadata():
method = mock.Mock(spec=['__call__'])

wrapped_method = google.api.core.gapic_v1.method.wrap_method(
method, metadata={'foo': 'bar'})

wrapped_method(1, 2, meep='moop')

method.assert_called_once_with(1, 2, meep='moop', metadata=mock.ANY)

metadata = method.call_args[1]['metadata']
assert len(metadata) == 2
assert ('foo', 'bar') in metadata


def test_wrap_method_with_merged_metadata():
method = mock.Mock(spec=['__call__'])

wrapped_method = google.api.core.gapic_v1.method.wrap_method(
method, metadata={'x-goog-api-client': 'foo/1.2.3'})

wrapped_method(1, 2, meep='moop')

method.assert_called_once_with(1, 2, meep='moop', metadata=mock.ANY)

metadata = method.call_args[1]['metadata']
assert len(metadata) == 1
assert metadata[0][0] == 'x-goog-api-client'
assert metadata[0][1].endswith(' foo/1.2.3')


@mock.patch('time.sleep')
def test_wrap_method_with_default_retry_and_timeout(unusued_sleep):
method = mock.Mock(spec=['__call__'], side_effect=[
exceptions.InternalServerError(None),
42])
default_retry = retry.Retry()
default_timeout = timeout.ConstantTimeout(60)
wrapped_method = google.api.core.gapic_v1.method.wrap_method(
method, default_retry, default_timeout)

result = wrapped_method()

assert result == 42
assert method.call_count == 2
method.assert_called_with(timeout=60, metadata=mock.ANY)


@mock.patch('time.sleep')
def test_wrap_method_with_overriding_retry_and_timeout(unusued_sleep):
method = mock.Mock(spec=['__call__'], side_effect=[
exceptions.NotFound(None),
42])
default_retry = retry.Retry()
default_timeout = timeout.ConstantTimeout(60)
wrapped_method = google.api.core.gapic_v1.method.wrap_method(
method, default_retry, default_timeout)

result = wrapped_method(
retry=retry.Retry(retry.if_exception_type(exceptions.NotFound)),
timeout=timeout.ConstantTimeout(22))

assert result == 42
assert method.call_count == 2
method.assert_called_with(timeout=22, metadata=mock.ANY)


@mock.patch('time.sleep')
def test_wrap_method_with_overriding_retry_deadline(unusued_sleep):
method = mock.Mock(spec=['__call__'], side_effect=([
exceptions.InternalServerError(None)] * 3) + [42])
default_retry = retry.Retry()
default_timeout = timeout.ExponentialTimeout(deadline=60)
wrapped_method = google.api.core.gapic_v1.method.wrap_method(
method, default_retry, default_timeout)

# Overriding only the retry's deadline should also override the timeout's
# deadline.
result = wrapped_method(
retry=default_retry.with_deadline(30))

assert result == 42
timeout_args = [call[1]['timeout'] for call in method.call_args_list]
assert timeout_args == [5, 10, 20, 29]


def test_wrap_method_with_overriding_timeout_as_a_number():
method = mock.Mock(spec=['__call__'], return_value=42)
default_retry = retry.Retry()
default_timeout = timeout.ConstantTimeout(60)
wrapped_method = google.api.core.gapic_v1.method.wrap_method(
method, default_retry, default_timeout)

result = wrapped_method(timeout=22)

assert result == 42
method.assert_called_once_with(timeout=22, metadata=mock.ANY)