Skip to content

Commit c008725

Browse files
committed
Added support for multi subscriber
1 parent ae1e14c commit c008725

File tree

4 files changed

+52
-33
lines changed

4 files changed

+52
-33
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"require": {
2121
"php" : ">=7.1",
2222
"google/cloud-pubsub": "^1.1",
23-
"illuminate/queue": "^5.6"
23+
"illuminate/queue": "5.6.*"
2424
},
2525
"require-dev": {
2626
"phpunit/phpunit": "^7.1"

src/Connectors/PubSubConnector.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ class PubSubConnector implements ConnectorInterface
2424
public function connect(array $config)
2525
{
2626
$gcp_config = $this->transformConfig($config);
27-
2827
return new PubSubQueue(
2928
new PubSubClient($gcp_config),
30-
$config['queue'] ?? $this->default_queue
29+
$config['queue'] ?? $this->default_queue,
30+
$gcp_config
3131
);
3232
}
3333

src/PubSubQueue.php

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,23 @@ class PubSubQueue extends Queue implements QueueContract
2626
*/
2727
protected $default;
2828

29+
/**
30+
* PubSub config
31+
*/
32+
protected $config;
33+
34+
2935
/**
3036
* Create a new GCP PubSub instance.
3137
*
3238
* @param \Google\Cloud\PubSub\PubSubClient $pubsub
3339
* @param string $default
3440
*/
35-
public function __construct(PubSubClient $pubsub, $default)
41+
public function __construct(PubSubClient $pubsub, $default, $config)
3642
{
3743
$this->pubsub = $pubsub;
3844
$this->default = $default;
45+
$this->config = $config;
3946
}
4047

4148
/**
@@ -47,7 +54,7 @@ public function __construct(PubSubClient $pubsub, $default)
4754
*
4855
* @return int
4956
*/
50-
public function size($queue = null)
57+
public function size($subscriber = null)
5158
{
5259
return 0;
5360
}
@@ -61,9 +68,9 @@ public function size($queue = null)
6168
*
6269
* @return mixed
6370
*/
64-
public function push($job, $data = '', $queue = null)
71+
public function push($job, $data = '', $subscriber = null)
6572
{
66-
return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
73+
return $this->pushRaw($this->createPayload($job, $data), $subscriber);
6774
}
6875

6976
/**
@@ -75,9 +82,9 @@ public function push($job, $data = '', $queue = null)
7582
*
7683
* @return array
7784
*/
78-
public function pushRaw($payload, $queue = null, array $options = [])
85+
public function pushRaw($payload, $subscriber = null, array $options = [])
7986
{
80-
$topic = $this->getTopic($queue, true);
87+
$topic = $this->getTopic($subscriber, true);
8188

8289
$this->subscribeToTopic($topic);
8390

@@ -100,34 +107,34 @@ public function pushRaw($payload, $queue = null, array $options = [])
100107
* @param \DateTimeInterface|\DateInterval|int $delay
101108
* @param string|object $job
102109
* @param mixed $data
103-
* @param string $queue
110+
* @param string $subscriber
104111
*
105112
* @return mixed
106113
*/
107-
public function later($delay, $job, $data = '', $queue = null)
114+
public function later($delay, $job, $data = '', $subscriber = null)
108115
{
109116
return $this->pushRaw(
110-
$this->createPayload($job, $this->getQueue($queue), $data),
111-
$queue,
117+
$this->createPayload($job, $data),
118+
$subscriber,
112119
['available_at' => $this->availableAt($delay)]
113120
);
114121
}
115122

116123
/**
117124
* Pop the next job off of the queue.
118125
*
119-
* @param string $queue
126+
* @param string $subscriber
120127
* @return \Illuminate\Contracts\Queue\Job|null
121128
*/
122-
public function pop($queue = null)
129+
public function pop($subscriber = null)
123130
{
124-
$topic = $this->getTopic($this->getQueue($queue));
131+
$topic = $this->getTopic($this->getQueue($subscriber));
125132

126133
if (! $topic->exists()) {
127134
return;
128135
}
129136

130-
$subscription = $topic->subscription($this->getSubscriberName());
137+
$subscription = $topic->subscription($this->getSubscriberName($subscriber));
131138
$messages = $subscription->pull([
132139
'returnImmediately' => true,
133140
'maxMessages' => 1,
@@ -139,7 +146,7 @@ public function pop($queue = null)
139146
$this,
140147
$messages[0],
141148
$this->connectionName,
142-
$this->getQueue($queue)
149+
$this->getQueue($subscriber)
143150
);
144151
}
145152
}
@@ -176,7 +183,7 @@ public function bulk($jobs, $data = '', $queue = null)
176183
*/
177184
public function acknowledge(Message $message, $queue = null)
178185
{
179-
$subscription = $this->getTopic($this->getQueue($queue))->subscription($this->getSubscriberName());
186+
$subscription = $this->getTopic($this->getQueue($queue))->subscription($this->getSubscriberName($queue));
180187
$subscription->acknowledge($message);
181188
}
182189

@@ -191,7 +198,7 @@ public function acknowledge(Message $message, $queue = null)
191198
public function acknowledgeAndPublish(Message $message, $queue = null, $options = [], $delay = 0)
192199
{
193200
$topic = $this->getTopic($this->getQueue($queue));
194-
$subscription = $topic->subscription($this->getSubscriberName());
201+
$subscription = $topic->subscription($this->getSubscriberName($queue));
195202

196203
$subscription->acknowledge($message);
197204

@@ -215,9 +222,9 @@ public function acknowledgeAndPublish(Message $message, $queue = null, $options
215222
*
216223
* @throws \Illuminate\Queue\InvalidPayloadException
217224
*/
218-
protected function createPayload($job, $queue, $data = '')
225+
protected function createPayload($job, $data = '')
219226
{
220-
$payload = parent::createPayload($job, $this->getQueue($queue), $data);
227+
$payload = parent::createPayload($job, $data);
221228

222229
return base64_encode($payload);
223230
}
@@ -230,9 +237,9 @@ protected function createPayload($job, $queue, $data = '')
230237
* @param mixed $data
231238
* @return array
232239
*/
233-
protected function createPayloadArray($job, $queue, $data = '')
240+
protected function createPayloadArray($job, $data = '')
234241
{
235-
return array_merge(parent::createPayloadArray($job, $this->getQueue($queue), $data), [
242+
return array_merge(parent::createPayloadArray($job, $data), [
236243
'id' => $this->getRandomId(),
237244
]);
238245
}
@@ -267,7 +274,6 @@ public function getTopic($queue, $create = false)
267274
public function subscribeToTopic(Topic $topic)
268275
{
269276
$subscription = $topic->subscription($this->getSubscriberName());
270-
271277
if (! $subscription->exists()) {
272278
$subscription = $topic->subscribe($this->getSubscriberName());
273279
}
@@ -282,8 +288,11 @@ public function subscribeToTopic(Topic $topic)
282288
*
283289
* @return string
284290
*/
285-
public function getSubscriberName()
291+
public function getSubscriberName($queue = null)
286292
{
293+
if ($this->config && $this->config['subscribers'] && $queue && isset($this->config['subscribers'][$queue])) {
294+
return $this->config['subscribers'][$queue];
295+
}
287296
return 'subscriber';
288297
}
289298

tests/Unit/PubSubQueueTests.php

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,19 @@ public function setUp()
2929
$this->client = $this->createMock(PubSubClient::class);
3030
$this->subscription = $this->createMock(Subscription::class);
3131
$this->message = $this->createMock(Message::class);
32-
32+
$this->config = [
33+
'queue' => 'test',
34+
'project_id' => 'the-project-id',
35+
'retries' => 1,
36+
'request_timeout' => 60,
37+
'subscribers' => [
38+
'sub1' => 'topic1',
39+
'sub2' => 'topic2',
40+
'sub3' => 'topic1',
41+
]
42+
];
3343
$this->queue = $this->getMockBuilder(PubSubQueue::class)
34-
->setConstructorArgs([$this->client, 'default'])
44+
->setConstructorArgs([$this->client, 'default', $this->config])
3545
->setMethods([
3646
'pushRaw',
3747
'getTopic',
@@ -68,7 +78,7 @@ public function testPushNewJob()
6878
public function testPushRaw()
6979
{
7080
$queue = $this->getMockBuilder(PubSubQueue::class)
71-
->setConstructorArgs([$this->client, 'default'])
81+
->setConstructorArgs([$this->client, 'default', $this->config])
7282
->setMethods(['getTopic', 'subscribeToTopic'])
7383
->getMock();
7484

@@ -240,7 +250,7 @@ public function testGetTopic()
240250
->willReturn($this->topic);
241251

242252
$queue = $this->getMockBuilder(PubSubQueue::class)
243-
->setConstructorArgs([$this->client, 'default'])
253+
->setConstructorArgs([$this->client, 'default', $this->config])
244254
->setMethods()
245255
->getMock();
246256

@@ -260,7 +270,7 @@ public function testCreateTopicAndReturnIt()
260270
->willReturn($this->topic);
261271

262272
$queue = $this->getMockBuilder(PubSubQueue::class)
263-
->setConstructorArgs([$this->client, 'default'])
273+
->setConstructorArgs([$this->client, 'default', $this->config])
264274
->setMethods()
265275
->getMock();
266276

@@ -279,7 +289,7 @@ public function testSubscribtionIsCreated()
279289
->willReturn(false);
280290

281291
$queue = $this->getMockBuilder(PubSubQueue::class)
282-
->setConstructorArgs([$this->client, 'default'])
292+
->setConstructorArgs([$this->client, 'default', $this->config])
283293
->setMethods()
284294
->getMock();
285295

@@ -295,7 +305,7 @@ public function testSubscriptionIsRetrieved()
295305
->willReturn(true);
296306

297307
$queue = $this->getMockBuilder(PubSubQueue::class)
298-
->setConstructorArgs([$this->client, 'default'])
308+
->setConstructorArgs([$this->client, 'default', $this->config])
299309
->setMethods()
300310
->getMock();
301311

0 commit comments

Comments
 (0)