@@ -31,33 +31,109 @@ DSN. You will need a transport factory::
31
31
The transport object needs to implement the
32
32
:class: `Symfony\\ Component\\ Messenger\\ Transport\\ TransportInterface `
33
33
(which combines the :class: `Symfony\\ Component\\ Messenger\\ Transport\\ Sender\\ SenderInterface `
34
- and :class: `Symfony\\ Component\\ Messenger\\ Transport\\ Receiver\\ ReceiverInterface `)::
34
+ and :class: `Symfony\\ Component\\ Messenger\\ Transport\\ Receiver\\ ReceiverInterface `).
35
+ Here is a simplified example of a database transport::
35
36
37
+ use Ramsey\Uuid\Uuid;
36
38
use Symfony\Component\Messenger\Envelope;
39
+ use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
40
+ use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
41
+ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
42
+ use Symfony\Component\Messenger\Transport\TransportInterface;
37
43
38
44
class YourTransport implements TransportInterface
39
45
{
46
+ private $db;
47
+ private $serializer;
48
+
49
+ /**
50
+ * @param FakeDatabase $db is used for demo purposes. It is not a real class.
51
+ */
52
+ public function __construct(FakeDatabase $db, SerializerInterface $serializer = null)
53
+ {
54
+ $this->db = $db;
55
+ $this->serializer = $serializer ?? new PhpSerializer();
56
+ }
57
+
40
58
public function get(): iterable
41
59
{
42
- // ...
60
+ // Get a message from "my_queue"
61
+ $row = $this->db->createQueryBuilder()
62
+ ->from('my_queue')
63
+ ->where('delivered_at is null OR delivered_at < :redeliver_timeout')
64
+ ->andWhere('handled = :false')
65
+ ->setParameter('redeliver_timeout', new DateTimeImmutable('-5minutes'))
66
+ ->setParameter('false', false)
67
+ ->getOneOrNullResult();
68
+
69
+ if (null === $row) {
70
+ return [];
71
+ }
72
+
73
+ $envelope = $this->serializer->decode([
74
+ 'body' => $row['envelope'],
75
+ ]);
76
+
77
+ return [$envelope->with(new TransportMessageIdStamp($row['id']))];
43
78
}
44
79
45
80
public function ack(Envelope $envelope): void
46
81
{
47
- // ...
82
+ $stamp = $envelope->last(TransportMessageIdStamp::class);
83
+ if (!$stamp instanceof TransportMessageIdStamp) {
84
+ throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
85
+ }
86
+
87
+ // Mark the message as "handled"
88
+ $this->db->createQueryBuilder()
89
+ ->update('my_queue')
90
+ ->setValues([
91
+ 'handled' => true
92
+ ])
93
+ ->where('id = :id')
94
+ ->setParameter('id', $stamp->getId())
95
+ ->execute();
48
96
}
49
97
50
98
public function reject(Envelope $envelope): void
51
99
{
52
- // ...
100
+ $stamp = $envelope->last(TransportMessageIdStamp::class);
101
+ if (!$stamp instanceof TransportMessageIdStamp) {
102
+ throw new \LogicException('No TransportMessageIdStamp found on the Envelope.');
103
+ }
104
+
105
+ // Delete the message from the "my_queue" table
106
+ $this->db->createQueryBuilder()
107
+ ->delete('my_queue')
108
+ ->where('id = :id')
109
+ ->setParameter('id', $stamp->getId())
110
+ ->execute();
53
111
}
54
112
55
113
public function send(Envelope $envelope): Envelope
56
114
{
57
- // ...
115
+ $encodedMessage = $this->serializer->encode($envelope);
116
+ $uuid = Uuid::uuid4()->toString();
117
+
118
+ // Add a message to the "my_queue" table
119
+ $this->db->createQueryBuilder()
120
+ ->insert('my_queue')
121
+ ->values([
122
+ 'id' => $uuid,
123
+ 'envelope' => $encodedMessage['body'],
124
+ 'delivered_at' => null,
125
+ 'handled' => false,
126
+ ]);
127
+
128
+ return $envelope->with(new TransportMessageIdStamp($uuid));
58
129
}
59
130
}
60
131
132
+ The implementation above is not runnable code but illustrates how a
133
+ :class: `Symfony\\ Component\\ Messenger\\ Transport\\ TransportInterface ` could
134
+ be implemented. For real implementations see :class: `Symfony\\ Component\\ Messenger\\ Transport\\ InMemoryTransport `
135
+ and :class: `Symfony\\ Component\\ Messenger\\ Transport\\ Doctrine\\ DoctrineReceiver `.
136
+
61
137
Register your Factory
62
138
---------------------
63
139
0 commit comments