Skip to content

Commit 751d49f

Browse files
committed
Fix multiple subscriber issue
1 parent c008725 commit 751d49f

File tree

4 files changed

+41
-37
lines changed

4 files changed

+41
-37
lines changed

.gitignore

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1-
vendor
2-
composer.lock
1+
vendor
2+
composer.lock
3+
.DS_store

src/Jobs/PubSubJob.php

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
namespace PubSub\PubSubQueue\Jobs;
44

5-
use Illuminate\Queue\Jobs\Job;
65
use Google\Cloud\PubSub\Message;
76
use Illuminate\Container\Container;
8-
use PubSub\PubSubQueue\PubSubQueue;
97
use Illuminate\Contracts\Queue\Job as JobContract;
8+
use Illuminate\Queue\Jobs\Job;
9+
use PubSub\PubSubQueue\PubSubQueue;
1010

1111
class PubSubJob extends Job implements JobContract
1212
{
@@ -24,6 +24,13 @@ class PubSubJob extends Job implements JobContract
2424
*/
2525
protected $job;
2626

27+
/**
28+
* subscriber name
29+
*
30+
* @var string
31+
*/
32+
protected $subscriber;
33+
2734
/**
2835
* Create a new job instance.
2936
*
@@ -33,14 +40,14 @@ class PubSubJob extends Job implements JobContract
3340
* @param string $connectionName
3441
* @param string $queue
3542
*/
36-
public function __construct(Container $container, PubSubQueue $pubsub, Message $job, $connectionName, $queue)
43+
public function __construct(Container $container, PubSubQueue $pubsub, Message $job, $connectionName, $queue, $subscriber = null)
3744
{
3845
$this->pubsub = $pubsub;
3946
$this->job = $job;
4047
$this->queue = $queue;
4148
$this->container = $container;
4249
$this->connectionName = $connectionName;
43-
50+
$this->subscriber = $subscriber;
4451
$this->decoded = $this->payload();
4552
}
4653

src/PubSubQueue.php

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22

33
namespace PubSub\PubSubQueue;
44

5-
use Illuminate\Queue\Queue;
6-
use Illuminate\Support\Str;
7-
use Google\Cloud\PubSub\Topic;
85
use Google\Cloud\PubSub\Message;
96
use Google\Cloud\PubSub\PubSubClient;
10-
use PubSub\PubSubQueue\Jobs\PubSubJob;
7+
use Google\Cloud\PubSub\Topic;
118
use Illuminate\Contracts\Queue\Queue as QueueContract;
9+
use Illuminate\Queue\Queue;
10+
use Illuminate\Support\Str;
11+
use PubSub\PubSubQueue\Jobs\PubSubJob;
1212

1313
class PubSubQueue extends Queue implements QueueContract
1414
{
@@ -31,6 +31,10 @@ class PubSubQueue extends Queue implements QueueContract
3131
*/
3232
protected $config;
3333

34+
/**
35+
* Subscriber name
36+
*/
37+
protected $subscriber;
3438

3539
/**
3640
* Create a new GCP PubSub instance.
@@ -86,11 +90,9 @@ public function pushRaw($payload, $subscriber = null, array $options = [])
8690
{
8791
$topic = $this->getTopic($subscriber, true);
8892

89-
$this->subscribeToTopic($topic);
90-
9193
$publish = ['data' => $payload];
9294

93-
if (! empty($options)) {
95+
if (!empty($options)) {
9496
$publish['attributes'] = $options;
9597
}
9698

@@ -128,25 +130,26 @@ public function later($delay, $job, $data = '', $subscriber = null)
128130
*/
129131
public function pop($subscriber = null)
130132
{
133+
$this->subscriber = $subscriber;
131134
$topic = $this->getTopic($this->getQueue($subscriber));
132135

133-
if (! $topic->exists()) {
136+
if (!$topic->exists()) {
134137
return;
135138
}
136-
137-
$subscription = $topic->subscription($this->getSubscriberName($subscriber));
139+
$subscription = $topic->subscription($subscriber);
138140
$messages = $subscription->pull([
139141
'returnImmediately' => true,
140142
'maxMessages' => 1,
141143
]);
142144

143-
if (! empty($messages) && count($messages) > 0) {
145+
if (!empty($messages) && count($messages) > 0) {
144146
return new PubSubJob(
145147
$this->container,
146148
$this,
147149
$messages[0],
148150
$this->connectionName,
149-
$this->getQueue($subscriber)
151+
$this->getQueue($subscriber),
152+
$subscriber
150153
);
151154
}
152155
}
@@ -170,8 +173,6 @@ public function bulk($jobs, $data = '', $queue = null)
170173

171174
$topic = $this->getTopic($this->getQueue($queue), true);
172175

173-
$this->subscribeToTopic($topic);
174-
175176
return $topic->publishBatch($payloads);
176177
}
177178

@@ -183,7 +184,7 @@ public function bulk($jobs, $data = '', $queue = null)
183184
*/
184185
public function acknowledge(Message $message, $queue = null)
185186
{
186-
$subscription = $this->getTopic($this->getQueue($queue))->subscription($this->getSubscriberName($queue));
187+
$subscription = $this->getTopic($this->getQueue($queue))->subscription($this->subscriber);
187188
$subscription->acknowledge($message);
188189
}
189190

@@ -198,7 +199,7 @@ public function acknowledge(Message $message, $queue = null)
198199
public function acknowledgeAndPublish(Message $message, $queue = null, $options = [], $delay = 0)
199200
{
200201
$topic = $this->getTopic($this->getQueue($queue));
201-
$subscription = $topic->subscription($this->getSubscriberName($queue));
202+
$subscription = $topic->subscription($queue);
202203

203204
$subscription->acknowledge($message);
204205

@@ -257,7 +258,7 @@ public function getTopic($queue, $create = false)
257258
$queue = $this->getQueue($queue);
258259
$topic = $this->pubsub->topic($queue);
259260

260-
if (! $topic->exists() && $create) {
261+
if (!$topic->exists() && $create) {
261262
$topic->create();
262263
}
263264

@@ -271,11 +272,11 @@ public function getTopic($queue, $create = false)
271272
*
272273
* @return \Google\Cloud\PubSub\Subscription
273274
*/
274-
public function subscribeToTopic(Topic $topic)
275+
public function subscribeToTopic(Topic $topic, $subscriber = null)
275276
{
276-
$subscription = $topic->subscription($this->getSubscriberName());
277-
if (! $subscription->exists()) {
278-
$subscription = $topic->subscribe($this->getSubscriberName());
277+
$subscription = $topic->subscription($subscriber);
278+
if (!$subscription->exists()) {
279+
$subscription = $topic->subscribe($subscriber);
279280
}
280281

281282
return $subscription;
@@ -290,10 +291,7 @@ public function subscribeToTopic(Topic $topic)
290291
*/
291292
public function getSubscriberName($queue = null)
292293
{
293-
if ($this->config && $this->config['subscribers'] && $queue && isset($this->config['subscribers'][$queue])) {
294-
return $this->config['subscribers'][$queue];
295-
}
296-
return 'subscriber';
294+
return $queue;
297295
}
298296

299297
/**
@@ -314,7 +312,10 @@ public function getPubSub()
314312
*/
315313
public function getQueue($queue)
316314
{
317-
return $queue ?: $this->default;
315+
if ($this->config && $this->config['subscribers'] && $queue && isset($this->config['subscribers'][$queue])) {
316+
return $this->config['subscribers'][$queue];
317+
}
318+
return $this->default;
318319
}
319320

320321
/**

tests/Unit/PubSubQueueTests.php

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -312,11 +312,6 @@ public function testSubscriptionIsRetrieved()
312312
$this->assertTrue($queue->subscribeToTopic($this->topic) instanceof Subscription);
313313
}
314314

315-
public function testGetSubscriberName()
316-
{
317-
$this->assertTrue(is_string($this->queue->getSubscriberName()));
318-
}
319-
320315
public function testGetPubSub()
321316
{
322317
$this->assertTrue($this->queue->getPubSub() instanceof PubSubClient);

0 commit comments

Comments
 (0)