How to watch a Change Stream and add events to a Laravel queue #2751
-
Use Supervisor to keep it running:

<?php

namespace App\Console\Commands;

use App\Jobs\ProcessRecommendationJob;
use Illuminate\Console\Command;
use MongoDB\Client;

class StartChangeStreamCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'app:start-change-stream-command';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Starts Change Stream';

    /**
     * Execute the console command.
     */
    public function handle()
    {
        $client = new Client(config('database.connections.mongodb.dsn'));

        // Only insert events pass this filter. Widen the $match if the
        // delete/replace/update cases below should also fire.
        $pipeline = [['$match' => ['operationType' => 'insert']]];

        // 'updateLookup' only affects update events; insert and replace
        // events always carry fullDocument.
        $options = [
            'fullDocument' => 'updateLookup',
        ];

        $changeStream = $client
            ->selectCollection(config('database.connections.mongodb.database'), 'violations')
            ->watch($pipeline, $options);

        // Loop forever; next() polls the server for new events.
        for ($changeStream->rewind(); true; $changeStream->next()) {
            if (!$changeStream->valid()) {
                // No event available yet, poll again.
                continue;
            }

            $event = $changeStream->current();
            $ns = sprintf('%s.%s', $event['ns']['db'], $event['ns']['coll']);
            $id = json_encode($event['documentKey']['_id']);

            switch ($event['operationType']) {
                case 'delete':
                    printf("Deleted document in %s with _id: %s\n\n", $ns, $id);
                    break;
                case 'insert':
                    printf("Inserted new document in %s\n", $ns);
                    // Push the new document onto the Laravel queue.
                    if ($event['fullDocument']['type'] !== "Movie") {
                        ProcessRecommendationJob::dispatch($event['fullDocument']);
                    }
                    echo json_encode($event['fullDocument']), "\n\n";
                    break;
                case 'replace':
                    printf("Replaced new document in %s with _id: %s\n", $ns, $id);
                    echo json_encode($event['fullDocument']), "\n\n";
                    break;
                case 'update':
                    printf("Updated document in %s with _id: %s\n", $ns, $id);
                    echo json_encode($event['updateDescription']), "\n\n";
                    break;
            }
        }
    }
}
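For the Supervisor part, a minimal config sketch could look like the following. The file location, program names, project path (`/var/www/app`), system user (`www-data`) and log paths are all assumptions for illustration, so adjust them to your server. The second program runs a queue worker, since the dispatched jobs still need something to process them.

```ini
; Hypothetical file: /etc/supervisor/conf.d/change-stream.conf
; Paths, user and program names below are assumptions, not from the thread.

[program:change-stream]
; Runs the artisan command defined above and restarts it if it exits.
command=php /var/www/app/artisan app:start-change-stream-command
autostart=true
autorestart=true
; Keep a single watcher so each change event is dispatched only once.
numprocs=1
user=www-data
redirect_stderr=true
stdout_logfile=/var/www/app/storage/logs/change-stream.log
stopasgroup=true
killasgroup=true

[program:queue-worker]
; Processes the queued jobs; --tries controls how often a failed job is retried.
command=php /var/www/app/artisan queue:work --tries=3
autostart=true
autorestart=true
numprocs=1
user=www-data
redirect_stderr=true
stdout_logfile=/var/www/app/storage/logs/queue-worker.log
stopasgroup=true
killasgroup=true
```

After saving the file, `supervisorctl reread` followed by `supervisorctl update` should pick up both programs.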
-
I want to watch for changes in my collection and push them onto a Laravel queue as retryable jobs. Can someone share how they achieved this?
I got this far, but where is the best place to put this in Laravel?
https://www.mongodb.com/docs/php-library/current/reference/method/MongoDBChangeStream-current/