diff --git a/core/google/cloud/_testing.py b/core/google/cloud/_testing.py index 49eb35ad50ae..880536f8aa45 100644 --- a/core/google/cloud/_testing.py +++ b/core/google/cloud/_testing.py @@ -77,6 +77,10 @@ def _make_grpc_failed_precondition(self): from grpc import StatusCode return self._make_grpc_error(StatusCode.FAILED_PRECONDITION) + def _make_grpc_deadline_exceeded(self): + from grpc import StatusCode + return self._make_grpc_error(StatusCode.DEADLINE_EXCEEDED) + class _GAXPageIterator(object): diff --git a/pubsub/google/cloud/pubsub/_gax.py b/pubsub/google/cloud/pubsub/_gax.py index 57bf688486aa..11ab7d4aac7b 100644 --- a/pubsub/google/cloud/pubsub/_gax.py +++ b/pubsub/google/cloud/pubsub/_gax.py @@ -413,8 +413,15 @@ def subscription_pull(self, subscription_path, return_immediately=False, subscription_path, max_messages, return_immediately=return_immediately) except GaxError as exc: - if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + code = exc_to_code(exc.cause) + if code == StatusCode.NOT_FOUND: raise NotFound(subscription_path) + elif code == StatusCode.DEADLINE_EXCEEDED: + # NOTE: The JSON-over-HTTP API returns a 200 with an empty + # response when ``return_immediately`` is ``False``, so + # we "mutate" the gRPC error into a non-error to conform. + if not return_immediately: + return [] raise return [_received_message_pb_to_mapping(rmpb) for rmpb in response_pb.received_messages] diff --git a/pubsub/unit_tests/test__gax.py b/pubsub/unit_tests/test__gax.py index 43d9804b90a9..decf9a25c068 100644 --- a/pubsub/unit_tests/test__gax.py +++ b/pubsub/unit_tests/test__gax.py @@ -764,6 +764,24 @@ def test_subscription_pull_defaults_error(self): self.assertFalse(return_immediately) self.assertIsNone(options) + def test_subscription_pull_deadline_exceeded(self): + client = _Client(self.PROJECT) + gax_api = _GAXSubscriberAPI(_deadline_exceeded_gax_error=True) + api = self._make_one(gax_api, client) + + result = api.subscription_pull(self.SUB_PATH) + self.assertEqual(result, []) + + def test_subscription_pull_deadline_exceeded_return_immediately(self): + from google.gax.errors import GaxError + + client = _Client(self.PROJECT) + gax_api = _GAXSubscriberAPI(_deadline_exceeded_gax_error=True) + api = self._make_one(gax_api, client) + + with self.assertRaises(GaxError): + api.subscription_pull(self.SUB_PATH, return_immediately=True) + def test_subscription_acknowledge_hit(self): ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' @@ -1075,6 +1093,7 @@ class _GAXSubscriberAPI(_GAXBaseAPI): _modify_push_config_ok = False _acknowledge_ok = False _modify_ack_deadline_ok = False + _deadline_exceeded_gax_error = False def list_subscriptions(self, project, page_size, options=None): self._list_subscriptions_called_with = (project, page_size, options) @@ -1124,6 +1143,9 @@ def pull(self, name, max_messages, return_immediately, options=None): name, max_messages, return_immediately, options) if self._random_gax_error: raise GaxError('error') + if self._deadline_exceeded_gax_error: + raise GaxError('deadline exceeded', + self._make_grpc_deadline_exceeded()) try: return self._pull_response except AttributeError: