Skip to content

Commit 4ed23bd

Browse files
Refractor redis transport using streams
1 parent 7162d2e commit 4ed23bd

21 files changed

+392
-447
lines changed

.travis.yml

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ env:
1919
- MIN_PHP=7.1.3
2020
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
2121
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
22+
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7001/messages
2223

2324
matrix:
2425
include:
@@ -55,8 +56,8 @@ before_install:
5556
5657
- |
5758
# Start Redis cluster
58-
docker pull grokzen/redis-cluster:4.0.8
59-
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8
59+
docker pull grokzen/redis-cluster:5.0.4
60+
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:5.0.4
6061
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
6162
6263
- |
@@ -116,6 +117,7 @@ before_install:
116117
local ext_name=$1
117118
local ext_so=$2
118119
local INI=$3
120+
local input=${4:-yes}
119121
local ext_dir=$(php -r "echo ini_get('extension_dir');")
120122
local ext_cache=~/php-ext/$(basename $ext_dir)/$ext_name
121123
@@ -124,7 +126,7 @@ before_install:
124126
else
125127
rm ~/.pearrc /tmp/pear 2>/dev/null || true
126128
mkdir -p $ext_cache
127-
echo yes | pecl install -f $ext_name &&
129+
echo $input | pecl install -f $ext_name &&
128130
cp $ext_dir/$ext_so $ext_cache
129131
fi
130132
}
@@ -147,7 +149,6 @@ before_install:
147149
echo session.gc_probability = 0 >> $INI
148150
echo opcache.enable_cli = 1 >> $INI
149151
echo apc.enable_cli = 1 >> $INI
150-
echo extension = redis.so >> $INI
151152
echo extension = memcached.so >> $INI
152153
done
153154
@@ -166,7 +167,11 @@ before_install:
166167
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI
167168
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
168169
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
170+
tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
169171
done
172+
- |
173+
# List all php extensions with versions
174+
- php -r 'foreach (get_loaded_extensions() as $extension) echo $extension . " " . phpversion($extension) . PHP_EOL;'
170175

171176
- |
172177
# Load fixtures

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1700,6 +1700,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17001700
if (empty($config['transports'])) {
17011701
$container->removeDefinition('messenger.transport.symfony_serializer');
17021702
$container->removeDefinition('messenger.transport.amqp.factory');
1703+
$container->removeDefinition('messenger.transport.redis.factory');
17031704
} else {
17041705
$container->getDefinition('messenger.transport.symfony_serializer')
17051706
->replaceArgument(1, $config['serializer']['symfony_serializer']['format'])

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@
6666
<tag name="messenger.transport_factory" />
6767
</service>
6868

69+
<service id="messenger.transport.redis.factory" class="Symfony\Component\Messenger\Transport\RedisExt\RedisTransportFactory">
70+
<tag name="messenger.transport_factory" />
71+
</service>
72+
6973
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
7074
<tag name="messenger.transport_factory" />
7175
</service>

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
'options' => ['queue' => ['name' => 'Queue']],
1414
'serializer' => 'messenger.transport.native_php_serializer',
1515
],
16+
'redis' => 'redis://127.0.0.1:6379/messages',
1617
],
1718
],
1819
]);

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
</framework:queue>
1818
</framework:options>
1919
</framework:transport>
20+
<framework:transport name="redis" dsn="redis://127.0.0.1:6379/messages" />
2021
</framework:messenger>
2122
</framework:config>
2223
</container>

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ framework:
1111
queue:
1212
name: Queue
1313
serializer: 'messenger.transport.native_php_serializer'
14+
redis: 'redis://127.0.0.1:6379/messages'

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,7 @@ public function testMessenger()
673673
$this->assertTrue($container->hasAlias('messenger.default_bus'));
674674
$this->assertTrue($container->getAlias('messenger.default_bus')->isPublic());
675675
$this->assertFalse($container->hasDefinition('messenger.transport.amqp.factory'));
676+
$this->assertFalse($container->hasDefinition('messenger.transport.redis.factory'));
676677
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
677678
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
678679
}
@@ -697,6 +698,16 @@ public function testMessengerTransports()
697698
$this->assertEquals(new Reference('messenger.transport.native_php_serializer'), $transportArguments[2]);
698699

699700
$this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory'));
701+
702+
$this->assertTrue($container->hasDefinition('messenger.transport.redis'));
703+
$transportFactory = $container->getDefinition('messenger.transport.redis')->getFactory();
704+
$transportArguments = $container->getDefinition('messenger.transport.redis')->getArguments();
705+
706+
$this->assertEquals([new Reference('messenger.transport_factory'), 'createTransport'], $transportFactory);
707+
$this->assertCount(3, $transportArguments);
708+
$this->assertSame('redis://127.0.0.1:6379/messages', $transportArguments[0]);
709+
710+
$this->assertTrue($container->hasDefinition('messenger.transport.redis.factory'));
700711
}
701712

702713
public function testMessengerRouting()

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,43 +12,104 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Exception\LogicException;
1516
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
1617

1718
/**
1819
* @requires extension redis
1920
*/
2021
class ConnectionTest extends TestCase
2122
{
22-
/**
23-
* @expectedException \InvalidArgumentException
24-
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
25-
*/
26-
public function testItCannotBeConstructedWithAWrongDsn()
23+
public function testFromInvalidDsn()
2724
{
25+
$this->expectException(\InvalidArgumentException::class);
26+
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid.');
27+
2828
Connection::fromDsn('redis://');
2929
}
3030

31-
public function testItGetsParametersFromTheDsn()
31+
public function testFromDsn()
3232
{
3333
$this->assertEquals(
34-
new Connection('queue', array(
34+
new Connection(['stream' => 'queue'], [
3535
'host' => 'localhost',
3636
'port' => 6379,
37-
)),
37+
]),
3838
Connection::fromDsn('redis://localhost/queue')
3939
);
4040
}
4141

42-
public function testOverrideOptionsViaQueryParameters()
42+
public function testFromDsnWithOptions()
4343
{
4444
$this->assertEquals(
45-
new Connection('queue', array(
46-
'host' => '127.0.0.1',
45+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
46+
'host' => 'localhost',
4747
'port' => 6379,
48-
), array(
49-
'processing_ttl' => '8000',
50-
)),
51-
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
48+
], [
49+
'blocking_timeout' => 30,
50+
]),
51+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
5252
);
5353
}
54+
55+
public function testFromDsnWithQueryOptions()
56+
{
57+
$this->assertEquals(
58+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
59+
'host' => 'localhost',
60+
'port' => 6379,
61+
], [
62+
'blocking_timeout' => 30,
63+
]),
64+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
65+
);
66+
}
67+
68+
public function testKeepGettingPendingMessages()
69+
{
70+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
71+
72+
$redis->expects($this->exactly(3))->method('xreadgroup')
73+
->with('symfony', 'consumer', ['queue' => 0], 1, null)
74+
->willReturn(['queue' => [['message' => json_encode(['body' => 'Test', 'headers' => []])]]]);
75+
76+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
77+
$this->assertNotNull($connection->get());
78+
$this->assertNotNull($connection->get());
79+
$this->assertNotNull($connection->get());
80+
}
81+
82+
public function testFirstGetPendingMessagesThenNewMessages()
83+
{
84+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
85+
86+
$count = 0;
87+
88+
$redis->expects($this->exactly(2))->method('xreadgroup')
89+
->with('symfony', 'consumer', $this->callback(function ($arr_streams) use (&$count) {
90+
++$count;
91+
92+
if (1 === $count) {
93+
return '0' === $arr_streams['queue'];
94+
}
95+
96+
return '>' === $arr_streams['queue'];
97+
}), 1, null)
98+
->willReturn(['queue' => []]);
99+
100+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
101+
$connection->get();
102+
}
103+
104+
public function testUnexpectedRedisError()
105+
{
106+
$this->expectException(LogicException::class);
107+
$this->expectExceptionMessage('Redis error happens');
108+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
109+
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
110+
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
111+
112+
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
113+
$connection->get();
114+
}
54115
}

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php

Lines changed: 0 additions & 43 deletions
This file was deleted.

0 commit comments

Comments
 (0)