Skip to content

Lazy Streaming Client #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ It enables you to set and query its data or use its PubSub topics to react to in
* [Usage](#usage)
* [Factory](#factory)
* [createClient()](#createclient)
* [createLazyClient()](#createlazyclient)
* [Client](#client)
* [Commands](#commands)
* [Promises](#promises)
Expand Down Expand Up @@ -166,6 +167,12 @@ $factory->createClient('redis+unix:///tmp/redis.sock?password=secret&db=2');
$factory->createClient('redis+unix://:secret@/tmp/redis.sock');
```

#### createLazyClient()

The `createLazyClient($redisUri)` method can be used to create a new [`Client`](#client) which lazily
creates and connects to the configured redis server on the first command. Internally it will use `createClient()`
when the first command comes in, queues all commands while connecting, and pass on all commands directly when connected.

### Client

The `Client` is responsible for exchanging messages with Redis
Expand Down
9 changes: 6 additions & 3 deletions examples/cli.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@

echo '# connecting to redis...' . PHP_EOL;

$factory->createClient('localhost')->then(function (Client $client) use ($loop) {
/** @var Client $client */
$client = $factory->createLazyClient('localhost');

try {
echo '# connected! Entering interactive mode, hit CTRL-D to quit' . PHP_EOL;

$loop->addReadStream(STDIN, function () use ($client, $loop) {
Expand Down Expand Up @@ -48,10 +51,10 @@

$loop->removeReadStream(STDIN);
});
}, function (Exception $error) {
} catch (Exception $error) {
echo 'CONNECTION ERROR: ' . $error->getMessage() . PHP_EOL;
exit(1);
});
};


$loop->run();
14 changes: 7 additions & 7 deletions examples/incr.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
$loop = React\EventLoop\Factory::create();
$factory = new Factory($loop);

$factory->createClient('localhost')->then(function (Client $client) {
$client->incr('test');
/** @var Client $client */
$client = $factory->createLazyClient('localhost');
$client->incr('test');

$client->get('test')->then(function ($result) {
var_dump($result);
});

$client->end();
$client->get('test')->then(function ($result) {
var_dump($result);
});

$client->end();

$loop->run();
12 changes: 6 additions & 6 deletions examples/publish.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
$channel = isset($argv[1]) ? $argv[1] : 'channel';
$message = isset($argv[2]) ? $argv[2] : 'message';

$factory->createClient('localhost')->then(function (Client $client) use ($channel, $message) {
$client->publish($channel, $message)->then(function ($received) {
echo 'successfully published. Received by ' . $received . PHP_EOL;
});

$client->end();
/** @var Client $client */
$client = $factory->createLazyClient('localhost');
$client->publish($channel, $message)->then(function ($received) {
echo 'successfully published. Received by ' . $received . PHP_EOL;
});

$client->end();

$loop->run();
14 changes: 7 additions & 7 deletions examples/subscribe.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@

$channel = isset($argv[1]) ? $argv[1] : 'channel';

$factory->createClient('localhost')->then(function (Client $client) use ($channel) {
$client->subscribe($channel)->then(function () {
echo 'Now subscribed to channel ' . PHP_EOL;
});
/** @var Client $client */
$client = $factory->createLazyClient('localhost');
$client->subscribe($channel)->then(function () {
echo 'Now subscribed to channel ' . PHP_EOL;
});

$client->on('message', function ($channel, $message) {
echo 'Message on ' . $channel . ': ' . $message . PHP_EOL;
});
$client->on('message', function ($channel, $message) {
echo 'Message on ' . $channel . ': ' . $message . PHP_EOL;
});

$loop->run();
5 changes: 5 additions & 0 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public function __construct(LoopInterface $loop, ConnectorInterface $connector =
$this->protocol = $protocol;
}

public function createLazyClient($target)
{
return new LazyStreamingClient($target, $this);
}

/**
* create redis client connected to address of given redis instance
*
Expand Down
122 changes: 122 additions & 0 deletions src/LazyStreamingClient.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
<?php

namespace Clue\React\Redis;

use Evenement\EventEmitter;
use Clue\Redis\Protocol\Parser\ParserInterface;
use Clue\Redis\Protocol\Parser\ParserException;
use Clue\Redis\Protocol\Serializer\SerializerInterface;
use Clue\Redis\Protocol\Factory as ProtocolFactory;
use React\Promise\FulfilledPromise;
use React\Promise\Promise;
use React\Promise\PromiseInterface;
use React\Stream\Util;
use UnderflowException;
use RuntimeException;
use InvalidArgumentException;
use React\Promise\Deferred;
use Clue\Redis\Protocol\Model\ErrorReply;
use Clue\Redis\Protocol\Model\ModelInterface;
use Clue\Redis\Protocol\Model\MultiBulkReply;
use React\Stream\DuplexStreamInterface;

/**
* @internal
*/
class LazyStreamingClient extends EventEmitter implements Client
{
private $target;
/** @var Factory */
private $factory;
private $ending = false;
private $closed = false;
public $promise = null;
public $client = null;

/**
* @param $target
*/
public function __construct($target, Factory $factory)
{
$this->target = $target;
$this->factory = $factory;

$this->on('close', array($this, 'removeAllListeners'));
}

private function client()
{
if ($this->promise instanceof PromiseInterface) {
return $this->promise;
}

if ($this->client instanceof Client) {
return new FulfilledPromise($this->client());
}

$self = $this;
return $this->promise = $this->factory->createClient($this->target)->then(function (Client $client) use ($self) {
$self->client = $client;
$self->promise = null;

Util::forwardEvents(
$self->client,
$self,
array(
'error',
'close',
'message',
'subscribe',
'unsubscribe',
'pmessage',
'psubscribe',
'punsubscribe',
)
);

return $client;
}, function (\Exception $e) use ($self) {
// connection failed => emit error if connection is not already closed
if ($self->closed) {
return;
}
$self->emit('error', array($e));
$self->close();

return $e;
});
}

public function __call($name, $args)
{
if ($this->client instanceof Client) {
return \call_user_func_array(array($this->client, $name), $args);
}

return $this->client()->then(function (Client $client) use ($name, $args) {
return \call_user_func_array(array($client, $name), $args);
});
}

public function end()
{
if ($this->client instanceof Client) {
return $this->client->end();
}

return $this->client()->then(function (Client $client) {
return $client->end();
});
}

public function close()
{
if ($this->client instanceof Client) {
return $this->client->close();
}

return $this->client()->then(function (Client $client) {
return $client->close();
});
}
}
151 changes: 151 additions & 0 deletions tests/FactoryLazyStreamingClientTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
<?php

use Clue\React\Redis\Factory;
use React\Promise;

class FactoryLazyStreamingClientTest extends TestCase
{
private $loop;
private $connector;
private $factory;

public function setUp()
{
$this->loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
$this->connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
$this->factory = new Factory($this->loop, $this->connector);
}

public function testCtor()
{
$this->factory = new Factory($this->loop);
}

public function testWillConnectWithDefaultPort()
{
$this->connector->expects($this->never())->method('connect')->with('redis.example.com:6379')->willReturn(Promise\reject(new \RuntimeException()));
$this->factory->createLazyClient('redis.example.com');
}

public function testWillConnectToLocalhost()
{
$this->connector->expects($this->never())->method('connect')->with('localhost:1337')->willReturn(Promise\reject(new \RuntimeException()));
$this->factory->createLazyClient('localhost:1337');
}

public function testWillResolveIfConnectorResolves()
{
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$stream->expects($this->never())->method('write');

$this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream));
$client = $this->factory->createLazyClient('localhost');

$this->assertInstanceOf('Clue\React\Redis\Client', $client);
}

public function testWillWriteSelectCommandIfTargetContainsPath()
{
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n");

$this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream));
$this->factory->createLazyClient('redis://127.0.0.1/demo');
}

public function testWillWriteSelectCommandIfTargetContainsDbQueryParameter()
{
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$1\r\n4\r\n");

$this->connector->expects($this->never())->method('connect')->willReturn(Promise\resolve($stream));
$this->factory->createLazyClient('redis://127.0.0.1?db=4');
}

public function testWillWriteAuthCommandIfRedisUriContainsUserInfo()
{
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n");

$this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream));
$this->factory->createLazyClient('redis://hello:world@example.com');
}

public function testWillWriteAuthCommandIfRedisUriContainsEncodedUserInfo()
{
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nh@llo\r\n");

$this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream));
$this->factory->createLazyClient('redis://:h%40llo@example.com');
}

public function testWillWriteAuthCommandIfTargetContainsPasswordQueryParameter()
{
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$6\r\nsecret\r\n");

$this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream));
$this->factory->createLazyClient('redis://example.com?password=secret');
}

public function testWillWriteAuthCommandIfTargetContainsEncodedPasswordQueryParameter()
{
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nh@llo\r\n");

$this->connector->expects($this->never())->method('connect')->with('example.com:6379')->willReturn(Promise\resolve($stream));
$this->factory->createLazyClient('redis://example.com?password=h%40llo');
}

public function testWillWriteAuthCommandIfRedissUriContainsUserInfo()
{
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n");

$this->connector->expects($this->never())->method('connect')->with('tls://example.com:6379')->willReturn(Promise\resolve($stream));
$this->factory->createLazyClient('rediss://hello:world@example.com');
}

public function testWillWriteAuthCommandIfRedisUnixUriContainsPasswordQueryParameter()
{
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n");

$this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream));
$this->factory->createLazyClient('redis+unix:///tmp/redis.sock?password=world');
}

public function testWillWriteAuthCommandIfRedisUnixUriContainsUserInfo()
{
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$stream->expects($this->never())->method('write')->with("*2\r\n$4\r\nauth\r\n$5\r\nworld\r\n");

$this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream));
$this->factory->createLazyClient('redis+unix://hello:world@/tmp/redis.sock');
}

public function testWillWriteSelectCommandIfRedisUnixUriContainsDbQueryParameter()
{
$stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$stream->expects($this->never())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n");

$this->connector->expects($this->never())->method('connect')->with('unix:///tmp/redis.sock')->willReturn(Promise\resolve($stream));
$this->factory->createLazyClient('redis+unix:///tmp/redis.sock?db=demo');
}

public function testWillRejectIfConnectorRejects()
{
$this->connector->expects($this->never())->method('connect')->with('127.0.0.1:2')->willReturn(Promise\reject(new \RuntimeException()));
$client = $this->factory->createLazyClient('redis://127.0.0.1:2');

$this->assertInstanceOf('Clue\React\Redis\Client', $client);
}

public function testWillRejectIfTargetIsInvalid()
{
$client = $this->factory->createLazyClient('http://invalid target');

$this->assertInstanceOf('Clue\React\Redis\Client', $client);
}
}
Loading