Skip to content

Commit

Permalink
Add base future package to google.cloud (googleapis#3616)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon Wayne Parrott authored and landrito committed Aug 22, 2017
1 parent 3d8cf02 commit a26aa32
Show file tree
Hide file tree
Showing 9 changed files with 472 additions and 0 deletions.
3 changes: 3 additions & 0 deletions core/.coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ exclude_lines =
pragma: NO COVER
# Ignore debug-only repr
def __repr__
# Ignore abstract methods
raise NotImplementedError
raise NotImplementedError()
23 changes: 23 additions & 0 deletions core/google/cloud/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,29 @@ def _bytes_to_unicode(value):
raise ValueError('%r could not be converted to unicode' % (value,))


def _from_any_pb(pb_type, any_pb):
"""Converts an Any protobuf to the specified message type
Args:
pb_type (type): the type of the message that any_pb stores an instance
of.
any_pb (google.protobuf.any_pb2.Any): the object to be converted.
Returns:
pb_type: An instance of the pb_type message.
Raises:
TypeError: if the message could not be converted.
"""
msg = pb_type()
if not any_pb.Unpack(msg):
raise TypeError(
'Could not convert {} to {}'.format(
any_pb.__class__.__name__, pb_type.__name__))

return msg


def _pb_timestamp_to_datetime(timestamp_pb):
"""Convert a Timestamp protobuf to a datetime object.
Expand Down
21 changes: 21 additions & 0 deletions core/google/cloud/future/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright 2017, Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Futures for dealing with asynchronous operations."""

from google.cloud.future.base import Future

__all__ = [
'Future',
]
39 changes: 39 additions & 0 deletions core/google/cloud/future/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright 2017, Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Private helpers for futures."""

import logging
import threading


_LOGGER = logging.getLogger(__name__)


def start_daemon_thread(*args, **kwargs):
"""Starts a thread and marks it as a daemon thread."""
thread = threading.Thread(*args, **kwargs)
thread.daemon = True
thread.start()
return thread


def safe_invoke_callback(callback, *args, **kwargs):
"""Invoke a callback, swallowing and logging any exceptions."""
# pylint: disable=bare-except
# We intentionally want to swallow all exceptions.
try:
return callback(*args, **kwargs)
except:
_LOGGER.exception('Error while executing Future callback.')
175 changes: 175 additions & 0 deletions core/google/cloud/future/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# Copyright 2017, Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Abstract and helper bases for Future implementations."""

import abc

import six

from google.cloud.future import _helpers


@six.add_metaclass(abc.ABCMeta)
class Future(object):
# pylint: disable=missing-docstring
# We inherit the interfaces here from concurrent.futures.

"""Future interface.
This interface is based on :class:`concurrent.futures.Future`.
"""

@abc.abstractmethod
def cancel(self):
raise NotImplementedError()

@abc.abstractmethod
def cancelled(self):
raise NotImplementedError()

@abc.abstractmethod
def running(self):
raise NotImplementedError()

@abc.abstractmethod
def done(self):
raise NotImplementedError()

@abc.abstractmethod
def result(self, timeout=None):
raise NotImplementedError()

@abc.abstractmethod
def exception(self, timeout=None):
raise NotImplementedError()

@abc.abstractmethod
def add_done_callback(self, fn):
# pylint: disable=invalid-name
raise NotImplementedError()

@abc.abstractmethod
def set_result(self, result):
raise NotImplementedError()

@abc.abstractmethod
def set_exception(self, exception):
raise NotImplementedError()


class PollingFuture(Future):
"""A Future that needs to poll some service to check its status.
The private :meth:`_blocking_poll` method should be implemented by
subclasses.
.. note: Privacy here is intended to prevent the final class from
overexposing, not to prevent subclasses from accessing methods.
"""
def __init__(self):
super(PollingFuture, self).__init__()
self._result = None
self._exception = None
self._result_set = False
"""bool: Set to True when the result has been set via set_result or
set_exception."""
self._polling_thread = None
self._done_callbacks = []

@abc.abstractmethod
def _blocking_poll(self, timeout=None):
"""Poll and wait for the Future to be resolved.
Args:
timeout (int): How long to wait for the operation to complete.
If None, wait indefinitely.
"""
# pylint: disable=missing-raises
raise NotImplementedError()

def result(self, timeout=None):
"""Get the result of the operation, blocking if necessary.
Args:
timeout (int): How long to wait for the operation to complete.
If None, wait indefinitely.
Returns:
google.protobuf.Message: The Operation's result.
Raises:
google.gax.GaxError: If the operation errors or if the timeout is
reached before the operation completes.
"""
self._blocking_poll()

if self._exception is not None:
# pylint: disable=raising-bad-type
# Pylint doesn't recognize that this is valid in this case.
raise self._exception

return self._result

def exception(self, timeout=None):
"""Get the exception from the operation, blocking if necessary.
Args:
timeout (int): How long to wait for the operation to complete.
If None, wait indefinitely.
Returns:
Optional[google.gax.GaxError]: The operation's error.
"""
self._blocking_poll()
return self._exception

def add_done_callback(self, fn):
"""Add a callback to be executed when the operation is complete.
If the operation is not already complete, this will start a helper
thread to poll for the status of the operation in the background.
Args:
fn (Callable[Future]): The callback to execute when the operation
is complete.
"""
if self._result_set:
_helpers.safe_invoke_callback(fn, self)
return

self._done_callbacks.append(fn)

if self._polling_thread is None:
# The polling thread will exit on its own as soon as the operation
# is done.
self._polling_thread = _helpers.start_daemon_thread(
target=self._blocking_poll)

def _invoke_callbacks(self, *args, **kwargs):
"""Invoke all done callbacks."""
for callback in self._done_callbacks:
_helpers.safe_invoke_callback(callback, *args, **kwargs)

def set_result(self, result):
"""Set the Future's result."""
self._result = result
self._result_set = True
self._invoke_callbacks(self)

def set_exception(self, exception):
"""Set the Future's exception."""
self._exception = exception
self._result_set = True
self._invoke_callbacks(self)
Empty file.
37 changes: 37 additions & 0 deletions core/tests/unit/future/test__helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2017, Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock

from google.cloud.future import _helpers


@mock.patch('threading.Thread', autospec=True)
def test_start_deamon_thread(unused_thread):
deamon_thread = _helpers.start_daemon_thread(target=mock.sentinel.target)
assert deamon_thread.daemon is True


def test_safe_invoke_callback():
callback = mock.Mock(spec=['__call__'], return_value=42)
result = _helpers.safe_invoke_callback(callback, 'a', b='c')
assert result == 42
callback.assert_called_once_with('a', b='c')


def test_safe_invoke_callback_exception():
callback = mock.Mock(spec=['__call__'], side_effect=ValueError())
result = _helpers.safe_invoke_callback(callback, 'a', b='c')
assert result is None
callback.assert_called_once_with('a', b='c')
Loading

0 comments on commit a26aa32

Please sign in to comment.