Skip to content

Commit

Permalink
[8.x] Queued closure listeners (#33463)
Browse files Browse the repository at this point in the history
Queued closure listener support.
  • Loading branch information
taylorotwell authored Jul 8, 2020
1 parent 12ee1c4 commit 6520552
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 1 deletion.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
},
"autoload": {
"files": [
"src/Illuminate/Events/functions.php",
"src/Illuminate/Foundation/helpers.php",
"src/Illuminate/Support/helpers.php"
],
Expand Down
3 changes: 2 additions & 1 deletion src/Illuminate/Events/CallQueuedListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

namespace Illuminate\Events;

use Illuminate\Bus\Queueable;
use Illuminate\Container\Container;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;

class CallQueuedListener implements ShouldQueue
{
use InteractsWithQueue;
use InteractsWithQueue, Queueable;

/**
* The listener class name.
Expand Down
4 changes: 4 additions & 0 deletions src/Illuminate/Events/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public function listen($events, $listener = null)
{
if ($events instanceof Closure) {
return $this->listen($this->firstClosureParameterType($events), $events);
} elseif ($events instanceof QueuedClosure) {
return $this->listen($this->firstClosureParameterType($events->closure), $events->resolve());
} elseif ($listener instanceof QueuedClosure) {
$listener = $listener->resolve();
}

foreach ((array) $events as $event) {
Expand Down
34 changes: 34 additions & 0 deletions src/Illuminate/Events/InvokeQueuedClosure.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

namespace Illuminate\Events;

class InvokeQueuedClosure
{
/**
* Handle the event.
*
* @param \Illuminate\Queue\SerializableClosure $closure
* @param array $arguments
* @return void
*/
public function handle($closure, array $arguments)
{
call_user_func($closure->getClosure(), ...$arguments);
}

/**
* Handle a job failure.
*
* @param \Illuminate\Queue\SerializableClosure $closure
* @param array $arguments
* @param array $catchCallbacks
* @param \Throwable $exception
* @return void
*/
public function failed($closure, array $arguments, array $catchCallbacks, $exception)
{
$arguments[] = $exception;

collect($catchCallbacks)->each->__invoke(...$arguments);
}
}
125 changes: 125 additions & 0 deletions src/Illuminate/Events/QueuedClosure.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
<?php

namespace Illuminate\Events;

use Closure;
use Illuminate\Queue\SerializableClosure;

class QueuedClosure
{
/**
* The underlying Closure.
*
* @var \Closure
*/
public $closure;

/**
* The name of the connection the job should be sent to.
*
* @var string|null
*/
public $connection;

/**
* The name of the queue the job should be sent to.
*
* @var string|null
*/
public $queue;

/**
* The number of seconds before the job should be made available.
*
* @var \DateTimeInterface|\DateInterval|int|null
*/
public $delay;

/**
* All of the "catch" callbacks for the queued closure.
*
* @var array
*/
public $catchCallbacks = [];

/**
* Create a new queued closure event listener resolver.
*
* @param \Closure $closure
* @return void
*/
public function __construct(Closure $closure)
{
$this->closure = $closure;
}

/**
* Set the desired connection for the job.
*
* @param string|null $connection
* @return $this
*/
public function onConnection($connection)
{
$this->connection = $connection;

return $this;
}

/**
* Set the desired queue for the job.
*
* @param string|null $queue
* @return $this
*/
public function onQueue($queue)
{
$this->queue = $queue;

return $this;
}

/**
* Set the desired delay for the job.
*
* @param \DateTimeInterface|\DateInterval|int|null $delay
* @return $this
*/
public function delay($delay)
{
$this->delay = $delay;

return $this;
}

/**
* Specify a callback that should be invoked if the queued listener job fails.
*
* @param \Closure $closure
* @return $this
*/
public function catch(Closure $closure)
{
$this->catchCallbacks[] = $closure;

return $this;
}

/**
* Resolve the actual event listener callback.
*
* @return \Closure
*/
public function resolve()
{
return function (...$arguments) {
dispatch(new CallQueuedListener(InvokeQueuedClosure::class, 'handle', [
'closure' => new SerializableClosure($this->closure),
'arguments' => $arguments,
'catch' => collect($this->catchCallbacks)->map(function ($callback) {
return new SerializableClosure($callback);
})->all(),
]))->onConnection($this->connection)->onQueue($this->queue)->delay($this->delay);
};
}
}
1 change: 1 addition & 0 deletions src/Illuminate/Events/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
],
"require": {
"php": "^7.3",
"illuminate/bus": "^8.0",
"illuminate/collections": "^8.0",
"illuminate/container": "^8.0",
"illuminate/contracts": "^8.0",
Expand Down
18 changes: 18 additions & 0 deletions src/Illuminate/Events/functions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Illuminate\Events;

use Closure;

if (! function_exists('Illuminate\Events\queueable')) {
/**
* Create a new queued Closure event listener.
*
* @param \Closure $closure
* @return \Illuminate\Events\QueuedClosure
*/
function queueable(Closure $closure)
{
return new QueuedClosure($closure);
}
}
35 changes: 35 additions & 0 deletions tests/Integration/Events/QueuedClosureListenerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace Illuminate\Tests\Integration\Events;

use Illuminate\Events\CallQueuedListener;
use Illuminate\Events\InvokeQueuedClosure;
use function Illuminate\Events\queueable;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Event;
use Orchestra\Testbench\TestCase;

class QueuedClosureListenerTest extends TestCase
{
public function testAnonymousQueuedListenerIsQueued()
{
Bus::fake();

Event::listen(queueable(function (TestEvent $event) {
//
})->catch(function (TestEvent $event) {
//
})->onConnection(null)->onQueue(null));

Event::dispatch(new TestEvent);

Bus::assertDispatched(CallQueuedListener::class, function ($job) {
return $job->class == InvokeQueuedClosure::class;
});
}
}

class TestEvent
{
//
}

0 comments on commit 6520552

Please sign in to comment.