Skip to content

Commit e1c669b

Browse files
Nyholmjaviereguiluz
authored andcommitted
[Messenger] Added an implementation to the TransportInterface
1 parent 3f12092 commit e1c669b

File tree

1 file changed

+81
-5
lines changed

1 file changed

+81
-5
lines changed

messenger/custom-transport.rst

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,33 +31,109 @@ DSN. You will need a transport factory::
3131
The transport object needs to implement the
3232
:class:`Symfony\\Component\\Messenger\\Transport\\TransportInterface`
3333
(which combines the :class:`Symfony\\Component\\Messenger\\Transport\\Sender\\SenderInterface`
34-
and :class:`Symfony\\Component\\Messenger\\Transport\\Receiver\\ReceiverInterface`)::
34+
and :class:`Symfony\\Component\\Messenger\\Transport\\Receiver\\ReceiverInterface`).
35+
Here is a simplified example of a database transport::
3536

37+
use Ramsey\Uuid\Uuid;
3638
use Symfony\Component\Messenger\Envelope;
39+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
40+
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
41+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
42+
use Symfony\Component\Messenger\Transport\TransportInterface;
3743

3844
class YourTransport implements TransportInterface
3945
{
46+
private $db;
47+
private $serializer;
48+
49+
/**
50+
* @param FakeDatabase $db is used for demo purposes. It is not a real class.
51+
*/
52+
public function __construct(FakeDatabase $db, SerializerInterface $serializer = null)
53+
{
54+
$this->db = $db;
55+
$this->serializer = $serializer ?? new PhpSerializer();
56+
}
57+
4058
public function get(): iterable
4159
{
42-
// ...
60+
// Get a message from "my_queue"
61+
$row = $this->db->createQueryBuilder()
62+
->from('my_queue')
63+
->where('delivered_at is null OR delivered_at < :redeliver_timeout')
64+
->andWhere('handled = :false')
65+
->setParameter('redeliver_timeout', new DateTimeImmutable('-5minutes'))
66+
->setParameter('false', false)
67+
->getOneOrNullResult();
68+
69+
if (null === $row) {
70+
return [];
71+
}
72+
73+
$envelope = $this->serializer->decode([
74+
'body' => $row['envelope'],
75+
]);
76+
77+
return [$envelope->with(new TransportMessageIdStamp($row['id']))];
4378
}
4479

4580
public function ack(Envelope $envelope): void
4681
{
47-
// ...
82+
$stamp = $envelope->last(TransportMessageIdStamp::class);
83+
if (!$stamp instanceof TransportMessageIdStamp) {
84+
throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
85+
}
86+
87+
// Mark the message as "handled"
88+
$this->db->createQueryBuilder()
89+
->update('my_queue')
90+
->setValues([
91+
'handled' => true
92+
])
93+
->where('id = :id')
94+
->setParameter('id', $stamp->getId())
95+
->execute();
4896
}
4997

5098
public function reject(Envelope $envelope): void
5199
{
52-
// ...
100+
$stamp = $envelope->last(TransportMessageIdStamp::class);
101+
if (!$stamp instanceof TransportMessageIdStamp) {
102+
throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
103+
}
104+
105+
// Delete the message from the "my_queue" table
106+
$this->db->createQueryBuilder()
107+
->delete('my_queue')
108+
->where('id = :id')
109+
->setParameter('id', $stamp->getId())
110+
->execute();
53111
}
54112

55113
public function send(Envelope $envelope): Envelope
56114
{
57-
// ...
115+
$encodedMessage = $this->serializer->encode($envelope);
116+
$uuid = Uuid::uuid4()->toString();
117+
118+
// Add a message to the "my_queue" table
119+
$this->db->createQueryBuilder()
120+
->insert('my_queue')
121+
->values([
122+
'id' => $uuid,
123+
'envelope' => $encodedMessage['body'],
124+
'delivered_at' => null,
125+
'handled' => false,
126+
]);
127+
128+
return $envelope->with(new TransportMessageIdStamp($uuid));
58129
}
59130
}
60131

132+
The implementation above is not runnable code but illustrates how a
133+
:class:`Symfony\\Component\\Messenger\\Transport\\TransportInterface` could
134+
be implemented. For real implementations see :class:`Symfony\\Component\\Messenger\\Transport\\InMemoryTransport`
135+
and :class:`Symfony\\Component\\Messenger\\Transport\\Doctrine\\DoctrineReceiver`.
136+
61137
Register your Factory
62138
---------------------
63139

0 commit comments

Comments
 (0)