diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index e2c63556c..282bbf4db 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -60,6 +60,51 @@ def create_topic(project_id: str, topic_id: str) -> None: # [END pubsub_create_topic] +def create_topic_kinesis_ingestion( + project_id: str, + topic_id: str, + stream_arn: str, + consumer_arn: str, + aws_role_arn: str, + gcp_service_account: str, +) -> None: + """Create a new Pub/Sub topic with AWS Kinesis Ingestion Settings.""" + # [START pubsub_quickstart_create_topic_kinesis_ingestion] + # [START pubsub_create_topic_kinesis_ingestion] + from google.cloud import pubsub_v1 + from google.pubsub_v1.types import Topic + from google.pubsub_v1.types import IngestionDataSourceSettings + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # stream_arn = "your-stream-arn" + # consumer_arn = "your-consumer-arn" + # aws_role_arn = "your-aws-role-arn" + # gcp_service_account = "your-gcp-service-account" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + request = Topic( + name=topic_path, + ingestion_data_source_settings=IngestionDataSourceSettings( + aws_kinesis=IngestionDataSourceSettings.AwsKinesis( + stream_arn=stream_arn, + consumer_arn=consumer_arn, + aws_role_arn=aws_role_arn, + gcp_service_account=gcp_service_account, + ) + ), + ) + + topic = publisher.create_topic(request=request) + + print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings") + # [END pubsub_quickstart_create_topic_kinesis_ingestion] + # [END pubsub_create_topic_kinesis_ingestion] + + def delete_topic(project_id: str, topic_id: str) -> None: """Deletes an existing Pub/Sub topic.""" # [START pubsub_delete_topic] @@ -430,6 +475,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: create_parser = subparsers.add_parser("create", help=create_topic.__doc__) create_parser.add_argument("topic_id") + create_topic_kinesis_ingestion_parser = subparsers.add_parser( + "create_kinesis_ingestion", help=create_topic_kinesis_ingestion.__doc__ + ) + create_topic_kinesis_ingestion_parser.add_argument("topic_id") + create_topic_kinesis_ingestion_parser.add_argument("stream_arn") + create_topic_kinesis_ingestion_parser.add_argument("consumer_arn") + create_topic_kinesis_ingestion_parser.add_argument("aws_role_arn") + create_topic_kinesis_ingestion_parser.add_argument("gcp_service_account") + delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) delete_parser.add_argument("topic_id") @@ -490,6 +544,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None: list_topics(args.project_id) elif args.command == "create": create_topic(args.project_id, args.topic_id) + elif args.command == "create_kinesis_ingestion": + create_topic_kinesis_ingestion( + args.project_id, + args.topic_id, + args.stream_arn, + args.consumer_arn, + args.aws_role_arn, + args.gcp_service_account, + ) elif args.command == "delete": delete_topic(args.project_id, args.topic_id) elif args.command == "publish": diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index 0a6311308..fa31a74cf 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -124,6 +124,38 @@ def test_create( assert f"Created topic: {topic_path}" in out +def test_create_kinesis_ingestion( + publisher_client: pubsub_v1.PublisherClient, capsys: CaptureFixture[str] +) -> None: + # The scope of `topic_path` is limited to this function. + topic_path = publisher_client.topic_path(PROJECT_ID, TOPIC_ID) + + # Outside of automated CI tests, these values must be of actual AWS resources for the test to pass. + stream_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name" + consumer_arn = "arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/consumer/consumer-1:1111111111" + aws_role_arn = "arn:aws:iam::111111111111:role/fake-role-name" + gcp_service_account = ( + "fake-service-account@fake-gcp-project.iam.gserviceaccount.com" + ) + + try: + publisher_client.delete_topic(request={"topic": topic_path}) + except NotFound: + pass + + publisher.create_topic_kinesis_ingestion( + PROJECT_ID, + TOPIC_ID, + stream_arn, + consumer_arn, + aws_role_arn, + gcp_service_account, + ) + + out, _ = capsys.readouterr() + assert f"Created topic: {topic_path} with AWS Kinesis Ingestion Settings" in out + + def test_list(topic_path: str, capsys: CaptureFixture[str]) -> None: publisher.list_topics(PROJECT_ID) out, _ = capsys.readouterr() diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index 3fb4e0a69..aba41c7d7 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1,4 +1,4 @@ -google-cloud-pubsub==2.19.0 +google-cloud-pubsub==2.20.1 avro==1.11.3 protobuf===4.24.4; python_version == '3.7' protobuf==4.25.1; python_version >= '3.8'