Skip to content

Commit d5487c3

Browse files
committed
Do not enter "idle" state when using PubSub channels
1 parent c9f2cc5 commit d5487c3

File tree

3 files changed

+57
-12
lines changed

3 files changed

+57
-12
lines changed

README.md

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,12 +235,13 @@ will again try to open a new underlying connection. Note that this may
235235
require special care if you're using transactions (`MULTI`/`EXEC`) that are kept
236236
open for longer than the idle period.
237237

238-
If the underlying database connection drops while using PubSub channels
239-
(see `SUBSCRIBE` and `PSUBSCRIBE` commands), it will automatically send the
240-
appropriate `unsubscribe` and `punsubscribe` events for all currently active
241-
channel and pattern subscriptions. This allows you to react to these
242-
events and restore your subscriptions by creating a new underlying
243-
connection with the above commands.
238+
While using PubSub channels (see `SUBSCRIBE` and `PSUBSCRIBE` commands), this client
239+
will never reach an "idle" state and will keep pending forever (or until the
240+
underlying database connection is lost). Additionally, if the underlying
241+
database connection drops, it will automatically send the appropriate `unsubscribe`
242+
and `punsubscribe` events for all currently active channel and pattern subscriptions.
243+
This allows you to react to these events and restore your subscriptions by
244+
creating a new underlying connection repeating the above commands again.
244245

245246
Note that creating the underlying connection will be deferred until the
246247
first request is invoked. Accordingly, any eventual connection issues

src/LazyClient.php

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ class LazyClient extends EventEmitter implements Client
2222
private $idleTimer;
2323
private $pending = 0;
2424

25+
private $subscribed = array();
26+
private $psubscribed = array();
27+
2528
/**
2629
* @param $target
2730
*/
@@ -47,11 +50,11 @@ private function client()
4750
$self = $this;
4851
$pending =& $this->promise;
4952
$idleTimer=& $this->idleTimer;
53+
$subscribed =& $this->subscribed;
54+
$psubscribed =& $this->psubscribed;
5055
$loop = $this->loop;
51-
return $pending = $this->factory->createClient($this->target)->then(function (Client $client) use ($self, &$pending, &$idleTimer, $loop) {
56+
return $pending = $this->factory->createClient($this->target)->then(function (Client $client) use ($self, &$pending, &$idleTimer, &$subscribed, &$psubscribed, $loop) {
5257
// connection completed => remember only until closed
53-
$subscribed = array();
54-
$psubscribed = array();
5558
$client->on('close', function () use (&$pending, $self, &$subscribed, &$psubscribed, &$idleTimer, $loop) {
5659
$pending = null;
5760

@@ -64,6 +67,8 @@ private function client()
6467
foreach ($psubscribed as $pattern => $_) {
6568
$self->emit('punsubscribe', array($pattern, --$n));
6669
}
70+
$subscribed = array();
71+
$psubscribed = array();
6772

6873
if ($idleTimer !== null) {
6974
$loop->cancelTimer($idleTimer);
@@ -194,15 +199,15 @@ public function idle()
194199
{
195200
--$this->pending;
196201

197-
if ($this->pending < 1 && $this->idlePeriod >= 0) {
202+
if ($this->pending < 1 && $this->idlePeriod >= 0 && !$this->subscribed && !$this->psubscribed) {
198203
$idleTimer =& $this->idleTimer;
199204
$promise =& $this->promise;
200205
$idleTimer = $this->loop->addTimer($this->idlePeriod, function () use (&$idleTimer, &$promise) {
201206
$promise->then(function (Client $client) {
202207
$client->close();
203208
});
204-
$promise = null;
205-
$idleTimer = null;
209+
$promise = null;
210+
$idleTimer = null;
206211
});
207212
}
208213
}

tests/LazyClientTest.php

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,4 +438,43 @@ public function testEmitsUnsubscribeAndPunsubscribeEventsWhenUnderlyingClientClo
438438
$this->client->on('punsubscribe', $this->expectCallableOnce());
439439
$client->emit('close');
440440
}
441+
442+
public function testSubscribeWillResolveWhenUnderlyingClientResolvesSubscribeAndNotStartIdleTimerWithIdleDueToSubscription()
443+
{
444+
$deferred = new Deferred();
445+
$client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock();
446+
$client->expects($this->once())->method('__call')->with('subscribe')->willReturn($deferred->promise());
447+
448+
$this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client));
449+
450+
$this->loop->expects($this->never())->method('addTimer');
451+
452+
$promise = $this->client->subscribe('foo');
453+
$client->emit('subscribe', array('foo', 1));
454+
$deferred->resolve(array('subscribe', 'foo', 1));
455+
456+
$promise->then($this->expectCallableOnceWith(array('subscribe', 'foo', 1)));
457+
}
458+
459+
public function testUnsubscribeAfterSubscribeWillResolveWhenUnderlyingClientResolvesUnsubscribeAndStartIdleTimerWhenSubscriptionStopped()
460+
{
461+
$deferredSubscribe = new Deferred();
462+
$deferredUnsubscribe = new Deferred();
463+
$client = $this->getMockBuilder('Clue\React\Redis\StreamingClient')->disableOriginalConstructor()->setMethods(array('__call'))->getMock();
464+
$client->expects($this->exactly(2))->method('__call')->willReturnOnConsecutiveCalls($deferredSubscribe->promise(), $deferredUnsubscribe->promise());
465+
466+
$this->factory->expects($this->once())->method('createClient')->willReturn(\React\Promise\resolve($client));
467+
468+
$this->loop->expects($this->once())->method('addTimer');
469+
470+
$promise = $this->client->subscribe('foo');
471+
$client->emit('subscribe', array('foo', 1));
472+
$deferredSubscribe->resolve(array('subscribe', 'foo', 1));
473+
$promise->then($this->expectCallableOnceWith(array('subscribe', 'foo', 1)));
474+
475+
$promise = $this->client->unsubscribe('foo');
476+
$client->emit('unsubscribe', array('foo', 0));
477+
$deferredUnsubscribe->resolve(array('unsubscribe', 'foo', 0));
478+
$promise->then($this->expectCallableOnceWith(array('unsubscribe', 'foo', 0)));
479+
}
441480
}

0 commit comments

Comments
 (0)