Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

[fix] Redis connection counter didn't work properly #497

Merged
merged 26 commits into from
Sep 6, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fadb3fc
Added redis connection counter.
rennokki Sep 3, 2020
d5a90d8
Using the built-in Redis cache connection to handle non-pubsub features.
rennokki Sep 3, 2020
21db4b3
Using the concatenated string for the config retrieve
rennokki Sep 3, 2020
a45c0bf
Using the Redis non-blocking client.
rennokki Sep 4, 2020
855646a
Apply fixes from StyleCI (#500)
rennokki Sep 4, 2020
e9ec650
Removed $redis from RedisClient
rennokki Sep 4, 2020
4e11355
Merge branch 'fix/app-connections-count' of github.com:beyondcode/lar…
rennokki Sep 4, 2020
b9dfeca
Using separate connection counts for global & local.
rennokki Sep 4, 2020
0375000
Remove duplicated method.
rennokki Sep 4, 2020
0cb0e6c
Added local driver as default
rennokki Sep 4, 2020
e6cfa85
Replaced blocking Redis instance with non-blocking I/O client
rennokki Sep 4, 2020
d20adcd
Apply fixes from StyleCI (#502)
rennokki Sep 4, 2020
ea97410
Fixed tests
rennokki Sep 4, 2020
1dfe14a
Merge branch 'fix/app-connections-count' of github.com:beyondcode/lar…
rennokki Sep 4, 2020
1e2672d
Updated tests
rennokki Sep 4, 2020
b2ac909
Apply fixes from StyleCI (#503)
rennokki Sep 4, 2020
7a629cf
Fixed typo
rennokki Sep 4, 2020
cadd260
Merge branch 'fix/app-connections-count' of github.com:beyondcode/lar…
rennokki Sep 4, 2020
7e9d3cd
Fixed tests
rennokki Sep 4, 2020
b45d786
Merge branch '2.x' of github.com:beyondcode/laravel-websockets into f…
rennokki Sep 5, 2020
ca4a9a1
Running then() closures as block in tests
rennokki Sep 5, 2020
593c48f
Fixed statistics logger
rennokki Sep 5, 2020
dd33a33
Apply fixes from StyleCI (#505)
rennokki Sep 5, 2020
b2263dc
Forcing ^2.0 on react/promise
rennokki Sep 5, 2020
1c889be
Merge branch 'fix/app-connections-count' of github.com:beyondcode/lar…
rennokki Sep 5, 2020
5ba24cb
Fixed tests for stats metrics
rennokki Sep 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions config/websockets.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,6 @@

'app' => \BeyondCode\LaravelWebSockets\Apps\ConfigAppManager::class,

/*
|--------------------------------------------------------------------------
| Channel Manager
|--------------------------------------------------------------------------
|
| When users subscribe or unsubscribe from specific channels,
| the connections are stored to keep track of any interaction with the
| WebSocket server.
| You can however add your own implementation that will help the store
| of the channels alongside their connections.
|
*/

'channel' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class,

],

/*
Expand Down Expand Up @@ -191,6 +176,8 @@

'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\MemoryStatisticsLogger::class,

'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class,

],

/*
Expand All @@ -214,6 +201,8 @@

'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger::class,

'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\RedisChannelManager::class,

],

],
Expand Down
33 changes: 33 additions & 0 deletions src/PubSub/Drivers/LocalClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,28 @@ public function unsubscribe($appId, string $channel): bool
return true;
}

/**
* Subscribe to the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function subscribeToApp($appId): bool
{
return true;
}

/**
* Unsubscribe from the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function unsubscribeFromApp($appId): bool
{
return true;
}

/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
Expand Down Expand Up @@ -137,4 +159,15 @@ public function channelMemberCounts($appId, array $channelNames): PromiseInterfa

return new FulfilledPromise($results);
}

/**
* Get the amount of unique connections.
*
* @param mixed $appId
* @return null|int|\React\Promise\PromiseInterface
*/
public function appConnectionsCount($appId)
{
return null;
}
}
68 changes: 63 additions & 5 deletions src/PubSub/Drivers/RedisClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Str;
use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;
Expand Down Expand Up @@ -42,6 +43,13 @@ class RedisClient extends LocalClient
*/
protected $subscribeClient;

/**
* The Redis manager instance.
*
* @var \Illuminate\Redis\RedisManager
*/
protected $redis;

/**
* Mapping of subscribed channels, where the key is the channel name,
* and the value is the amount of connections which are subscribed to
Expand All @@ -60,6 +68,7 @@ class RedisClient extends LocalClient
public function __construct()
{
$this->serverId = Str::uuid()->toString();
$this->redis = Cache::getRedis();
}

/**
Expand Down Expand Up @@ -175,6 +184,36 @@ public function unsubscribe($appId, string $channel): bool
return true;
}

/**
* Subscribe to the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function subscribeToApp($appId): bool
{
$this->subscribeClient->__call('subscribe', [$this->getTopicName($appId)]);

$this->redis->hincrby($this->getTopicName($appId), 'connections', 1);

return true;
}

/**
* Unsubscribe from the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function unsubscribeFromApp($appId): bool
{
$this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId)]);

$this->redis->hincrby($this->getTopicName($appId), 'connections', -1);

return true;
}

/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
Expand All @@ -187,7 +226,7 @@ public function unsubscribe($appId, string $channel): bool
*/
public function joinChannel($appId, string $channel, string $socketId, string $data)
{
$this->publishClient->__call('hset', [$this->getTopicName($appId, $channel), $socketId, $data]);
$this->redis->hset($this->getTopicName($appId, $channel), $socketId, $data);

DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [
'channel' => $channel,
Expand All @@ -209,7 +248,7 @@ public function joinChannel($appId, string $channel, string $socketId, string $d
*/
public function leaveChannel($appId, string $channel, string $socketId)
{
$this->publishClient->__call('hdel', [$this->getTopicName($appId, $channel), $socketId]);
$this->redis->hdel($this->getTopicName($appId, $channel), $socketId);

DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [
'channel' => $channel,
Expand Down Expand Up @@ -258,6 +297,19 @@ public function channelMemberCounts($appId, array $channelNames): PromiseInterfa
});
}

/**
* Get the amount of unique connections.
*
* @param mixed $appId
* @return null|int|\React\Promise\PromiseInterface
*/
public function appConnectionsCount($appId)
{
// Use the in-built Redis manager to avoid async run.

return $this->redis->hget($this->getTopicName($appId), 'connections') ?: 0;
}

/**
* Handle a message received from Redis on a specific channel.
*
Expand Down Expand Up @@ -377,13 +429,19 @@ public function getServerId()
* app ID and channel name.
*
* @param mixed $appId
* @param string $channel
* @param string|null $channel
* @return string
*/
protected function getTopicName($appId, string $channel): string
protected function getTopicName($appId, string $channel = null): string
{
$prefix = config('database.redis.options.prefix', null);

return "{$prefix}{$appId}:{$channel}";
$hash = "{$prefix}{$appId}";

if ($channel) {
$hash .= ":{$channel}";
}

return $hash;
}
}
24 changes: 24 additions & 0 deletions src/PubSub/ReplicationInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,22 @@ public function subscribe($appId, string $channel): bool;
*/
public function unsubscribe($appId, string $channel): bool;

/**
* Subscribe to the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function subscribeToApp($appId): bool;

/**
* Unsubscribe from the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function unsubscribeFromApp($appId): bool;

/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
Expand Down Expand Up @@ -85,4 +101,12 @@ public function channelMembers($appId, string $channel): PromiseInterface;
* @return PromiseInterface
*/
public function channelMemberCounts($appId, array $channelNames): PromiseInterface;

/**
* Get the amount of unique connections.
*
* @param mixed $appId
* @return null|int|\React\Promise\PromiseInterface
*/
public function appConnectionsCount($appId);
}
36 changes: 36 additions & 0 deletions src/WebSockets/Channels/ChannelManagers/RedisChannelManager.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

namespace BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers;

use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;

class RedisChannelManager extends ArrayChannelManager
{
/**
* The replicator driver.
*
* @var \BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface
*/
protected $replicator;

/**
* Initialize the channel manager.
*
* @return void
*/
public function __construct()
{
$this->replicator = app(ReplicationInterface::class);
}

/**
* Get the connections count on the app.
*
* @param mixed $appId
* @return int
*/
public function getConnectionCount($appId): int
{
return $this->replicator->appConnectionsCount($appId);
}
}
15 changes: 15 additions & 0 deletions src/WebSockets/WebSocketHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use BeyondCode\LaravelWebSockets\Apps\App;
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
use BeyondCode\LaravelWebSockets\Facades\StatisticsLogger;
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
use BeyondCode\LaravelWebSockets\QueryParameters;
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
use BeyondCode\LaravelWebSockets\WebSockets\Exceptions\ConnectionsOverCapacity;
Expand All @@ -26,6 +27,13 @@ class WebSocketHandler implements MessageComponentInterface
*/
protected $channelManager;

/**
* The replicator client.
*
* @var ReplicationInterface
*/
protected $replicator;

/**
* Initialize a new handler.
*
Expand All @@ -35,6 +43,7 @@ class WebSocketHandler implements MessageComponentInterface
public function __construct(ChannelManager $channelManager)
{
$this->channelManager = $channelManager;
$this->replicator = app(ReplicationInterface::class);
}

/**
Expand Down Expand Up @@ -83,6 +92,8 @@ public function onClose(ConnectionInterface $connection)
]);

StatisticsLogger::disconnection($connection->app->id);

$this->replicator->unsubscribeFromApp($connection->app->id);
}

/**
Expand All @@ -99,6 +110,8 @@ public function onError(ConnectionInterface $connection, Exception $exception)
$exception->getPayload()
));
}

$this->replicator->unsubscribeFromApp($connection->app->id);
}

/**
Expand Down Expand Up @@ -203,6 +216,8 @@ protected function establishConnection(ConnectionInterface $connection)

StatisticsLogger::connection($connection->app->id);

$this->replicator->subscribeToApp($connection->app->id);

return $this;
}
}
13 changes: 8 additions & 5 deletions src/WebSocketsServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ public function register()
});

$this->app->singleton(ChannelManager::class, function () {
$channelManager = config('websockets.managers.channel', ArrayChannelManager::class);
$replicationDriver = config('websockets.replication.driver', 'local');

return new $channelManager;
$class = config("websockets.replication.{$replicationDriver}.channel_manager", ArrayChannelManager::class);

return new $class;
});

$this->app->singleton(AppManager::class, function () {
Expand All @@ -72,9 +74,10 @@ public function register()
$driver = config('websockets.statistics.driver');

return $this->app->make(
config('websockets.statistics')[$driver]['driver']
??
\BeyondCode\LaravelWebSockets\Statistics\Drivers\DatabaseDriver::class
config(
"websockets.statistics.{$driver}.driver",
\BeyondCode\LaravelWebSockets\Statistics\Drivers\DatabaseDriver::class
)
);
});
}
Expand Down
Loading