Skip to content

Commit

Permalink
Merge pull request googleapis#2576 from daspecster/add-gateway-timeou…
Browse files Browse the repository at this point in the history
…t-to-pubsub-pull

Add gateway timeout to pubsub pull
  • Loading branch information
daspecster authored Nov 24, 2016
2 parents a787604 + 25726e9 commit 2ad06a8
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 1 deletion.
4 changes: 4 additions & 0 deletions core/google/cloud/_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ def _make_grpc_failed_precondition(self):
from grpc import StatusCode
return self._make_grpc_error(StatusCode.FAILED_PRECONDITION)

def _make_grpc_deadline_exceeded(self):
from grpc import StatusCode
return self._make_grpc_error(StatusCode.DEADLINE_EXCEEDED)


class _GAXPageIterator(object):

Expand Down
9 changes: 8 additions & 1 deletion pubsub/google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,15 @@ def subscription_pull(self, subscription_path, return_immediately=False,
subscription_path, max_messages,
return_immediately=return_immediately)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
code = exc_to_code(exc.cause)
if code == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
elif code == StatusCode.DEADLINE_EXCEEDED:
# NOTE: The JSON-over-HTTP API returns a 200 with an empty
# response when ``return_immediately`` is ``False``, so
# we "mutate" the gRPC error into a non-error to conform.
if not return_immediately:
return []
raise
return [_received_message_pb_to_mapping(rmpb)
for rmpb in response_pb.received_messages]
Expand Down
22 changes: 22 additions & 0 deletions pubsub/unit_tests/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,24 @@ def test_subscription_pull_defaults_error(self):
self.assertFalse(return_immediately)
self.assertIsNone(options)

def test_subscription_pull_deadline_exceeded(self):
client = _Client(self.PROJECT)
gax_api = _GAXSubscriberAPI(_deadline_exceeded_gax_error=True)
api = self._make_one(gax_api, client)

result = api.subscription_pull(self.SUB_PATH)
self.assertEqual(result, [])

def test_subscription_pull_deadline_exceeded_return_immediately(self):
from google.gax.errors import GaxError

client = _Client(self.PROJECT)
gax_api = _GAXSubscriberAPI(_deadline_exceeded_gax_error=True)
api = self._make_one(gax_api, client)

with self.assertRaises(GaxError):
api.subscription_pull(self.SUB_PATH, return_immediately=True)

def test_subscription_acknowledge_hit(self):
ACK_ID1 = 'DEADBEEF'
ACK_ID2 = 'BEADCAFE'
Expand Down Expand Up @@ -1075,6 +1093,7 @@ class _GAXSubscriberAPI(_GAXBaseAPI):
_modify_push_config_ok = False
_acknowledge_ok = False
_modify_ack_deadline_ok = False
_deadline_exceeded_gax_error = False

def list_subscriptions(self, project, page_size, options=None):
self._list_subscriptions_called_with = (project, page_size, options)
Expand Down Expand Up @@ -1124,6 +1143,9 @@ def pull(self, name, max_messages, return_immediately, options=None):
name, max_messages, return_immediately, options)
if self._random_gax_error:
raise GaxError('error')
if self._deadline_exceeded_gax_error:
raise GaxError('deadline exceeded',
self._make_grpc_deadline_exceeded())
try:
return self._pull_response
except AttributeError:
Expand Down

0 comments on commit 2ad06a8

Please sign in to comment.