Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ Current package versions:

## Unreleased

- Fix `SSUBSCRIBE` routing during slot migrations ([#2969 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2969))

## 2.9.25

- (build) Fix SNK on non-Windows builds ([#2963 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2963))
Expand Down
2 changes: 1 addition & 1 deletion src/StackExchange.Redis/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ protected override void WriteImpl(PhysicalConnection physical)

internal abstract class CommandChannelBase : Message
{
protected readonly RedisChannel Channel;
internal readonly RedisChannel Channel;

protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, in RedisChannel channel) : base(db, flags, command)
{
Expand Down
270 changes: 204 additions & 66 deletions src/StackExchange.Redis/PhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ internal sealed partial class PhysicalConnection : IDisposable

private const int DefaultRedisDatabaseCount = 16;

private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage";

private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();

Expand Down Expand Up @@ -1669,6 +1667,130 @@ internal async ValueTask<bool> ConnectedAsync(Socket? socket, ILogger? log, Sock
}
}

private enum PushKind
{
None,
Message,
PMessage,
SMessage,
Subscribe,
PSubscribe,
SSubscribe,
Unsubscribe,
PUnsubscribe,
SUnsubscribe,
}
private PushKind GetPushKind(in Sequence<RawResult> result, out RedisChannel channel)
{
var len = result.Length;
if (len < 2)
{
// for supported cases, we demand at least the kind and the subscription channel
channel = default;
return PushKind.None;
}

const int MAX_LEN = 16;
Debug.Assert(MAX_LEN >= Enumerable.Max(
[
PushMessage.Length, PushPMessage.Length, PushSMessage.Length,
PushSubscribe.Length, PushPSubscribe.Length, PushSSubscribe.Length,
PushUnsubscribe.Length, PushPUnsubscribe.Length, PushSUnsubscribe.Length,
]));
ref readonly RawResult pushKind = ref result[0];
var multiSegmentPayload = pushKind.Payload;
if (multiSegmentPayload.Length <= MAX_LEN)
{
var span = multiSegmentPayload.IsSingleSegment
? multiSegmentPayload.First.Span
: CopyTo(stackalloc byte[MAX_LEN], multiSegmentPayload);

var hash = FastHash.Hash64(span);
RedisChannel.RedisChannelOptions channelOptions = RedisChannel.RedisChannelOptions.None;
PushKind kind;
switch (hash)
{
case PushMessage.Hash when PushMessage.Is(hash, span) & len >= 3:
kind = PushKind.Message;
break;
case PushPMessage.Hash when PushPMessage.Is(hash, span) & len >= 4:
channelOptions = RedisChannel.RedisChannelOptions.Pattern;
kind = PushKind.PMessage;
break;
case PushSMessage.Hash when PushSMessage.Is(hash, span) & len >= 3:
channelOptions = RedisChannel.RedisChannelOptions.Sharded;
kind = PushKind.SMessage;
break;
case PushSubscribe.Hash when PushSubscribe.Is(hash, span):
kind = PushKind.Subscribe;
break;
case PushPSubscribe.Hash when PushPSubscribe.Is(hash, span):
channelOptions = RedisChannel.RedisChannelOptions.Pattern;
kind = PushKind.PSubscribe;
break;
case PushSSubscribe.Hash when PushSSubscribe.Is(hash, span):
channelOptions = RedisChannel.RedisChannelOptions.Sharded;
kind = PushKind.SSubscribe;
break;
case PushUnsubscribe.Hash when PushUnsubscribe.Is(hash, span):
kind = PushKind.Unsubscribe;
break;
case PushPUnsubscribe.Hash when PushPUnsubscribe.Is(hash, span):
channelOptions = RedisChannel.RedisChannelOptions.Pattern;
kind = PushKind.PUnsubscribe;
break;
case PushSUnsubscribe.Hash when PushSUnsubscribe.Is(hash, span):
channelOptions = RedisChannel.RedisChannelOptions.Sharded;
kind = PushKind.SUnsubscribe;
break;
default:
kind = PushKind.None;
break;
}
if (kind != PushKind.None)
{
// the channel is always the second element
channel = result[1].AsRedisChannel(ChannelPrefix, channelOptions);
return kind;
}
}
channel = default;
return PushKind.None;

static ReadOnlySpan<byte> CopyTo(Span<byte> target, in ReadOnlySequence<byte> source)
{
source.CopyTo(target);
return target.Slice(0, (int)source.Length);
}
}

[FastHash("message")]
private static partial class PushMessage { }

[FastHash("pmessage")]
private static partial class PushPMessage { }

[FastHash("smessage")]
private static partial class PushSMessage { }

[FastHash("subscribe")]
private static partial class PushSubscribe { }

[FastHash("psubscribe")]
private static partial class PushPSubscribe { }

[FastHash("ssubscribe")]
private static partial class PushSSubscribe { }

[FastHash("unsubscribe")]
private static partial class PushUnsubscribe { }

[FastHash("punsubscribe")]
private static partial class PushPUnsubscribe { }

[FastHash("sunsubscribe")]
private static partial class PushSUnsubscribe { }

private void MatchResult(in RawResult result)
{
// check to see if it could be an out-of-band pubsub message
Expand All @@ -1679,85 +1801,87 @@ private void MatchResult(in RawResult result)

// out of band message does not match to a queued message
var items = result.GetItems();
if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage)))
var kind = GetPushKind(items, out var subscriptionChannel);
switch (kind)
{
_readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;
case PushKind.Message:
case PushKind.SMessage:
_readStatus = kind is PushKind.Message ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;

// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
var configChanged = muxer.ConfigurationChangedChannel;
if (configChanged != null && items[1].IsEqual(configChanged))
{
EndPoint? blame = null;
try
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
var configChanged = muxer.ConfigurationChangedChannel;
if (configChanged != null && items[1].IsEqual(configChanged))
{
if (!items[2].IsEqual(CommonReplies.wildcard))
EndPoint? blame = null;
try
{
// We don't want to fail here, just trying to identify
_ = Format.TryParseEndPoint(items[2].GetString(), out blame);
if (!items[2].IsEqual(CommonReplies.wildcard))
{
// We don't want to fail here, just trying to identify
_ = Format.TryParseEndPoint(items[2].GetString(), out blame);
}
}
catch
{
/* no biggie */
}
}
catch { /* no biggie */ }
Trace("Configuration changed: " + Format.ToString(blame));
_readStatus = ReadStatus.Reconfigure;
muxer.ReconfigureIfNeeded(blame, true, "broadcast");
}

// invoke the handlers
RedisChannel channel;
if (items[0].IsEqual(message))
{
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
Trace("MESSAGE: " + channel);
}
else // see check on outer-if that restricts to message / smessage
{
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded);
Trace("SMESSAGE: " + channel);
}
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[2], out var payload))
{
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(channel, channel, payload);
Trace("Configuration changed: " + Format.ToString(blame));
_readStatus = ReadStatus.Reconfigure;
muxer.ReconfigureIfNeeded(blame, true, "broadcast");
}
// could be multi-message: https://github.com/StackExchange/StackExchange.Redis/issues/2507
else if (TryGetMultiPubSubPayload(items[2], out var payloads))

// invoke the handlers
if (!subscriptionChannel.IsNull)
{
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(channel, channel, payloads);
Trace($"{kind}: {subscriptionChannel}");
if (TryGetPubSubPayload(items[2], out var payload))
{
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(subscriptionChannel, subscriptionChannel, payload);
}
// could be multi-message: https://github.com/StackExchange/StackExchange.Redis/issues/2507
else if (TryGetMultiPubSubPayload(items[2], out var payloads))
{
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(subscriptionChannel, subscriptionChannel, payloads);
}
}
}
return; // AND STOP PROCESSING!
}
else if (items.Length >= 4 && items[0].IsEqual(pmessage))
{
_readStatus = ReadStatus.PubSubPMessage;
return; // and stop processing
case PushKind.PMessage:
_readStatus = ReadStatus.PubSubPMessage;

var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

Trace("PMESSAGE: " + channel);
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[3], out var payload))
var messageChannel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
if (!messageChannel.IsNull)
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payload);
Trace($"{kind}: {messageChannel} via {subscriptionChannel}");
if (TryGetPubSubPayload(items[3], out var payload))
{
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(subscriptionChannel, messageChannel, payload);
}
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
{
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(subscriptionChannel, messageChannel, payloads);
}
}
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
return; // and stop processing
case PushKind.SUnsubscribe when !PeekChannelMessage(RedisCommand.SUNSUBSCRIBE, subscriptionChannel):
// then it was *unsolicited* - this probably means the slot was migrated
// (otherwise, we'll let the command-processor deal with it)
_readStatus = ReadStatus.PubSubUnsubscribe;
var server = BridgeCouldBeNull?.ServerEndPoint;
if (server is not null && muxer.TryGetSubscription(subscriptionChannel, out var subscription))
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payloads);
// wipe and reconnect; but: to where?
// counter-intuitively, the only server we *know* already knows the new route is:
// the outgoing server, since it had to change to MIGRATING etc; the new INCOMING server
// knows, but *we don't know who that is*, and other nodes: aren't guaranteed to know (yet)
muxer.DefaultSubscriber.ResubscribeToServer(subscription, subscriptionChannel, server, cause: PushSUnsubscribe.Text);
}
}
return; // AND STOP PROCESSING!
return; // and STOP PROCESSING; unsolicited
}

// if it didn't look like "[p|s]message", then we still need to process the pending queue
}
Trace("Matching result...");

Expand Down Expand Up @@ -1875,6 +1999,19 @@ static bool TryGetMultiPubSubPayload(in RawResult value, out Sequence<RawResult>
}
}

private bool PeekChannelMessage(RedisCommand command, RedisChannel channel)
{
Message? msg;
bool haveMsg;
lock (_writtenAwaitingResponse)
{
haveMsg = _writtenAwaitingResponse.TryPeek(out msg);
}

return haveMsg && msg is CommandChannelBase typed
&& typed.Command == command && typed.Channel == channel;
}

private volatile Message? _activeMessage;

internal void GetHeadMessages(out Message? now, out Message? next)
Expand Down Expand Up @@ -2168,6 +2305,7 @@ internal enum ReadStatus
MatchResultComplete,
ResetArena,
ProcessBufferComplete,
PubSubUnsubscribe,
NA = -1,
}
private volatile ReadStatus _readStatus;
Expand Down
Loading
Loading