Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airflow/providers/google/cloud/hooks/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
PushConfig,
ReceivedMessage,
RetryPolicy,
SchemaSettings,
)


Expand Down Expand Up @@ -182,6 +183,8 @@ def create_topic(
labels: dict[str, str] | None = None,
message_storage_policy: dict | MessageStoragePolicy = None,
kms_key_name: str | None = None,
schema_settings: dict | SchemaSettings = None,
message_retention_duration: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
Expand All @@ -206,6 +209,11 @@ def create_topic(
to be used to protect access to messages published on this topic.
The expected format is
``projects/*/locations/*/keyRings/*/cryptoKeys/*``.
:param schema_settings: (Optional) Settings for validating messages published against an
existing schema. The expected format is ``projects/*/schemas/*``.
:param message_retention_duration: (Optional) Indicates the minimum duration to retain a
message after it is published to the topic. The expected format is a duration in
seconds with up to nine fractional digits, ending with 's'. Example: "3.5s".
:param retry: (Optional) A retry object used to retry requests.
If None is specified, requests will not be retried.
:param timeout: (Optional) The amount of time, in seconds, to wait for the request
Expand All @@ -228,6 +236,8 @@ def create_topic(
"labels": labels,
"message_storage_policy": message_storage_policy,
"kms_key_name": kms_key_name,
"schema_settings": schema_settings,
"message_retention_duration": message_retention_duration,
},
retry=retry,
timeout=timeout,
Expand Down
7 changes: 7 additions & 0 deletions airflow/providers/google/cloud/operators/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
PushConfig,
ReceivedMessage,
RetryPolicy,
SchemaSettings,
)

from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
Expand Down Expand Up @@ -130,6 +131,8 @@ def __init__(
labels: dict[str, str] | None = None,
message_storage_policy: dict | MessageStoragePolicy = None,
kms_key_name: str | None = None,
schema_settings: dict | SchemaSettings = None,
message_retention_duration: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
Expand All @@ -144,6 +147,8 @@ def __init__(
self.labels = labels
self.message_storage_policy = message_storage_policy
self.kms_key_name = kms_key_name
self.schema_settings = schema_settings
self.message_retention_duration = message_retention_duration
self.retry = retry
self.timeout = timeout
self.metadata = metadata
Expand All @@ -163,6 +168,8 @@ def execute(self, context: Context) -> None:
labels=self.labels,
message_storage_policy=self.message_storage_policy,
kms_key_name=self.kms_key_name,
schema_settings=self.schema_settings,
message_retention_duration=self.message_retention_duration,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
Expand Down
9 changes: 8 additions & 1 deletion tests/providers/google/cloud/hooks/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,14 @@ def test_create_nonexistent_topic(self, mock_service):
create_method = mock_service.return_value.create_topic
self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC)
create_method.assert_called_once_with(
request=dict(name=EXPANDED_TOPIC, labels=LABELS, message_storage_policy=None, kms_key_name=None),
request=dict(
name=EXPANDED_TOPIC,
labels=LABELS,
message_storage_policy=None,
kms_key_name=None,
schema_settings=None,
message_retention_duration=None,
),
retry=DEFAULT,
timeout=None,
metadata=(),
Expand Down
4 changes: 4 additions & 0 deletions tests/providers/google/cloud/operators/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def test_failifexists(self, mock_hook):
labels=None,
message_storage_policy=None,
kms_key_name=None,
schema_settings=None,
message_retention_duration=None,
retry=DEFAULT,
timeout=None,
metadata=(),
Expand All @@ -79,6 +81,8 @@ def test_succeedifexists(self, mock_hook):
labels=None,
message_storage_policy=None,
kms_key_name=None,
schema_settings=None,
message_retention_duration=None,
retry=DEFAULT,
timeout=None,
metadata=(),
Expand Down