Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

[fix] Redis connection counter didn't work properly #497

Merged
merged 26 commits into from
Sep 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fadb3fc
Added redis connection counter.
rennokki Sep 3, 2020
d5a90d8
Using the built-in Redis cache connection to handle non-pubsub features.
rennokki Sep 3, 2020
21db4b3
Using the concatenated string for the config retrieve
rennokki Sep 3, 2020
a45c0bf
Using the Redis non-blocking client.
rennokki Sep 4, 2020
855646a
Apply fixes from StyleCI (#500)
rennokki Sep 4, 2020
e9ec650
Removed $redis from RedisClient
rennokki Sep 4, 2020
4e11355
Merge branch 'fix/app-connections-count' of github.com:beyondcode/lar…
rennokki Sep 4, 2020
b9dfeca
Using separate connection counts for global & local.
rennokki Sep 4, 2020
0375000
Remove duplicated method.
rennokki Sep 4, 2020
0cb0e6c
Added local driver as default
rennokki Sep 4, 2020
e6cfa85
Replaced blocking Redis instance with non-blocking I/O client
rennokki Sep 4, 2020
d20adcd
Apply fixes from StyleCI (#502)
rennokki Sep 4, 2020
ea97410
Fixed tests
rennokki Sep 4, 2020
1dfe14a
Merge branch 'fix/app-connections-count' of github.com:beyondcode/lar…
rennokki Sep 4, 2020
1e2672d
Updated tests
rennokki Sep 4, 2020
b2ac909
Apply fixes from StyleCI (#503)
rennokki Sep 4, 2020
7a629cf
Fixed typo
rennokki Sep 4, 2020
cadd260
Merge branch 'fix/app-connections-count' of github.com:beyondcode/lar…
rennokki Sep 4, 2020
7e9d3cd
Fixed tests
rennokki Sep 4, 2020
b45d786
Merge branch '2.x' of github.com:beyondcode/laravel-websockets into f…
rennokki Sep 5, 2020
ca4a9a1
Running then() closures as block in tests
rennokki Sep 5, 2020
593c48f
Fixed statistics logger
rennokki Sep 5, 2020
dd33a33
Apply fixes from StyleCI (#505)
rennokki Sep 5, 2020
b2263dc
Forcing ^2.0 on react/promise
rennokki Sep 5, 2020
1c889be
Merge branch 'fix/app-connections-count' of github.com:beyondcode/lar…
rennokki Sep 5, 2020
5ba24cb
Fixed tests for stats metrics
rennokki Sep 6, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@
"illuminate/routing": "^6.0|^7.0",
"illuminate/support": "^6.0|^7.0",
"pusher/pusher-php-server": "^3.0|^4.0",
"react/dns": "^1.1",
"react/promise": "^2.0",
"symfony/http-kernel": "^4.0|^5.0",
"symfony/psr-http-message-bridge": "^1.1|^2.0"
},
"require-dev": {
"clue/block-react": "^1.4",
"mockery/mockery": "^1.3",
"orchestra/testbench-browser-kit": "^4.0|^5.0",
"phpunit/phpunit": "^8.0|^9.0"
Expand Down
19 changes: 4 additions & 15 deletions config/websockets.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,6 @@

'app' => \BeyondCode\LaravelWebSockets\Apps\ConfigAppManager::class,

/*
|--------------------------------------------------------------------------
| Channel Manager
|--------------------------------------------------------------------------
|
| When users subscribe or unsubscribe from specific channels,
| the connections are stored to keep track of any interaction with the
| WebSocket server.
| You can however add your own implementation that will help the store
| of the channels alongside their connections.
|
*/

'channel' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class,

],

/*
Expand Down Expand Up @@ -191,6 +176,8 @@

'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\MemoryStatisticsLogger::class,

'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class,

],

/*
Expand All @@ -214,6 +201,8 @@

'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger::class,

'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\RedisChannelManager::class,

],

],
Expand Down
44 changes: 44 additions & 0 deletions src/PubSub/Drivers/LocalClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,28 @@ public function unsubscribe($appId, string $channel): bool
return true;
}

/**
* Subscribe to the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function subscribeToApp($appId): bool
{
return true;
}

/**
* Unsubscribe from the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function unsubscribeFromApp($appId): bool
{
return true;
}

/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
Expand Down Expand Up @@ -137,4 +159,26 @@ public function channelMemberCounts($appId, array $channelNames): PromiseInterfa

return new FulfilledPromise($results);
}

/**
* Get the amount of unique connections.
*
* @param mixed $appId
* @return null|int
*/
public function getLocalConnectionsCount($appId)
{
return null;
}

/**
* Get the amount of connections aggregated on multiple instances.
*
* @param mixed $appId
* @return null|int|\React\Promise\PromiseInterface
*/
public function getGlobalConnectionsCount($appId)
{
return null;
}
}
57 changes: 52 additions & 5 deletions src/PubSub/Drivers/RedisClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,36 @@ public function unsubscribe($appId, string $channel): bool
return true;
}

/**
* Subscribe to the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function subscribeToApp($appId): bool
{
$this->subscribeClient->__call('subscribe', [$this->getTopicName($appId)]);

$this->publishClient->__call('hincrby', [$this->getTopicName($appId), 'connections', 1]);

return true;
}

/**
* Unsubscribe from the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function unsubscribeFromApp($appId): bool
{
$this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId)]);

$this->publishClient->__call('hincrby', [$this->getTopicName($appId), 'connections', -1]);

return true;
}

/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
Expand Down Expand Up @@ -258,6 +288,17 @@ public function channelMemberCounts($appId, array $channelNames): PromiseInterfa
});
}

/**
* Get the amount of connections aggregated on multiple instances.
*
* @param mixed $appId
* @return null|int|\React\Promise\PromiseInterface
*/
public function getGlobalConnectionsCount($appId)
{
return $this->publishClient->hget($this->getTopicName($appId), 'connections');
}

/**
* Handle a message received from Redis on a specific channel.
*
Expand Down Expand Up @@ -321,8 +362,8 @@ public function onMessage(string $redisChannel, string $payload)
*/
protected function getConnectionUri()
{
$name = config('websockets.replication.redis.connection') ?: 'default';
$config = config('database.redis')[$name];
$name = config('websockets.replication.redis.connection', 'default');
$config = config("database.redis.{$name}");

$host = $config['host'];
$port = $config['port'] ?: 6379;
Expand Down Expand Up @@ -377,13 +418,19 @@ public function getServerId()
* app ID and channel name.
*
* @param mixed $appId
* @param string $channel
* @param string|null $channel
* @return string
*/
protected function getTopicName($appId, string $channel): string
protected function getTopicName($appId, string $channel = null): string
{
$prefix = config('database.redis.options.prefix', null);

return "{$prefix}{$appId}:{$channel}";
$hash = "{$prefix}{$appId}";

if ($channel) {
$hash .= ":{$channel}";
}

return $hash;
}
}
32 changes: 32 additions & 0 deletions src/PubSub/ReplicationInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,22 @@ public function subscribe($appId, string $channel): bool;
*/
public function unsubscribe($appId, string $channel): bool;

/**
* Subscribe to the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function subscribeToApp($appId): bool;

/**
* Unsubscribe from the app's pubsub keyspace.
*
* @param mixed $appId
* @return bool
*/
public function unsubscribeFromApp($appId): bool;

/**
* Add a member to a channel. To be called when they have
* subscribed to the channel.
Expand Down Expand Up @@ -85,4 +101,20 @@ public function channelMembers($appId, string $channel): PromiseInterface;
* @return PromiseInterface
*/
public function channelMemberCounts($appId, array $channelNames): PromiseInterface;

/**
* Get the amount of unique connections.
*
* @param mixed $appId
* @return null|int
*/
public function getLocalConnectionsCount($appId);

/**
* Get the amount of connections aggregated on multiple instances.
*
* @param mixed $appId
* @return null|int|\React\Promise\PromiseInterface
*/
public function getGlobalConnectionsCount($appId);
}
2 changes: 1 addition & 1 deletion src/Statistics/Logger/MemoryStatisticsLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public function save()

$this->createRecord($statistic, $appId);

$currentConnectionCount = $this->channelManager->getConnectionCount($appId);
$currentConnectionCount = $this->channelManager->getGlobalConnectionsCount($appId);

$statistic->reset($currentConnectionCount);
}
Expand Down
Loading