Skip to content

Commit 4b3a7c8

Browse files
committed
Merge branch '6.0'
2 parents e73d67f + e115a3d commit 4b3a7c8

File tree

9 files changed

+229
-138
lines changed

9 files changed

+229
-138
lines changed

.gitignore

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
1-
vendor
2-
composer.lock
1+
vendor
2+
composer.lock
3+
.DS_store

composer.json

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
{
2-
"name": "kainxspirits/laravel-pubsub-queue",
2+
"name": "munir131/laravel-pubsub-queue",
33
"description": "Queue driver for Google Cloud Pub/Sub.",
44
"keywords": [
5-
"kainxspirits",
5+
"munir131",
66
"laravel",
77
"queue",
88
"gcp",
@@ -13,26 +13,27 @@
1313
"type": "library",
1414
"authors": [
1515
{
16-
"name": "Kendryck",
17-
"email": "kainxspirits@users.noreply.github.com"
16+
"name": "Munir Khakhi",
17+
"email": "munir131@users.noreply.github.com"
1818
}
1919
],
2020
"require": {
21-
"php" : ">=7.1",
22-
"illuminate/queue": "~5.7.0",
23-
"google/cloud-pubsub": "^1.1"
21+
"php": ">=7.1",
22+
"google/cloud-pubsub": "^1.1",
23+
"illuminate/queue": "6.*",
24+
"laravel/helpers": "^1.1"
2425
},
2526
"require-dev": {
2627
"phpunit/phpunit": "^7.1"
2728
},
2829
"autoload": {
2930
"psr-4": {
30-
"Kainxspirits\\PubSubQueue\\": "src/"
31+
"PubSub\\PubSubQueue\\": "src/"
3132
}
3233
},
3334
"autoload-dev": {
3435
"psr-4": {
35-
"Kainxspirits\\PubSubQueue\\Tests\\": "tests/"
36+
"PubSub\\PubSubQueue\\Tests\\": "tests/"
3637
}
3738
},
3839
"scripts": {
@@ -41,7 +42,7 @@
4142
"extra": {
4243
"laravel": {
4344
"providers": [
44-
"Kainxspirits\\PubSubQueue\\PubSubQueueServiceProvider"
45+
"PubSub\\PubSubQueue\\PubSubQueueServiceProvider"
4546
]
4647
}
4748
},

src/Connectors/PubSubConnector.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
<?php
22

3-
namespace Kainxspirits\PubSubQueue\Connectors;
3+
namespace PubSub\PubSubQueue\Connectors;
44

55
use Google\Cloud\PubSub\PubSubClient;
6-
use Kainxspirits\PubSubQueue\PubSubQueue;
6+
use PubSub\PubSubQueue\PubSubQueue;
77
use Illuminate\Queue\Connectors\ConnectorInterface;
88

99
class PubSubConnector implements ConnectorInterface
@@ -24,10 +24,10 @@ class PubSubConnector implements ConnectorInterface
2424
public function connect(array $config)
2525
{
2626
$gcp_config = $this->transformConfig($config);
27-
2827
return new PubSubQueue(
2928
new PubSubClient($gcp_config),
30-
$config['queue'] ?? $this->default_queue
29+
$config['queue'] ?? $this->default_queue,
30+
$gcp_config
3131
);
3232
}
3333

src/Jobs/PubSubJob.php

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
<?php
22

3-
namespace Kainxspirits\PubSubQueue\Jobs;
3+
namespace PubSub\PubSubQueue\Jobs;
44

5-
use Illuminate\Queue\Jobs\Job;
65
use Google\Cloud\PubSub\Message;
76
use Illuminate\Container\Container;
8-
use Kainxspirits\PubSubQueue\PubSubQueue;
97
use Illuminate\Contracts\Queue\Job as JobContract;
8+
use Illuminate\Queue\Jobs\Job;
9+
use PubSub\PubSubQueue\PubSubQueue;
1010

1111
class PubSubJob extends Job implements JobContract
1212
{
1313
/**
1414
* The PubSub queue.
1515
*
16-
* @var \Kainxspirits\PubSubQueue\PubSubQueue
16+
* @var \PubSub\PubSubQueue\PubSubQueue
1717
*/
1818
protected $pubsub;
1919

@@ -24,23 +24,30 @@ class PubSubJob extends Job implements JobContract
2424
*/
2525
protected $job;
2626

27+
/**
28+
* subscriber name
29+
*
30+
* @var string
31+
*/
32+
protected $subscriber;
33+
2734
/**
2835
* Create a new job instance.
2936
*
3037
* @param \Illuminate\Container\Container $container
31-
* @param \Kainxspirits\PubSubQueue\PubSubQueue $sqs
38+
* @param \PubSub\PubSubQueue\PubSubQueue $sqs
3239
* @param \Google\Cloud\PubSub\Message $job
3340
* @param string $connectionName
3441
* @param string $queue
3542
*/
36-
public function __construct(Container $container, PubSubQueue $pubsub, Message $job, $connectionName, $queue)
43+
public function __construct(Container $container, PubSubQueue $pubsub, Message $job, $connectionName, $subscriberName, $subscriber = null)
3744
{
3845
$this->pubsub = $pubsub;
3946
$this->job = $job;
40-
$this->queue = $queue;
47+
$this->queue = $subscriberName;
4148
$this->container = $container;
4249
$this->connectionName = $connectionName;
43-
50+
$this->subscriber = $subscriber;
4451
$this->decoded = $this->payload();
4552
}
4653

@@ -61,9 +68,34 @@ public function getJobId()
6168
*/
6269
public function getRawBody()
6370
{
71+
if ($this->pubsub->checkHandler($this->subscriber)) {
72+
return $this->modifyPayload(
73+
$this->job->data(),
74+
$this->pubsub->getHandler($this->subscriber)
75+
);
76+
}
6477
return base64_decode($this->job->data());
6578
}
6679

80+
/**
81+
* @param string|array $payload
82+
* @param string $class
83+
* @return array
84+
*/
85+
private function modifyPayload($payload, $class)
86+
{
87+
if (!is_array($payload)) {
88+
$payload = json_decode($payload, true);
89+
}
90+
91+
$body = [
92+
'job' => $class . '@handle',
93+
'data' => $payload,
94+
];
95+
96+
return json_encode($body);
97+
}
98+
6799
/**
68100
* Delete the job from the queue.
69101
*

0 commit comments

Comments
 (0)