Skip to content

Commit dc18bf7

Browse files
feat(PubSub): Add CloudStorageConfig to subscribe function (#6482)
1 parent 1130265 commit dc18bf7

File tree

4 files changed

+236
-18
lines changed

4 files changed

+236
-18
lines changed

src/Connection/Grpc.php

+8
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
use Google\Protobuf\Duration;
4444
use Google\Protobuf\FieldMask;
4545
use Google\Protobuf\Timestamp;
46+
use Google\Cloud\PubSub\V1\CloudStorageConfig;
4647

4748
/**
4849
* Implementation of the
@@ -324,6 +325,13 @@ public function createSubscription(array $args)
324325
);
325326
}
326327

328+
if (isset($args['cloudStorageConfig'])) {
329+
$args['cloudStorageConfig'] = $this->serializer->decodeMessage(
330+
new CloudStorageConfig(),
331+
$args['cloudStorageConfig']
332+
);
333+
}
334+
327335
return $this->send([$this->getSubscriberClient(), 'createSubscription'], [
328336
$this->pluck('name', $args),
329337
$this->pluck('topic', $args),

src/Subscription.php

+69
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,35 @@ public function detached(array $options = [])
351351
* be between 0 and 600 seconds. Defaults to 600 seconds.
352352
* @type bool $enableExactlyOnceDelivery Indicates whether to enable
353353
* 'Exactly Once Delivery' on the subscription.
354+
* @type array $cloudStorageConfig If provided, messages will be delivered to Google Cloud Storage.
355+
* @type string $cloudStorageConfig.bucket User-provided name for the Cloud Storage bucket.
356+
* The bucket must be created by the user. The bucket name must be without
357+
* any prefix like "gs://". See the [bucket naming
358+
* requirements] (https://cloud.google.com/storage/docs/buckets#naming).
359+
* @type string $cloudStorageConfig.filenamePrefix
360+
* User-provided prefix for Cloud Storage filename. See the [object naming
361+
* requirements](https://cloud.google.com/storage/docs/objects#naming).
362+
* @type string $cloudStorageConfig.filenameSuffix
363+
* User-provided suffix for Cloud Storage filename. See the [object naming
364+
* requirements](https://cloud.google.com/storage/docs/objects#naming). Must
365+
* not end in "/".
366+
* @type array $cloudStorageConfig.textConfig If present, payloads will be written
367+
* to Cloud Storage as raw text, separated by a newline.
368+
* @type array $cloudStorageConfig.avroConfig If set, message payloads and metadata
369+
* will be written to Cloud Storage in Avro format.
370+
* @type bool $cloudStorageConfig.avroConfig.writeMetadata
371+
* When true, write the subscription name, message_id, publish_time,
372+
* attributes, and ordering_key as additional fields in the output.
373+
* @type Duration|string $cloudStorageConfig.maxDuration The maximum duration
374+
* that can elapse before a new Cloud Storage file is created.
375+
* Min 1 minute, max 10 minutes, default 5 minutes. May not exceed the
376+
* subscription's acknowledgement deadline. If a string is provided,
377+
* it should be as a duration in seconds with up to nine fractional digits,
378+
* terminated by 's', e.g "3.5s"
379+
* @type int|string $cloudStorageConfig.maxBytes The maximum bytes that can be
380+
* written to a Cloud Storage file before a new file is created.
381+
* Min 1 KB, max 10 GiB. The max_bytes limit may be exceeded in cases where
382+
* messages are larger than the limit.
354383
* }
355384
* @return array An array of subscription info
356385
* @throws \InvalidArgumentException
@@ -499,6 +528,35 @@ public function create(array $options = [])
499528
* be between 0 and 600 seconds. Defaults to 600 seconds.
500529
* @type bool $enableExactlyOnceDelivery Indicates whether to enable
501530
* 'Exactly Once Delivery' on the subscription.
531+
* @type array $cloudStorageConfig If provided, messages will be delivered to Google Cloud Storage.
532+
* @type string $cloudStorageConfig.bucket User-provided name for the Cloud Storage bucket.
533+
* The bucket must be created by the user. The bucket name must be without
534+
* any prefix like "gs://". See the [bucket naming
535+
* requirements] (https://cloud.google.com/storage/docs/buckets#naming).
536+
* @type string $cloudStorageConfig.filenamePrefix
537+
* User-provided prefix for Cloud Storage filename. See the [object naming
538+
* requirements](https://cloud.google.com/storage/docs/objects#naming).
539+
* @type string $cloudStorageConfig.filenameSuffix
540+
* User-provided suffix for Cloud Storage filename. See the [object naming
541+
* requirements](https://cloud.google.com/storage/docs/objects#naming). Must
542+
* not end in "/".
543+
* @type array $cloudStorageConfig.textConfig If present, payloads will be written
544+
* to Cloud Storage as raw text, separated by a newline.
545+
* @type array $cloudStorageConfig.avroConfig If set, message payloads and metadata
546+
* will be written to Cloud Storage in Avro format.
547+
* @type bool $cloudStorageConfig.avroConfig.writeMetadata
548+
* When true, write the subscription name, message_id, publish_time,
549+
* attributes, and ordering_key as additional fields in the output.
550+
* @type Duration|string $cloudStorageConfig.maxDuration The maximum duration
551+
* that can elapse before a new Cloud Storage file is created.
552+
* Min 1 minute, max 10 minutes, default 5 minutes. May not exceed the
553+
* subscription's acknowledgement deadline. If a string is provided,
554+
* it should be as a duration in seconds with up to nine fractional digits,
555+
* terminated by 's', e.g "3.5s"
556+
* @type int|string $cloudStorageConfig.maxBytes The maximum bytes that can be
557+
* written to a Cloud Storage file before a new file is created.
558+
* Min 1 KB, max 10 GiB. The max_bytes limit may be exceeded in cases where
559+
* messages are larger than the limit.
502560
* }
503561
* @param array $options [optional] {
504562
* Configuration options.
@@ -1263,6 +1321,17 @@ private function formatSubscriptionDurations(array $options)
12631321
);
12641322
}
12651323

1324+
if (isset($options['cloudStorageConfig']['maxDuration']) &&
1325+
$options['cloudStorageConfig']['maxDuration'] instanceof Duration
1326+
) {
1327+
$duration = $options['cloudStorageConfig']['maxDuration']->get();
1328+
$options['cloudStorageConfig']['maxDuration'] = sprintf(
1329+
'%s.%ss',
1330+
$duration['seconds'],
1331+
$this->convertNanoSecondsToFraction($duration['nanos'], false)
1332+
);
1333+
}
1334+
12661335
return $options;
12671336
}
12681337

tests/System/ManageSubscriptionsTest.php

+96-18
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,71 @@ public function testCreateAndListSubscriptions($client)
4848
$this->assertSubsFound($topic, $subsToCreate);
4949
}
5050

51+
/**
52+
* @dataProvider clientProvider
53+
*/
54+
public function testCreateSubscriptionWithCloudStorageConfig($client)
55+
{
56+
$gcsBucket = getenv('GCP_PHP_PUBSUB_TEST_CLOUD_STORAGE_BUCKET');
57+
if (!$gcsBucket) {
58+
$this->markTestSkipped(
59+
'Must provide `GCP_PHP_PUBSUB_TEST_CLOUD_STORAGE_BUCKET` to run this test.'
60+
);
61+
return;
62+
}
63+
64+
$topic = self::topic($client);
65+
$bucket = [
66+
'bucket' => $gcsBucket,
67+
'avroConfig' => ['writeMetadata' => false],
68+
'maxDuration' => new Duration(150, 1e+9),
69+
'maxBytes' => '2000'
70+
];
71+
72+
$subsToCreate = [
73+
uniqid(self::TESTING_PREFIX),
74+
];
75+
76+
foreach ($subsToCreate as $subToCreate) {
77+
self::$deletionQueue->add($client->subscribe(
78+
$subToCreate,
79+
$topic,
80+
['cloudStorageConfig' => $bucket]
81+
));
82+
}
83+
84+
$this->assertSubsFound($client, $subsToCreate, true);
85+
}
86+
87+
/**
88+
* @dataProvider clientProvider
89+
*/
90+
public function testUpdateSubscriptionWithCloudStorageConfig($client)
91+
{
92+
$gcsBucket = getenv('GCP_PHP_PUBSUB_TEST_CLOUD_STORAGE_BUCKET');
93+
if (!$gcsBucket) {
94+
$this->markTestSkipped(
95+
'Must provide `GCP_PHP_PUBSUB_TEST_CLOUD_STORAGE_BUCKET` to run this test.'
96+
);
97+
return;
98+
}
99+
100+
$topic = self::topic($client);
101+
$subToCreate = uniqid(self::TESTING_PREFIX);
102+
$sub = $client->subscribe($subToCreate, $topic);
103+
self::$deletionQueue->add($sub);
104+
105+
$isSetCloudStorageConfig = isset($sub->info()['cloudStorageConfig']) ?? false;
106+
$bucket = ['bucket' => $gcsBucket];
107+
108+
$sub->update([
109+
'cloudStorageConfig' => $bucket
110+
]);
111+
112+
$this->assertEquals(false, $isSetCloudStorageConfig);
113+
$this->assertEquals(true, $sub->info()['cloudStorageConfig'] ? true : false);
114+
}
115+
51116
/**
52117
* @dataProvider clientProvider
53118
*/
@@ -399,29 +464,42 @@ public function testDetach($client)
399464
$this->assertTrue($sub->detached());
400465
}
401466

402-
private function assertSubsFound($class, $expectedSubs)
403-
{
467+
private function assertSubsFound(
468+
$class,
469+
$expectedSubs,
470+
$assertForStorageConfig = false
471+
) {
404472
$backoff = new ExponentialBackoff(8);
405-
$hasFoundSubs = $backoff->execute(function () use ($class, $expectedSubs) {
406-
$foundSubs = [];
407-
$subs = $class->subscriptions();
408-
409-
foreach ($subs as $sub) {
410-
$nameParts = explode('/', $sub->name());
411-
$sName = end($nameParts);
412-
foreach ($expectedSubs as $key => $expectedSub) {
413-
if ($sName === $expectedSub) {
414-
$foundSubs[$key] = $sName;
473+
$hasFoundSubs = $backoff->execute(
474+
function () use ($class, $expectedSubs, $assertForStorageConfig) {
475+
$foundSubs = [];
476+
$subs = $class->subscriptions();
477+
478+
foreach ($subs as $sub) {
479+
$nameParts = explode('/', $sub->name());
480+
$sName = end($nameParts);
481+
foreach ($expectedSubs as $key => $expectedSub) {
482+
if ($sName === $expectedSub) {
483+
if ($assertForStorageConfig) {
484+
if (isset($sub->info()['cloudStorageConfig'])) {
485+
$foundSubs[$key] = $sName;
486+
}
487+
} else {
488+
$foundSubs[$key] = $sName;
489+
}
490+
}
415491
}
416492
}
417-
}
418493

419-
if (sort($foundSubs) === sort($expectedSubs)) {
420-
return true;
421-
}
494+
if (sort($foundSubs) === sort($expectedSubs)) {
495+
return true;
496+
}
422497

423-
throw new \Exception('Items not found in the allotted number of attempts.');
424-
});
498+
throw new \Exception(
499+
'Items not found in the allotted number of attempts.'
500+
);
501+
}
502+
);
425503

426504
$this->assertTrue($hasFoundSubs);
427505
}

tests/Unit/SubscriptionTest.php

+63
Original file line numberDiff line numberDiff line change
@@ -975,6 +975,69 @@ public function testDetach()
975975
$this->assertEquals([], $this->subscription->detach());
976976
}
977977

978+
public function testCreateSubscriptionWithCloudStorageConfig()
979+
{
980+
$bucket = [
981+
'bucket' => 'pubsub-test-bucket',
982+
'maxDuration' => new Duration(3, 1e+9)
983+
];
984+
$bucketString = [
985+
'bucket' => 'pubsub-test-bucket',
986+
'maxDuration' => '3.1s'
987+
];
988+
$this->connection->createSubscription(Argument::allOf(
989+
Argument::withEntry('foo', 'bar'),
990+
Argument::withEntry('cloudStorageConfig', $bucketString)
991+
))->willReturn([
992+
'name' => self::SUBSCRIPTION,
993+
'topic' => self::TOPIC
994+
])->shouldBeCalledTimes(1);
995+
996+
$this->connection->getSubscription()->shouldNotBeCalled();
997+
998+
$this->subscription->___setProperty('connection', $this->connection->reveal());
999+
1000+
$sub = $this->subscription->create([
1001+
'foo' => 'bar',
1002+
'cloudStorageConfig' => $bucket
1003+
]);
1004+
1005+
$this->assertEquals($sub['name'], self::SUBSCRIPTION);
1006+
$this->assertEquals($sub['topic'], self::TOPIC);
1007+
}
1008+
1009+
public function testUpdateSubscriptionWithCloudStorageConfig()
1010+
{
1011+
$bucket = [
1012+
'bucket' => 'pubsub-test-bucket',
1013+
'maxDuration' => new Duration(3, 1e+9)
1014+
];
1015+
$bucketString = [
1016+
'name' => 'projects/project-id/subscriptions/subscription-name',
1017+
'cloudStorageConfig' => [
1018+
'bucket' => 'pubsub-test-bucket',
1019+
'maxDuration' => '3.1s'
1020+
]
1021+
];
1022+
$this->connection->updateSubscription(
1023+
Argument::containing($bucketString)
1024+
)->willReturn([
1025+
'name' => self::SUBSCRIPTION,
1026+
'topic' => self::TOPIC
1027+
])->shouldBeCalledTimes(1);
1028+
1029+
$this->connection->getSubscription()->shouldNotBeCalled();
1030+
1031+
$this->subscription->___setProperty('connection', $this->connection->reveal());
1032+
1033+
$sub = $this->subscription->update([
1034+
'cloudStorageConfig' => $bucket
1035+
]);
1036+
1037+
$this->assertEquals($sub['name'], self::SUBSCRIPTION);
1038+
$this->assertEquals($sub['topic'], self::TOPIC);
1039+
}
1040+
9781041
// Helper method to generate the exception sent during an invalid EOD operation
9791042
// like acknowledge or modifyAckDeadline
9801043
private function generateEodException($metadata, $failureReason = 'EXACTLY_ONCE_ACKID_FAILURE')

0 commit comments

Comments
 (0)