Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 1e2838f

Browse files
committed
Scope pub/sub channels in Redis by appId to avoid crosstalk between apps
1 parent e58cfea commit 1e2838f

File tree

3 files changed

+35
-24
lines changed

3 files changed

+35
-24
lines changed

src/PubSub/Redis/RedisClient.php

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -78,40 +78,45 @@ public function boot(LoopInterface $loop): ReplicationInterface
7878
*
7979
* @param string $redisChannel
8080
* @param string $payload
81-
* @return bool
8281
*/
8382
protected function onMessage(string $redisChannel, string $payload)
8483
{
8584
$payload = json_decode($payload);
8685

8786
// Ignore messages sent by ourselves
8887
if (isset($payload->serverId) && $this->serverId === $payload->serverId) {
89-
return false;
88+
return;
9089
}
9190

92-
// We need to put the channel name in the payload
93-
$payload->channel = $redisChannel;
91+
// Pull out the app ID. See RedisPusherBroadcaster
92+
$appId = $payload->appId;
93+
94+
// We need to put the channel name in the payload.
95+
// We strip the app ID from the channel name, websocket clients
96+
// expect the channel name to not include the app ID.
97+
$payload->channel = Str::after($redisChannel, "$appId:");
9498

9599
/* @var $channelManager ChannelManager */
96100
$channelManager = app(ChannelManager::class);
97101

98102
// Load the Channel instance, if any
99-
$channel = $channelManager->find($payload->appId, $payload->channel);
100-
if ($channel === null) {
101-
return false;
103+
$channel = $channelManager->find($appId, $payload->channel);
104+
105+
// If no channel is found, none of our connections want to
106+
// receive this message, so we ignore it.
107+
if (! $channel) {
108+
return;
102109
}
103110

104-
$socket = $payload->socket;
111+
$socket = $payload->socket ?? null;
105112

106-
// Remove the internal keys from the payload
113+
// Remove fields intended for internal use from the payload
107114
unset($payload->socket);
108115
unset($payload->serverId);
109116
unset($payload->appId);
110117

111118
// Push the message out to connected websocket clients
112119
$channel->broadcastToEveryoneExcept($payload, $socket);
113-
114-
return true;
115120
}
116121

117122
/**
@@ -123,13 +128,13 @@ protected function onMessage(string $redisChannel, string $payload)
123128
*/
124129
public function subscribe(string $appId, string $channel): bool
125130
{
126-
if (! isset($this->subscribedChannels[$channel])) {
131+
if (! isset($this->subscribedChannels["$appId:$channel"])) {
127132
// We're not subscribed to the channel yet, subscribe and set the count to 1
128-
$this->subscribeClient->__call('subscribe', [$channel]);
129-
$this->subscribedChannels[$channel] = 1;
133+
$this->subscribeClient->__call('subscribe', ["$appId:$channel"]);
134+
$this->subscribedChannels["$appId:$channel"] = 1;
130135
} else {
131136
// Increment the subscribe count if we've already subscribed
132-
$this->subscribedChannels[$channel]++;
137+
$this->subscribedChannels["$appId:$channel"]++;
133138
}
134139

135140
return true;
@@ -144,17 +149,17 @@ public function subscribe(string $appId, string $channel): bool
144149
*/
145150
public function unsubscribe(string $appId, string $channel): bool
146151
{
147-
if (! isset($this->subscribedChannels[$channel])) {
152+
if (! isset($this->subscribedChannels["$appId:$channel"])) {
148153
return false;
149154
}
150155

151156
// Decrement the subscription count for this channel
152-
$this->subscribedChannels[$channel]--;
157+
$this->subscribedChannels["$appId:$channel"]--;
153158

154159
// If we no longer have subscriptions to that channel, unsubscribe
155-
if ($this->subscribedChannels[$channel] < 1) {
156-
$this->subscribeClient->__call('unsubscribe', [$channel]);
157-
unset($this->subscribedChannels[$channel]);
160+
if ($this->subscribedChannels["$appId:$channel"] < 1) {
161+
$this->subscribeClient->__call('unsubscribe', ["$appId:$channel"]);
162+
unset($this->subscribedChannels["$appId:$channel"]);
158163
}
159164

160165
return true;
@@ -173,7 +178,10 @@ public function publish(string $appId, string $channel, stdClass $payload): bool
173178
$payload->appId = $appId;
174179
$payload->serverId = $this->serverId;
175180

176-
$this->publishClient->__call('publish', [$channel, json_encode($payload)]);
181+
$this->publishClient->__call('publish', ["$appId:$channel", json_encode($payload)]);
182+
183+
return true;
184+
}
177185

178186
return true;
179187
}

src/PubSub/Redis/RedisPusherBroadcaster.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public function broadcast(array $channels, $event, array $payload = [])
144144
]);
145145

146146
foreach ($this->formatChannels($channels) as $channel) {
147-
$connection->publish($channel, $payload);
147+
$connection->publish("{$this->appId}:$channel", $payload);
148148
}
149149
}
150150
}

src/WebSockets/Channels/Channel.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,14 @@ public function broadcastToOthers(ConnectionInterface $connection, $payload)
107107
->publish($connection->app->id, $payload);
108108
}
109109

110-
$this->broadcastToEveryoneExcept($payload, $connection->socketId, $connection->app->id);
110+
$this->broadcastToEveryoneExcept($payload, $connection->socketId);
111111
}
112112

113-
public function broadcastToEveryoneExcept($payload, ?string $socketId = null, ?string $appId = null)
113+
public function broadcastToEveryoneExcept($payload, ?string $socketId = null)
114114
{
115+
// Performance optimization, if we don't have a socket ID,
116+
// then we avoid running the if condition in the foreach loop below
117+
// by calling broadcast() instead.
115118
if (is_null($socketId)) {
116119
$this->broadcast($payload);
117120

0 commit comments

Comments
 (0)