Skip to content

Commit b35b2ed

Browse files
WyriHaximusclue
authored andcommitted
Initial skeleton for lazy client connection
Originally filed in clue#82 by @WyriHaximus
1 parent 4e28607 commit b35b2ed

10 files changed

+526
-24
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ It enables you to set and query its data or use its PubSub topics to react to in
2727
* [Usage](#usage)
2828
* [Factory](#factory)
2929
* [createClient()](#createclient)
30+
* [createLazyClient()](#createlazyclient)
3031
* [Client](#client)
3132
* [Commands](#commands)
3233
* [Promises](#promises)
@@ -195,6 +196,12 @@ authentication. You can explicitly pass a custom timeout value in seconds
195196
$factory->createClient('localhost?timeout=0.5');
196197
```
197198

199+
#### createLazyClient()
200+
201+
The `createLazyClient($redisUri)` method can be used to create a new [`Client`](#client) which lazily
202+
creates and connects to the configured redis server on the first command. Internally it will use `createClient()`
203+
when the first command comes in, queues all commands while connecting, and pass on all commands directly when connected.
204+
198205
### Client
199206

200207
The `Client` is responsible for exchanging messages with Redis

examples/cli.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@
1111

1212
echo '# connecting to redis...' . PHP_EOL;
1313

14-
$factory->createClient('localhost')->then(function (Client $client) use ($loop) {
14+
/** @var Client $client */
15+
$client = $factory->createLazyClient('localhost');
16+
17+
try {
1518
echo '# connected! Entering interactive mode, hit CTRL-D to quit' . PHP_EOL;
1619

1720
$loop->addReadStream(STDIN, function () use ($client, $loop) {
@@ -48,10 +51,10 @@
4851

4952
$loop->removeReadStream(STDIN);
5053
});
51-
}, function (Exception $error) {
54+
} catch (Exception $error) {
5255
echo 'CONNECTION ERROR: ' . $error->getMessage() . PHP_EOL;
5356
exit(1);
54-
});
57+
};
5558

5659

5760
$loop->run();

examples/incr.php

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88
$loop = React\EventLoop\Factory::create();
99
$factory = new Factory($loop);
1010

11-
$factory->createClient('localhost')->then(function (Client $client) {
12-
$client->incr('test');
11+
/** @var Client $client */
12+
$client = $factory->createLazyClient('localhost');
13+
$client->incr('test');
1314

14-
$client->get('test')->then(function ($result) {
15-
var_dump($result);
16-
});
17-
18-
$client->end();
15+
$client->get('test')->then(function ($result) {
16+
var_dump($result);
1917
});
2018

19+
$client->end();
20+
2121
$loop->run();

examples/publish.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111
$channel = isset($argv[1]) ? $argv[1] : 'channel';
1212
$message = isset($argv[2]) ? $argv[2] : 'message';
1313

14-
$factory->createClient('localhost')->then(function (Client $client) use ($channel, $message) {
15-
$client->publish($channel, $message)->then(function ($received) {
16-
echo 'successfully published. Received by ' . $received . PHP_EOL;
17-
});
18-
19-
$client->end();
14+
/** @var Client $client */
15+
$client = $factory->createLazyClient('localhost');
16+
$client->publish($channel, $message)->then(function ($received) {
17+
echo 'successfully published. Received by ' . $received . PHP_EOL;
2018
});
2119

20+
$client->end();
21+
2222
$loop->run();

examples/subscribe.php

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010

1111
$channel = isset($argv[1]) ? $argv[1] : 'channel';
1212

13-
$factory->createClient('localhost')->then(function (Client $client) use ($channel) {
14-
$client->subscribe($channel)->then(function () {
15-
echo 'Now subscribed to channel ' . PHP_EOL;
16-
});
13+
/** @var Client $client */
14+
$client = $factory->createLazyClient('localhost');
15+
$client->subscribe($channel)->then(function () {
16+
echo 'Now subscribed to channel ' . PHP_EOL;
17+
});
1718

18-
$client->on('message', function ($channel, $message) {
19-
echo 'Message on ' . $channel . ': ' . $message . PHP_EOL;
20-
});
19+
$client->on('message', function ($channel, $message) {
20+
echo 'Message on ' . $channel . ': ' . $message . PHP_EOL;
2121
});
2222

2323
$loop->run();

src/Factory.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public function __construct(LoopInterface $loop, ConnectorInterface $connector =
3838
$this->protocol = $protocol;
3939
}
4040

41+
public function createLazyClient($target)
42+
{
43+
return new LazyStreamingClient($target, $this);
44+
}
45+
4146
/**
4247
* create redis client connected to address of given redis instance
4348
*

src/LazyStreamingClient.php

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
<?php
2+
3+
namespace Clue\React\Redis;
4+
5+
use Evenement\EventEmitter;
6+
use Clue\Redis\Protocol\Parser\ParserInterface;
7+
use Clue\Redis\Protocol\Parser\ParserException;
8+
use Clue\Redis\Protocol\Serializer\SerializerInterface;
9+
use Clue\Redis\Protocol\Factory as ProtocolFactory;
10+
use React\Promise\FulfilledPromise;
11+
use React\Promise\Promise;
12+
use React\Promise\PromiseInterface;
13+
use React\Stream\Util;
14+
use UnderflowException;
15+
use RuntimeException;
16+
use InvalidArgumentException;
17+
use React\Promise\Deferred;
18+
use Clue\Redis\Protocol\Model\ErrorReply;
19+
use Clue\Redis\Protocol\Model\ModelInterface;
20+
use Clue\Redis\Protocol\Model\MultiBulkReply;
21+
use React\Stream\DuplexStreamInterface;
22+
23+
/**
24+
* @internal
25+
*/
26+
class LazyStreamingClient extends EventEmitter implements Client
27+
{
28+
private $target;
29+
/** @var Factory */
30+
private $factory;
31+
private $ending = false;
32+
private $closed = false;
33+
public $promise = null;
34+
public $client = null;
35+
36+
/**
37+
* @param $target
38+
*/
39+
public function __construct($target, Factory $factory)
40+
{
41+
$this->target = $target;
42+
$this->factory = $factory;
43+
44+
$this->on('close', array($this, 'removeAllListeners'));
45+
}
46+
47+
private function client()
48+
{
49+
if ($this->promise instanceof PromiseInterface) {
50+
return $this->promise;
51+
}
52+
53+
if ($this->client instanceof Client) {
54+
return new FulfilledPromise($this->client());
55+
}
56+
57+
$self = $this;
58+
return $this->promise = $this->factory->createClient($this->target)->then(function (Client $client) use ($self) {
59+
$self->client = $client;
60+
$self->promise = null;
61+
62+
Util::forwardEvents(
63+
$self->client,
64+
$self,
65+
array(
66+
'error',
67+
'close',
68+
'message',
69+
'subscribe',
70+
'unsubscribe',
71+
'pmessage',
72+
'psubscribe',
73+
'punsubscribe',
74+
)
75+
);
76+
77+
return $client;
78+
}, function (\Exception $e) use ($self) {
79+
// connection failed => emit error if connection is not already closed
80+
if ($self->closed) {
81+
return;
82+
}
83+
$self->emit('error', array($e));
84+
$self->close();
85+
86+
return $e;
87+
});
88+
}
89+
90+
public function __call($name, $args)
91+
{
92+
if ($this->client instanceof Client) {
93+
return \call_user_func_array(array($this->client, $name), $args);
94+
}
95+
96+
return $this->client()->then(function (Client $client) use ($name, $args) {
97+
return \call_user_func_array(array($client, $name), $args);
98+
});
99+
}
100+
101+
public function end()
102+
{
103+
if ($this->client instanceof Client) {
104+
return $this->client->end();
105+
}
106+
107+
return $this->client()->then(function (Client $client) {
108+
return $client->end();
109+
});
110+
}
111+
112+
public function close()
113+
{
114+
if ($this->client instanceof Client) {
115+
return $this->client->close();
116+
}
117+
118+
return $this->client()->then(function (Client $client) {
119+
return $client->close();
120+
});
121+
}
122+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
<?php
2+
3+
namespace Clue\Tests\React\Redis;
4+
5+
use Clue\React\Redis\Factory;
6+
use React\Promise;
7+
8+
class FactoryLazyStreamingClientTest extends TestCase
9+
{
10+
private $loop;
11+
private $connector;
12+
private $factory;
13+
14+
public function setUp()
15+
{
16+
$this->loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
17+
$this->connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
18+
$this->factory = new Factory($this->loop, $this->connector);
19+
}
20+
21+
public function testWillConnectWithDefaultPort()
22+
{
23+
$this->connector->expects($this->never())->method('connect')->with('redis.example.com:6379')->willReturn(Promise\reject(new \RuntimeException()));
24+
$this->factory->createLazyClient('redis.example.com');
25+
}
26+
27+
public function testWillConnectToLocalhost()
28+
{
29+
$this->connector->expects($this->never())->method('connect')->with('localhost:1337')->willReturn(Promise\reject(new \RuntimeException()));
30+
$this->factory->createLazyClient('localhost:1337');
31+
}
32+
33+
public function testWillResolveIfConnectorResolves()
34+
{
35+
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
36+
$stream->expects($this->never())->method('write');
37+
38+
$this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream));
39+
$client = $this->factory->createLazyClient('localhost');
40+
41+
$this->assertInstanceOf('Clue\React\Redis\Client', $client);
42+
}
43+
44+
public function testWillWriteSelectCommandIfTargetContainsPath()
45+
{
46+
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
47+
$stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n");
48+
49+
$this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream));
50+
$this->factory->createLazyClient('redis://127.0.0.1/demo');
51+
}
52+
53+
public function testWillWriteSelectCommandIfTargetContainsDbQueryParameter()
54+
{
55+
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
56+
$stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$1\r\n4\r\n");
57+
58+
$this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream));
59+
$this->factory->createLazyClient('redis://127.0.0.1?db=4');
60+
}
61+
62+
public function testWillWriteAuthCommandIfRedisUriContainsUserInfo()
63+
{
64+
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
65+
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n");
66+
67+
$this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream));
68+
$this->factory->createLazyClient('redis://hello:world@example.com');
69+
}
70+
71+
public function testWillWriteAuthCommandIfRedisUriContainsEncodedUserInfo()
72+
{
73+
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
74+
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nh@llo\r\n");
75+
76+
$this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream));
77+
$this->factory->createLazyClient('redis://:h%40llo@example.com');
78+
}
79+
80+
public function testWillWriteAuthCommandIfTargetContainsPasswordQueryParameter()
81+
{
82+
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
83+
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$6\r\nsecret\r\n");
84+
85+
$this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream));
86+
$this->factory->createLazyClient('redis://example.com?password=secret');
87+
}
88+
89+
public function testWillWriteAuthCommandIfTargetContainsEncodedPasswordQueryParameter()
90+
{
91+
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
92+
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nh@llo\r\n");
93+
94+
$this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream));
95+
$this->factory->createLazyClient('redis://example.com?password=h%40llo');
96+
}
97+
98+
public function testWillWriteAuthCommandIfRedissUriContainsUserInfo()
99+
{
100+
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
101+
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n");
102+
103+
$this->connector->expects($this->never())->method('connect')->with('tls://example.com:6379')->willReturn(Promise\resolve($stream));
104+
$this->factory->createLazyClient('rediss://hello:world@example.com');
105+
}
106+
107+
public function testWillWriteAuthCommandIfRedisUnixUriContainsPasswordQueryParameter()
108+
{
109+
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
110+
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n");
111+
112+
$this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream));
113+
$this->factory->createLazyClient('redis+unix:///tmp/redis.sock?password=world');
114+
}
115+
116+
public function testWillWriteAuthCommandIfRedisUnixUriContainsUserInfo()
117+
{
118+
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
119+
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n");
120+
121+
$this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream));
122+
$this->factory->createLazyClient('redis+unix://hello:world@/tmp/redis.sock');
123+
}
124+
125+
public function testWillWriteSelectCommandIfRedisUnixUriContainsDbQueryParameter()
126+
{
127+
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
128+
$stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n");
129+
130+
$this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream));
131+
$this->factory->createLazyClient('redis+unix:///tmp/redis.sock?db=demo');
132+
}
133+
134+
public function testWillRejectIfConnectorRejects()
135+
{
136+
$this->connector->expects($this->never())->method('connect')->with('127.0.0.1:2')->willReturn(Promise\reject(new \RuntimeException()));
137+
$client = $this->factory->createLazyClient('redis://127.0.0.1:2');
138+
139+
$this->assertInstanceOf('Clue\React\Redis\Client', $client);
140+
}
141+
142+
public function testWillRejectIfTargetIsInvalid()
143+
{
144+
$client = $this->factory->createLazyClient('http://invalid target');
145+
146+
$this->assertInstanceOf('Clue\React\Redis\Client', $client);
147+
}
148+
}

0 commit comments

Comments
 (0)