diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 2cd03b785..79cd0ebf1 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -94,6 +94,86 @@ def create_subscription(project_id: str, topic_id: str, subscription_id: str) -> # [END pubsub_create_pull_subscription] +def optimistic_subscribe( + project_id: str, + topic_id: str, + subscription_id: str, + timeout: Optional[float] = None, +) -> None: + """Optimistically subscribe to messages instead of making calls to verify existence + of a subscription first and then subscribing to messages from it. This avoids admin + operation calls to verify the existence of a subscription and reduces the probability + of running out of quota for admin operations.""" + # [START pubsub_optimistic_subscribe] + from google.api_core.exceptions import NotFound + from google.cloud import pubsub_v1 + from concurrent.futures import TimeoutError + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + # topic_id = "your-topic-id" + + # Create a subscriber client. + subscriber = pubsub_v1.SubscriberClient() + + # The `subscription_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/subscriptions/{subscription_id}` + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + # Define callback to be called when a message is received. + def callback(message: pubsub_v1.subscriber.message.Message) -> None: + # Ack message after processing it. + message.ack() + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # Optimistically subscribe to messages on the subscription. + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback + ) + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + print("Successfully subscribed until the timeout passed.") + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. + except NotFound: + print(f"Subscription {subscription_path} not found, creating it.") + + try: + # If the subscription does not exist, then create it. + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + subscription = subscriber.create_subscription( + request={"name": subscription_path, "topic": topic_path} + ) + + if subscription: + print(f"Subscription {subscription.name} created") + else: + raise ValueError("Subscription creation failed.") + + # Subscribe on the created subscription. + try: + streaming_pull_future = subscriber.subscribe( + subscription.name, callback=callback + ) + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. + except Exception as e: + print( + f"Exception occurred when creating subscription and subscribing to it: {e}" + ) + except Exception as e: + print(f"Exception occurred when attempting optimistic subscribe: {e}") + # [END pubsub_optimistic_subscribe] + + def create_subscription_with_dead_letter_topic( project_id: str, topic_id: str, @@ -1161,6 +1241,15 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: remove_dead_letter_policy_parser.add_argument("topic_id") remove_dead_letter_policy_parser.add_argument("subscription_id") + optimistic_subscribe_parser = subparsers.add_parser( + "optimistic-subscribe", help=optimistic_subscribe.__doc__ + ) + optimistic_subscribe_parser.add_argument("topic_id") + optimistic_subscribe_parser.add_argument("subscription_id") + optimistic_subscribe_parser.add_argument( + "timeout", default=None, type=float, nargs="?" + ) + receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__) receive_parser.add_argument("subscription_id") receive_parser.add_argument("timeout", default=None, type=float, nargs="?") @@ -1303,6 +1392,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None: ) elif args.command == "remove-dead-letter-policy": remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id) + elif args.command == "optimistic-subscribe": + optimistic_subscribe( + args.project_id, args.topic_id, args.subscription_id, args.timeout + ) elif args.command == "receive": receive_messages(args.project_id, args.subscription_id, args.timeout) elif args.command == "receive-custom-attributes": diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index 53fefa109..86f7a94ce 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -234,6 +234,56 @@ def test_create_subscription( subscriber_client.delete_subscription(request={"subscription": subscription_path}) +def test_optimistic_subscribe( + subscriber_client: pubsub_v1.SubscriberClient, + topic: str, + publisher_client: pubsub_v1.PublisherClient, + capsys: CaptureFixture[str], +) -> None: + subscription_id = f"subscription_for_optimistic_subscribe-{PY_VERSION}-{UUID}" + subscription_path = subscriber_client.subscription_path(PROJECT_ID, subscription_id) + # Ensure there is no pre-existing subscription. + # So that we can test the case where optimistic subscribe fails. + try: + subscriber_client.delete_subscription( + request={"subscription": subscription_path} + ) + except NotFound: + pass + + # Invoke optimistic_subscribe when the subscription is not present. + # This tests scenario where optimistic subscribe fails. + subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5) + out, _ = capsys.readouterr() + # Verify optimistic subscription failed. + assert f"Subscription {subscription_path} not found, creating it." in out + # Verify that subscription created due to optimistic subscribe failure. + assert f"Subscription {subscription_path} created" in out + # Verify that subscription didn't already exist. + assert "Successfully subscribed until the timeout passed." not in out + + # Invoke optimistic_subscribe when the subscription is present. + # This tests scenario where optimistic subscribe succeeds. + subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5) + + out, _ = capsys.readouterr() + # Verify optimistic subscription succeeded. + assert f"Subscription {subscription_path} not found, creating it." not in out + # Verify that subscription was not created due to optimistic subscribe failure. + assert f"Subscription {subscription_path} created" not in out + # Verify that subscription already existed. + assert "Successfully subscribed until the timeout passed." in out + + # Test case where optimistic subscribe throws an exception other than NotFound + # or TimeoutError. + subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, "123", 5) + out, _ = capsys.readouterr() + assert "Exception occurred when attempting optimistic subscribe:" in out + + # Clean up resources created during test. + subscriber_client.delete_subscription(request={"subscription": subscription_path}) + + def test_create_subscription_with_dead_letter_policy( subscriber_client: pubsub_v1.SubscriberClient, dead_letter_topic: str,