diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 282bbf4db..6453762a3 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -69,7 +69,6 @@ def create_topic_kinesis_ingestion( gcp_service_account: str, ) -> None: """Create a new Pub/Sub topic with AWS Kinesis Ingestion Settings.""" - # [START pubsub_quickstart_create_topic_kinesis_ingestion] # [START pubsub_create_topic_kinesis_ingestion] from google.cloud import pubsub_v1 from google.pubsub_v1.types import Topic @@ -101,10 +100,58 @@ def create_topic_kinesis_ingestion( topic = publisher.create_topic(request=request) print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings") - # [END pubsub_quickstart_create_topic_kinesis_ingestion] # [END pubsub_create_topic_kinesis_ingestion] +def update_topic_kinesis_ingestion( + project_id: str, + topic_id: str, + stream_arn: str, + consumer_arn: str, + aws_role_arn: str, + gcp_service_account: str, +) -> None: + """Update Pub/Sub topic with AWS Kinesis Ingestion Settings.""" + # [START pubsub_update_topic_kinesis_ingestion] + from google.cloud import pubsub_v1 + from google.pubsub_v1.types import Topic + from google.pubsub_v1.types import IngestionDataSourceSettings + from google.pubsub_v1.types import UpdateTopicRequest + from google.protobuf import field_mask_pb2 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # stream_arn = "your-stream-arn" + # consumer_arn = "your-consumer-arn" + # aws_role_arn = "your-aws-role-arn" + # gcp_service_account = "your-gcp-service-account" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + update_request = UpdateTopicRequest( + topic=Topic( + name=topic_path, + ingestion_data_source_settings=IngestionDataSourceSettings( + aws_kinesis=IngestionDataSourceSettings.AwsKinesis( + stream_arn=stream_arn, + consumer_arn=consumer_arn, + aws_role_arn=aws_role_arn, + gcp_service_account=gcp_service_account, + ) + ), + ), + update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]), + ) + + topic = publisher.update_topic(request=update_request) + print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings") + + +# [END pubsub_update_topic_kinesis_ingestion] + + def delete_topic(project_id: str, topic_id: str) -> None: """Deletes an existing Pub/Sub topic.""" # [START pubsub_delete_topic] @@ -484,6 +531,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: create_topic_kinesis_ingestion_parser.add_argument("aws_role_arn") create_topic_kinesis_ingestion_parser.add_argument("gcp_service_account") + update_topic_kinesis_ingestion_parser = subparsers.add_parser( + "update_kinesis_ingestion", help=update_topic_kinesis_ingestion.__doc__ + ) + update_topic_kinesis_ingestion_parser.add_argument("topic_id") + update_topic_kinesis_ingestion_parser.add_argument("stream_arn") + update_topic_kinesis_ingestion_parser.add_argument("consumer_arn") + update_topic_kinesis_ingestion_parser.add_argument("aws_role_arn") + update_topic_kinesis_ingestion_parser.add_argument("gcp_service_account") + delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) delete_parser.add_argument("topic_id") @@ -553,6 +609,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: args.aws_role_arn, args.gcp_service_account, ) + elif args.command == "update_kinesis_ingestion": + update_topic_kinesis_ingestion( + args.project_id, + args.topic_id, + args.stream_arn, + args.consumer_arn, + args.aws_role_arn, + args.gcp_service_account, + ) elif args.command == "delete": delete_topic(args.project_id, args.topic_id) elif args.command == "publish": diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index fa31a74cf..1e673f134 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -123,6 +123,9 @@ def test_create( out, _ = capsys.readouterr() assert f"Created topic: {topic_path}" in out + # Clean up resource created for the test. + publisher_client.delete_topic(request={"topic": topic_path}) + def test_create_kinesis_ingestion( publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str] @@ -155,6 +158,49 @@ def test_create_kinesis_ingestion( out, _ = capsys.readouterr() assert f"Created topic: {topic_path} with AWS Kinesis Ingestion Settings" in out + # Clean up resource created for the test. + publisher_client.delete_topic(request={"topic": topic_path}) + + +def test_update_kinesis_ingestion( + publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str] +) -> None: + # The scope of `topic_path` is limited to this function. + topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID) + + # Outside of automated CI tests, these values must be of actual AWS resources for the test to pass. + stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name" + consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/consumer/consumer-1:1111111111" + aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name" + gcp_service_account = ( + "fake-service-account@fake-gcp-project.iam.gserviceaccount.com" + ) + + try: + publisher_client.delete_topic(request={"topic": topic_path}) + except NotFound: + pass + + publisher.create_topic(PROJECT_ID, TOPIC_ID) + + out, _ = capsys.readouterr() + assert f"Created topic: {topic_path}" in out + + publisher.update_topic_kinesis_ingestion( + PROJECT_ID, + TOPIC_ID, + stream_arn, + consumer_arn, + aws_role_arn, + gcp_service_account, + ) + + out, _ = capsys.readouterr() + assert f"Updated topic: {topic_path} with AWS Kinesis Ingestion Settings" in out + + # Clean up resource created for the test. + publisher_client.delete_topic(request={"topic": topic_path}) + def test_list(topic_path: str, capsys: CaptureFixture[str]) -> None: publisher.list_topics(PROJECT_ID)