Skip to content

[Messenger] Added an implementation to the TransportInterface #12340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 19, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 81 additions & 5 deletions messenger/custom-transport.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,109 @@ DSN. You will need a transport factory::
The transport object needs to implement the
:class:`Symfony\\Component\\Messenger\\Transport\\TransportInterface`
(which combines the :class:`Symfony\\Component\\Messenger\\Transport\\Sender\\SenderInterface`
and :class:`Symfony\\Component\\Messenger\\Transport\\Receiver\\ReceiverInterface`)::
and :class:`Symfony\\Component\\Messenger\\Transport\\Receiver\\ReceiverInterface`).
Here is a simplified example of a database transport::

use Ramsey\Uuid\Uuid;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class YourTransport implements TransportInterface
{
private $db;
private $serializer;

/**
* @param FakeDatabase $db is used for demo purposes. It is not a real class.
*/
public function __construct(FakeDatabase $db, SerializerInterface $serializer = null)
{
$this->db = $db;
$this->serializer = $serializer ?? new PhpSerializer();
}

public function get(): iterable
{
// ...
// Get a message from "my_queue"
$row = $this->db->createQueryBuilder()
->from('my_queue')
->where('delivered_at is null OR delivered_at < :redeliver_timeout')
->andWhere('handled = :false')
->setParameter('redeliver_timeout', new DateTimeImmutable('-5minutes'))
->setParameter('false', false)
->getOneOrNullResult();

if (null === $row) {
return [];
}

$envelope = $this->serializer->decode([
'body' => $row['envelope'],
]);

return [$envelope->with(new TransportMessageIdStamp($row['id']))];
}

public function ack(Envelope $envelope): void
{
// ...
$stamp = $envelope->last(TransportMessageIdStamp::class);
if (!$stamp instanceof TransportMessageIdStamp) {
throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
}

// Mark the message as "handled"
$this->db->createQueryBuilder()
->update('my_queue')
->setValues([
'handled' => true
])
->where('id = :id')
->setParameter('id', $stamp->getId())
->execute();
}

public function reject(Envelope $envelope): void
{
// ...
$stamp = $envelope->last(TransportMessageIdStamp::class);
if (!$stamp instanceof TransportMessageIdStamp) {
throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
}

// Delete the message from the "my_queue" table
$this->db->createQueryBuilder()
->delete('my_queue')
->where('id = :id')
->setParameter('id', $stamp->getId())
->execute();
}

public function send(Envelope $envelope): Envelope
{
// ...
$encodedMessage = $this->serializer->encode($envelope);
$uuid = Uuid::uuid4()->toString();

// Add a message to the "my_queue" table
$this->db->createQueryBuilder()
->insert('my_queue')
->values([
'id' => $uuid,
'envelope' => $encodedMessage['body'],
'delivered_at' => null,
'handled' => false,
]);

return $envelope->with(new TransportMessageIdStamp($uuid));
}
}

The implementation above is not runnable code but illustrates how a
:class:`Symfony\\Component\\Messenger\\Transport\\TransportInterface` could
be implemented. For real implementations see :class:`Symfony\\Component\\Messenger\\Transport\\InMemoryTransport`
and :class:`Symfony\\Component\\Messenger\\Transport\\Doctrine\\DoctrineReceiver`.

Register your Factory
---------------------

Expand Down