Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remap new Gax conflict error code #3443

Merged
merged 3 commits into from
Jun 7, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Cover both possible gRPC conflict error codes.
Closes #3175.
  • Loading branch information
tseaver committed May 19, 2017
commit 7bce954ab24ed968b1728f344827ff32815ef240
9 changes: 6 additions & 3 deletions pubsub/google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
from google.cloud.pubsub.subscription import Subscription
from google.cloud.pubsub.topic import Topic

_CONFLICT_ERROR_CODES = (
StatusCode.FAILED_PRECONDITION, StatusCode.ALREADY_EXISTS)


class _PublisherAPI(object):
"""Helper mapping publisher-related APIs.
Expand Down Expand Up @@ -105,7 +108,7 @@ def topic_create(self, topic_path):
try:
topic_pb = self._gax_api.create_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
if exc_to_code(exc.cause) in _CONFLICT_ERROR_CODES:
raise Conflict(topic_path)
raise
return {'name': topic_pb.name}
Expand Down Expand Up @@ -337,7 +340,7 @@ def subscription_create(self, subscription_path, topic_path,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
if exc_to_code(exc.cause) in _CONFLICT_ERROR_CODES:
raise Conflict(topic_path)
raise
return MessageToDict(sub_pb)
Expand Down Expand Up @@ -584,7 +587,7 @@ def snapshot_create(self, snapshot_path, subscription_path):
snapshot_pb = self._gax_api.create_snapshot(
snapshot_path, subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
if exc_to_code(exc.cause) in _CONFLICT_ERROR_CODES:
raise Conflict(snapshot_path)
elif exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
Expand Down
81 changes: 72 additions & 9 deletions pubsub/tests/unit/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,24 @@ def test_topic_create(self):
self.assertEqual(topic_path, self.TOPIC_PATH)
self.assertIsNone(options)

def test_topic_create_failed_precondition(self):
from google.cloud.exceptions import Conflict

gax_api = _GAXPublisherAPI(_create_topic_failed_precondition=True)
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

with self.assertRaises(Conflict):
api.topic_create(self.TOPIC_PATH)

topic_path, options = gax_api._create_topic_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
self.assertIsNone(options)

def test_topic_create_already_exists(self):
from google.cloud.exceptions import Conflict

gax_api = _GAXPublisherAPI(_create_topic_conflict=True)
gax_api = _GAXPublisherAPI(_create_topic_already_exists=True)
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

Expand Down Expand Up @@ -597,11 +611,35 @@ def test_subscription_create_optional_params(self):
expected_message_retention_duration.total_seconds())
self.assertIsNone(options)

def test_subscription_create_failed_precondition(self):
from google.cloud.exceptions import Conflict

DEADLINE = 600
gax_api = _GAXSubscriberAPI(
_create_subscription_failed_precondition=True)
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

with self.assertRaises(Conflict):
api.subscription_create(
self.SUB_PATH, self.TOPIC_PATH, DEADLINE, self.PUSH_ENDPOINT)

(name, topic, push_config, ack_deadline, retain_acked_messages,
message_retention_duration, options) = (
gax_api._create_subscription_called_with)
self.assertEqual(name, self.SUB_PATH)
self.assertEqual(topic, self.TOPIC_PATH)
self.assertEqual(push_config.push_endpoint, self.PUSH_ENDPOINT)
self.assertEqual(ack_deadline, DEADLINE)
self.assertIsNone(retain_acked_messages)
self.assertIsNone(message_retention_duration)
self.assertIsNone(options)

def test_subscription_create_already_exists(self):
from google.cloud.exceptions import Conflict

DEADLINE = 600
gax_api = _GAXSubscriberAPI(_create_subscription_conflict=True)
gax_api = _GAXSubscriberAPI(_create_subscription_already_exists=True)
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

Expand Down Expand Up @@ -1121,10 +1159,26 @@ def test_snapshot_create(self):
self.assertEqual(subscription, self.SUB_PATH)
self.assertIsNone(options)

def test_snapshot_create_failed_precondition(self):
from google.cloud.exceptions import Conflict

gax_api = _GAXSubscriberAPI(_create_snapshot_failed_precondition=True)
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

with self.assertRaises(Conflict):
api.snapshot_create(self.SNAPSHOT_PATH, self.SUB_PATH)

name, subscription, options = (
gax_api._create_snapshot_called_with)
self.assertEqual(name, self.SNAPSHOT_PATH)
self.assertEqual(subscription, self.SUB_PATH)
self.assertIsNone(options)

def test_snapshot_create_already_exists(self):
from google.cloud.exceptions import Conflict

gax_api = _GAXSubscriberAPI(_create_snapshot_conflict=True)
gax_api = _GAXSubscriberAPI(_create_snapshot_already_exists=True)
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

Expand Down Expand Up @@ -1371,7 +1425,8 @@ def mock_insecure_channel(host):

class _GAXPublisherAPI(_GAXBaseAPI):

_create_topic_conflict = False
_create_topic_failed_precondition = False
_create_topic_already_exists = False

def list_topics(self, name, page_size, options):
self._list_topics_called_with = name, page_size, options
Expand All @@ -1383,8 +1438,10 @@ def create_topic(self, name, options=None):
self._create_topic_called_with = name, options
if self._random_gax_error:
raise GaxError('error')
if self._create_topic_conflict:
if self._create_topic_failed_precondition:
raise GaxError('conflict', self._make_grpc_failed_precondition())
if self._create_topic_already_exists:
raise GaxError('conflict', self._make_grpc_already_exists())
return self._create_topic_response

def get_topic(self, name, options=None):
Expand Down Expand Up @@ -1432,8 +1489,10 @@ def list_topic_subscriptions(self, topic, page_size, options=None):

class _GAXSubscriberAPI(_GAXBaseAPI):

_create_snapshot_conflict = False
_create_subscription_conflict = False
_create_snapshot_already_exists = False
_create_snapshot_failed_precondition = False
_create_subscription_already_exists = False
_create_subscription_failed_precondition = False
_modify_push_config_ok = False
_acknowledge_ok = False
_modify_ack_deadline_ok = False
Expand All @@ -1456,8 +1515,10 @@ def create_subscription(self, name, topic, push_config=None,
retain_acked_messages, message_retention_duration, options)
if self._random_gax_error:
raise GaxError('error')
if self._create_subscription_conflict:
if self._create_subscription_failed_precondition:
raise GaxError('conflict', self._make_grpc_failed_precondition())
if self._create_subscription_already_exists:
raise GaxError('conflict', self._make_grpc_already_exists())
return self._create_subscription_response

def get_subscription(self, name, options=None):
Expand Down Expand Up @@ -1533,7 +1594,9 @@ def create_snapshot(self, name, subscription, options=None):
self._create_snapshot_called_with = (name, subscription, options)
if self._random_gax_error:
raise GaxError('error')
if self._create_snapshot_conflict:
if self._create_snapshot_already_exists:
raise GaxError('conflict', self._make_grpc_already_exists())
if self._create_snapshot_failed_precondition:
raise GaxError('conflict', self._make_grpc_failed_precondition())
if self._snapshot_create_subscription_miss:
raise GaxError('miss', self._make_grpc_not_found())
Expand Down