Skip to content

Commit a8a1e7a

Browse files
committed
feature #30917 [Messenger] Add a redis stream transport (soyuka, alexander-schranz)
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Add a redis stream transport | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | Yes | Fixed tickets | #28681 | License | MIT | Doc PR | symfony/symfony-docs#11341 As discussed in #28681 this will refractor @soyuka implementation of redis using the redis stream features so we don't need to handle parking the messages ourself and redis is doing it for us. Some interesting links about streams: - https://redis.io/topics/streams-intro - https://brandur.org/redis-streams ``` +-----------R | GET | -> XREADGROUP +-----------+ | | handleMessage V +-----------+ No | failed? |---------------------------+ +-----------+ | | | | Yes | V | +-----------+ No | | retry? |---------------------------+ +-----------+ | | | | Yes | V V +-----------R +-----------R | REJECT | -> XDEL | ACK | -> XACK +-----------+ +-----------+ ``` **GET**: Will use `XREADGROUP` to read the one message from the stream **REJECT**: Reject will just remove the message with `XDEL` from the stream as adding it back to the stream is handled by symfony worker itself **ACK**: Will use the `XACK` Method to ack the message for the specific group The sender will still be simple by calling the `XADD` redis function. #EU-FOSSA Commits ------- ff0b8554ea Refractor redis transport using redis streams 7162d2ec1d Implement redis transport
2 parents bca3527 + 793abc2 commit a8a1e7a

12 files changed

+825
-0
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Exception\LogicException;
16+
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
17+
18+
/**
19+
* @requires extension redis
20+
*/
21+
class ConnectionTest extends TestCase
22+
{
23+
public function testFromInvalidDsn()
24+
{
25+
$this->expectException(\InvalidArgumentException::class);
26+
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');
27+
28+
Connection::fromDsn('redis://');
29+
}
30+
31+
public function testFromDsn()
32+
{
33+
$this->assertEquals(
34+
new Connection(['stream' => 'queue'], [
35+
'host' => 'localhost',
36+
'port' => 6379,
37+
]),
38+
Connection::fromDsn('redis://localhost/queue')
39+
);
40+
}
41+
42+
public function testFromDsnWithOptions()
43+
{
44+
$this->assertEquals(
45+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
46+
'host' => 'localhost',
47+
'port' => 6379,
48+
], [
49+
'blocking_timeout' => 30,
50+
]),
51+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
52+
);
53+
}
54+
55+
public function testFromDsnWithQueryOptions()
56+
{
57+
$this->assertEquals(
58+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
59+
'host' => 'localhost',
60+
'port' => 6379,
61+
], [
62+
'blocking_timeout' => 30,
63+
]),
64+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
65+
);
66+
}
67+
68+
public function testKeepGettingPendingMessages()
69+
{
70+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
71+
72+
$redis->expects($this->exactly(3))->method('xreadgroup')
73+
->with('symfony', 'consumer', ['queue' => 0], 1, null)
74+
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
75+
76+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
77+
$this->assertNotNull($connection->get());
78+
$this->assertNotNull($connection->get());
79+
$this->assertNotNull($connection->get());
80+
}
81+
82+
public function testFirstGetPendingMessagesThenNewMessages()
83+
{
84+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
85+
86+
$count = 0;
87+
88+
$redis->expects($this->exactly(2))->method('xreadgroup')
89+
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
90+
++$count;
91+
92+
if (1 === $count) {
93+
return '0' === $arr_streams['queue'];
94+
}
95+
96+
return '>' === $arr_streams['queue'];
97+
}), 1, null)
98+
->willReturn(['queue' => []]);
99+
100+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
101+
$connection->get();
102+
}
103+
104+
public function testUnexpectedRedisError()
105+
{
106+
$this->expectException(LogicException::class);
107+
$this->expectExceptionMessage('Redis error happens');
108+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
109+
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
110+
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
111+
112+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
113+
$connection->get();
114+
}
115+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
17+
18+
/**
19+
* @requires extension redis
20+
*/
21+
class RedisExtIntegrationTest extends TestCase
22+
{
23+
private $redis;
24+
private $connection;
25+
26+
protected function setUp()
27+
{
28+
if (!getenv('MESSENGER_REDIS_DSN')) {
29+
$this->markTestSkipped('The "MESSENGER_REDIS_DSN" environment variable is required.');
30+
}
31+
32+
$this->redis = new \Redis();
33+
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
34+
$this->clearRedis();
35+
$this->connection->setup();
36+
}
37+
38+
public function testConnectionSendAndGet()
39+
{
40+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
41+
$encoded = $this->connection->get();
42+
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
43+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
44+
}
45+
46+
public function testGetTheFirstAvailableMessage()
47+
{
48+
$this->connection->add('{"message": "Hi1"}', ['type' => DummyMessage::class]);
49+
$this->connection->add('{"message": "Hi2"}', ['type' => DummyMessage::class]);
50+
$encoded = $this->connection->get();
51+
$this->assertEquals('{"message": "Hi1"}', $encoded['body']);
52+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
53+
$encoded = $this->connection->get();
54+
$this->assertEquals('{"message": "Hi2"}', $encoded['body']);
55+
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
56+
}
57+
58+
private function clearRedis()
59+
{
60+
$parsedUrl = parse_url(getenv('MESSENGER_REDIS_DSN'));
61+
$pathParts = explode('/', $parsedUrl['path'] ?? '');
62+
$stream = $pathParts[1] ?? 'symfony';
63+
$this->redis->del($stream);
64+
}
65+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
16+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
18+
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
19+
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
20+
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
21+
use Symfony\Component\Serializer as SerializerComponent;
22+
use Symfony\Component\Serializer\Encoder\JsonEncoder;
23+
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
24+
25+
class RedisReceiverTest extends TestCase
26+
{
27+
public function testItReturnsTheDecodedMessageToTheHandler()
28+
{
29+
$serializer = $this->createSerializer();
30+
31+
$redisEnvelop = $this->createRedisEnvelope();
32+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
33+
$connection->method('get')->willReturn($redisEnvelop);
34+
35+
$receiver = new RedisReceiver($connection, $serializer);
36+
$actualEnvelopes = iterator_to_array($receiver->get());
37+
$this->assertCount(1, $actualEnvelopes);
38+
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
39+
}
40+
41+
public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
42+
{
43+
$this->expectException(MessageDecodingFailedException::class);
44+
45+
$serializer = $this->createMock(PhpSerializer::class);
46+
$serializer->method('decode')->willThrowException(new MessageDecodingFailedException());
47+
48+
$redisEnvelop = $this->createRedisEnvelope();
49+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
50+
$connection->method('get')->willReturn($redisEnvelop);
51+
$connection->expects($this->once())->method('reject');
52+
53+
$receiver = new RedisReceiver($connection, $serializer);
54+
iterator_to_array($receiver->get());
55+
}
56+
57+
private function createRedisEnvelope()
58+
{
59+
return [
60+
'id' => 1,
61+
'body' => '{"message": "Hi"}',
62+
'headers' => [
63+
'type' => DummyMessage::class,
64+
],
65+
];
66+
}
67+
68+
private function createSerializer(): Serializer
69+
{
70+
$serializer = new Serializer(
71+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
72+
);
73+
74+
return $serializer;
75+
}
76+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
18+
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
19+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
20+
21+
class RedisSenderTest extends TestCase
22+
{
23+
public function testSend()
24+
{
25+
$envelope = new Envelope(new DummyMessage('Oy'));
26+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
27+
28+
$connection = $this->getMockBuilder(Connection::class)
29+
->disableOriginalConstructor()
30+
->getMock();
31+
$connection->expects($this->once())->method('add')->with($encoded['body'], $encoded['headers']);
32+
33+
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
34+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
35+
36+
$sender = new RedisSender($connection, $serializer);
37+
$sender->send($envelope);
38+
}
39+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
16+
use Symfony\Component\Messenger\Transport\RedisExt\RedisTransport;
17+
use Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory;
18+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
19+
20+
/**
21+
* @requires extension redis
22+
*/
23+
class RedisTransportFactoryTest extends TestCase
24+
{
25+
public function testSupportsOnlyRedisTransports()
26+
{
27+
$factory = new RedisTransportFactory();
28+
29+
$this->assertTrue($factory->supports('redis://localhost', []));
30+
$this->assertFalse($factory->supports('sqs://localhost', []));
31+
$this->assertFalse($factory->supports('invalid-dsn', []));
32+
}
33+
34+
public function testCreateTransport()
35+
{
36+
$factory = new RedisTransportFactory();
37+
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
38+
$expectedTransport = new RedisTransport(Connection::fromDsn('redis://localhost', ['foo' => 'bar']), $serializer);
39+
40+
$this->assertEquals($expectedTransport, $factory->createTransport('redis://localhost', ['foo' => 'bar'], $serializer));
41+
}
42+
}

0 commit comments

Comments
 (0)