Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import datetime
import itertools
import operator as op
import threading
import time

Expand Down Expand Up @@ -183,6 +184,176 @@ def test_subscribe_to_messages_async_callbacks(
future.cancel()


def test_creating_subscriptions_with_non_default_settings(
publisher, subscriber, project, topic_path, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, topic_path))
cleanup.append((subscriber.delete_subscription, subscription_path))

# create a topic and a subscription, customize the latter's policy
publisher.create_topic(topic_path)

msg_retention_duration = {"seconds": 911}
expiration_policy = {"ttl": {"seconds": 90210}}
new_subscription = subscriber.create_subscription(
subscription_path,
topic_path,
ack_deadline_seconds=30,
retain_acked_messages=True,
message_retention_duration=msg_retention_duration,
expiration_policy=expiration_policy,
)

# fetch the subscription and check its settings
project_path = subscriber.project_path(project)
subscriptions = subscriber.list_subscriptions(project_path)

subscriptions = [sub for sub in subscriptions if sub.topic == topic_path]
assert len(subscriptions) == 1
subscription = subscriptions[0]

assert subscription == new_subscription
assert subscription.ack_deadline_seconds == 30
assert subscription.retain_acked_messages
assert subscription.message_retention_duration.seconds == 911
assert subscription.expiration_policy.ttl.seconds == 90210


def test_listing_project_topics(publisher, project, cleanup):
topic_paths = [
publisher.topic_path(project, "topic-{}".format(i) + unique_resource_id("."))
for i in range(1, 4)
]
for topic in topic_paths:
cleanup.append((publisher.delete_topic, topic))
publisher.create_topic(topic)

project_path = publisher.project_path(project)
project_topics = publisher.list_topics(project_path)
project_topics = set(t.name for t in project_topics)

# there might be other topics in the project, thus do a "is subset" check
assert set(topic_paths) <= project_topics


def test_listing_project_subscriptions(publisher, subscriber, project, cleanup):
# create topics
topic_paths = [
publisher.topic_path(project, "topic-1" + unique_resource_id(".")),
publisher.topic_path(project, "topic-2" + unique_resource_id(".")),
]
for topic in topic_paths:
cleanup.append((publisher.delete_topic, topic))
publisher.create_topic(topic)

# create subscriptions
subscription_paths = [
subscriber.subscription_path(
project, "sub-{}".format(i) + unique_resource_id(".")
)
for i in range(1, 4)
]
for i, subscription in enumerate(subscription_paths):
topic = topic_paths[i % 2]
cleanup.append((subscriber.delete_subscription, subscription))
subscriber.create_subscription(subscription, topic)

# retrieve subscriptions and check that the list matches the expected
project_path = subscriber.project_path(project)
subscriptions = subscriber.list_subscriptions(project_path)
subscriptions = set(s.name for s in subscriptions)

# there might be other subscriptions in the project, thus do a "is subset" check
assert set(subscription_paths) <= subscriptions


def test_listing_topic_subscriptions(publisher, subscriber, project, cleanup):
# create topics
topic_paths = [
publisher.topic_path(project, "topic-1" + unique_resource_id(".")),
publisher.topic_path(project, "topic-2" + unique_resource_id(".")),
]
for topic in topic_paths:
cleanup.append((publisher.delete_topic, topic))
publisher.create_topic(topic)

# create subscriptions
subscription_paths = [
subscriber.subscription_path(
project, "sub-{}".format(i) + unique_resource_id(".")
)
for i in range(1, 4)
]
for i, subscription in enumerate(subscription_paths):
topic = topic_paths[i % 2]
cleanup.append((subscriber.delete_subscription, subscription))
subscriber.create_subscription(subscription, topic)

# retrieve subscriptions and check that the list matches the expected
subscriptions = publisher.list_topic_subscriptions(topic_paths[0])
subscriptions = set(subscriptions)

assert subscriptions == {subscription_paths[0], subscription_paths[2]}


def test_managing_topic_iam_policy(publisher, topic_path, cleanup):
cleanup.append((publisher.delete_topic, topic_path))

# create a topic and customize its policy
publisher.create_topic(topic_path)
topic_policy = publisher.get_iam_policy(topic_path)

topic_policy.bindings.add(role="roles/pubsub.editor", members=["domain:google.com"])
topic_policy.bindings.add(
role="roles/pubsub.viewer", members=["group:cloud-logs@google.com"]
)
new_policy = publisher.set_iam_policy(topic_path, topic_policy)

# fetch the topic policy again and check its values
topic_policy = publisher.get_iam_policy(topic_path)
assert topic_policy.bindings == new_policy.bindings
assert len(topic_policy.bindings) == 2

bindings = sorted(topic_policy.bindings, key=op.attrgetter("role"))
assert bindings[0].role == "roles/pubsub.editor"
assert bindings[0].members == ["domain:google.com"]

assert bindings[1].role == "roles/pubsub.viewer"
assert bindings[1].members == ["group:cloud-logs@google.com"]


def test_managing_subscription_iam_policy(
publisher, subscriber, topic_path, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, topic_path))
cleanup.append((subscriber.delete_subscription, subscription_path))

# create a topic and a subscription, customize the latter's policy
publisher.create_topic(topic_path)
subscriber.create_subscription(subscription_path, topic_path)
sub_policy = subscriber.get_iam_policy(subscription_path)

sub_policy.bindings.add(role="roles/pubsub.editor", members=["domain:google.com"])
sub_policy.bindings.add(
role="roles/pubsub.viewer", members=["group:cloud-logs@google.com"]
)
new_policy = subscriber.set_iam_policy(subscription_path, sub_policy)

# fetch the subscription policy again and check its values
sub_policy = subscriber.get_iam_policy(subscription_path)
assert sub_policy.bindings == new_policy.bindings
assert len(sub_policy.bindings) == 2

bindings = sorted(sub_policy.bindings, key=op.attrgetter("role"))
assert bindings[0].role == "roles/pubsub.editor"
assert bindings[0].members == ["domain:google.com"]

assert bindings[1].role == "roles/pubsub.viewer"
assert bindings[1].members == ["group:cloud-logs@google.com"]


class TestStreamingPull(object):
def test_streaming_pull_callback_error_propagation(
self, publisher, topic_path, subscriber, subscription_path, cleanup
Expand Down