Skip to content

Commit 0a83fa6

Browse files
committed
Child process queue workers
- Remove `ShouldBroadcast` in favor of `ShouldBroadcastNow` in `Events\\ChildProcess` namespace - Implement `QueueWorker::class` - Add new binding to `NativeServiceProvider::class` - Fire up workers: iterate through queue worker config in `NativeServiceProvider::configureApp()` - Test `QueueWorker::up()`, `QueueWorker::down()` - Test `QueueWorkerFake::class` assertions work as expected
1 parent 8267087 commit 0a83fa6

File tree

14 files changed

+385
-8
lines changed

14 files changed

+385
-8
lines changed

config/nativephp.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,12 @@
114114
],
115115
],
116116
],
117+
118+
'queue_workers' => [
119+
'default' => [
120+
'queues' => ['default'],
121+
'memory_limit' => 128,
122+
'timeout' => 60,
123+
],
124+
],
117125
];

src/Contracts/QueueWorker.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
namespace Native\Laravel\Contracts;
4+
5+
use Native\Laravel\DTOs\QueueConfig;
6+
7+
interface QueueWorker
8+
{
9+
public function up(QueueConfig $config): void;
10+
11+
public function down(string $alias): void;
12+
}

src/DTOs/QueueConfig.php

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
namespace Native\Laravel\DTOs;
4+
5+
class QueueConfig
6+
{
7+
/**
8+
* @param array<int, string> $queuesToConsume
9+
*/
10+
public function __construct(
11+
public readonly string $alias,
12+
public readonly array $queuesToConsume,
13+
public readonly int $memoryLimit,
14+
public readonly int $timeout,
15+
) {}
16+
17+
/**
18+
* @return array<int, self>
19+
*/
20+
public static function fromConfigArray(array $config): array
21+
{
22+
return array_map(
23+
function (array|string $worker, string $alias) {
24+
if (is_string($worker)) {
25+
return new self($worker, ['default'], 128, 60);
26+
}
27+
28+
return new self(
29+
$alias,
30+
$worker['queues'] ?? ['default'],
31+
$worker['memory_limit'] ?? 128,
32+
$worker['timeout'] ?? 60,
33+
);
34+
},
35+
$config,
36+
array_keys($config),
37+
);
38+
}
39+
}

src/Events/ChildProcess/ErrorReceived.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
namespace Native\Laravel\Events\ChildProcess;
44

55
use Illuminate\Broadcasting\Channel;
6-
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
6+
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
77
use Illuminate\Foundation\Events\Dispatchable;
88
use Illuminate\Queue\SerializesModels;
99

10-
class ErrorReceived implements ShouldBroadcast
10+
class ErrorReceived implements ShouldBroadcastNow
1111
{
1212
use Dispatchable, SerializesModels;
1313

src/Events/ChildProcess/MessageReceived.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
namespace Native\Laravel\Events\ChildProcess;
44

55
use Illuminate\Broadcasting\Channel;
6-
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
6+
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
77
use Illuminate\Foundation\Events\Dispatchable;
88
use Illuminate\Queue\SerializesModels;
99

10-
class MessageReceived implements ShouldBroadcast
10+
class MessageReceived implements ShouldBroadcastNow
1111
{
1212
use Dispatchable, SerializesModels;
1313

src/Events/ChildProcess/ProcessExited.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
namespace Native\Laravel\Events\ChildProcess;
44

55
use Illuminate\Broadcasting\Channel;
6-
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
6+
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
77
use Illuminate\Foundation\Events\Dispatchable;
88
use Illuminate\Queue\SerializesModels;
99

10-
class ProcessExited implements ShouldBroadcast
10+
class ProcessExited implements ShouldBroadcastNow
1111
{
1212
use Dispatchable, SerializesModels;
1313

src/Events/ChildProcess/ProcessSpawned.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
namespace Native\Laravel\Events\ChildProcess;
44

55
use Illuminate\Broadcasting\Channel;
6-
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
6+
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
77
use Illuminate\Foundation\Events\Dispatchable;
88
use Illuminate\Queue\SerializesModels;
99

10-
class ProcessSpawned implements ShouldBroadcast
10+
class ProcessSpawned implements ShouldBroadcastNow
1111
{
1212
use Dispatchable, SerializesModels;
1313

src/Facades/QueueWorker.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
namespace Native\Laravel\Facades;
4+
5+
use Illuminate\Support\Facades\Facade;
6+
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
7+
use Native\Laravel\DTOs\QueueConfig;
8+
use Native\Laravel\Fakes\QueueWorkerFake;
9+
10+
/**
11+
* @method static void up(QueueConfig $config)
12+
* @method static void down(string $alias)
13+
*/
14+
class QueueWorker extends Facade
15+
{
16+
public static function fake()
17+
{
18+
return tap(static::getFacadeApplication()->make(QueueWorkerFake::class), function ($fake) {
19+
static::swap($fake);
20+
});
21+
}
22+
23+
protected static function getFacadeAccessor(): string
24+
{
25+
self::clearResolvedInstance(QueueWorkerContract::class);
26+
27+
return QueueWorkerContract::class;
28+
}
29+
}

src/Fakes/QueueWorkerFake.php

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
<?php
2+
3+
namespace Native\Laravel\Fakes;
4+
5+
use Closure;
6+
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
7+
use Native\Laravel\DTOs\QueueConfig;
8+
use PHPUnit\Framework\Assert as PHPUnit;
9+
10+
class QueueWorkerFake implements QueueWorkerContract
11+
{
12+
/**
13+
* @var array<int, QueueConfig>
14+
*/
15+
public array $ups = [];
16+
17+
/**
18+
* @var array<int, string>
19+
*/
20+
public array $downs = [];
21+
22+
public function up(QueueConfig $config): void
23+
{
24+
$this->ups[] = $config;
25+
}
26+
27+
public function down(string $alias): void
28+
{
29+
$this->downs[] = $alias;
30+
}
31+
32+
public function assertUp(Closure $callback): void
33+
{
34+
$hit = empty(
35+
array_filter(
36+
$this->ups,
37+
fn (QueueConfig $up) => $callback($up) === true
38+
)
39+
) === false;
40+
41+
PHPUnit::assertTrue($hit);
42+
}
43+
44+
public function assertDown(string|Closure $alias): void
45+
{
46+
if (is_callable($alias) === false) {
47+
PHPUnit::assertContains($alias, $this->downs);
48+
49+
return;
50+
}
51+
52+
$hit = empty(
53+
array_filter(
54+
$this->downs,
55+
fn (string $down) => $alias($down) === true
56+
)
57+
) === false;
58+
59+
PHPUnit::assertTrue($hit);
60+
}
61+
}

src/NativeServiceProvider.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
use Native\Laravel\Contracts\ChildProcess as ChildProcessContract;
1818
use Native\Laravel\Contracts\GlobalShortcut as GlobalShortcutContract;
1919
use Native\Laravel\Contracts\PowerMonitor as PowerMonitorContract;
20+
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
2021
use Native\Laravel\Contracts\WindowManager as WindowManagerContract;
22+
use Native\Laravel\DTOs\QueueConfig;
2123
use Native\Laravel\Events\EventWatcher;
2224
use Native\Laravel\Exceptions\Handler;
2325
use Native\Laravel\GlobalShortcut as GlobalShortcutImplementation;
@@ -73,6 +75,10 @@ public function packageRegistered()
7375
return $app->make(PowerMonitorImplementation::class);
7476
});
7577

78+
$this->app->bind(QueueWorkerContract::class, function (Foundation $app) {
79+
return $app->make(QueueWorker::class);
80+
});
81+
7682
if (config('nativephp-internal.running')) {
7783
$this->app->singleton(
7884
\Illuminate\Contracts\Debug\ExceptionHandler::class,
@@ -112,6 +118,8 @@ protected function configureApp()
112118

113119
config(['session.driver' => 'file']);
114120
config(['queue.default' => 'database']);
121+
122+
$this->fireUpQueueWorkers();
115123
}
116124

117125
protected function rewriteStoragePath()
@@ -210,4 +218,13 @@ protected function configureDisks(): void
210218
]);
211219
}
212220
}
221+
222+
protected function fireUpQueueWorkers(): void
223+
{
224+
$queueConfigs = QueueConfig::fromConfigArray(config('nativephp.queue_workers'));
225+
226+
foreach ($queueConfigs as $queueConfig) {
227+
$this->app->make(QueueWorkerContract::class)->up($queueConfig);
228+
}
229+
}
213230
}

src/QueueWorker.php

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
<?php
2+
3+
namespace Native\Laravel;
4+
5+
use Native\Laravel\Contracts\ChildProcess as ChildProcessContract;
6+
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
7+
use Native\Laravel\DTOs\QueueConfig;
8+
9+
class QueueWorker implements QueueWorkerContract
10+
{
11+
public function __construct(
12+
private readonly ChildProcessContract $childProcess,
13+
) {}
14+
15+
public function up(QueueConfig $config): void
16+
{
17+
$this->childProcess->php(
18+
[
19+
'-d',
20+
"memory_limit={$config->memoryLimit}M",
21+
'artisan',
22+
'queue:work',
23+
"--name={$config->alias}",
24+
'--queue='.implode(',', $config->queuesToConsume),
25+
"--memory={$config->memoryLimit}",
26+
"--timeout={$config->timeout}",
27+
],
28+
$config->alias,
29+
persistent: true,
30+
);
31+
}
32+
33+
public function down(string $alias): void
34+
{
35+
$this->childProcess->stop($alias);
36+
}
37+
}

tests/DTOs/QueueWorkerTest.php

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
use Illuminate\Support\Arr;
4+
use Native\Laravel\DTOs\QueueConfig;
5+
6+
test('the factory method generates an array of config objects for several formats', function (array $config) {
7+
$configObject = QueueConfig::fromConfigArray($config);
8+
9+
expect($configObject)->toBeArray();
10+
expect($configObject)->toHaveCount(count($config));
11+
12+
foreach ($config as $alias => $worker) {
13+
if (is_string($worker)) {
14+
expect(
15+
Arr::first(
16+
array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker))
17+
)->queuesToConsume->toBe(['default']
18+
);
19+
20+
expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)))->memoryLimit->toBe(128);
21+
expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)))->timeout->toBe(60);
22+
23+
continue;
24+
}
25+
26+
expect(
27+
Arr::first(
28+
array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias))
29+
)->queuesToConsume->toBe($worker['queues'] ?? ['default']
30+
);
31+
32+
expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)))->memoryLimit->toBe($worker['memory_limit'] ?? 128);
33+
expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)))->timeout->toBe($worker['timeout'] ?? 60);
34+
}
35+
})->with([
36+
[
37+
'queue_workers' => [
38+
'some_worker' => [
39+
'queues' => ['default'],
40+
'memory_limit' => 64,
41+
'timeout' => 60,
42+
],
43+
],
44+
],
45+
[
46+
'queue_workers' => [
47+
'some_worker',
48+
'another_worker',
49+
],
50+
],
51+
[
52+
'queue_workers' => [
53+
'some_worker' => [
54+
],
55+
'another_worker' => [
56+
'queues' => ['default', 'another'],
57+
],
58+
'yet_another_worker' => [
59+
'memory_limit' => 256,
60+
],
61+
'one_more_worker' => [
62+
'timeout' => 120,
63+
],
64+
],
65+
],
66+
]);

0 commit comments

Comments
 (0)