Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Current package versions:
## Unreleased

- Fix [#2951](https://github.com/StackExchange/StackExchange.Redis/issues/2951) - sentinel reconnection failure ([#2956 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2956))
- Mitigate [#2955](https://github.com/StackExchange/StackExchange.Redis/issues/2955) (unbalanced pub/sub routing) ([#2958 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2958))

## 2.9.17

Expand Down
3 changes: 2 additions & 1 deletion src/StackExchange.Redis/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,8 @@ protected CommandChannelBase(int db, CommandFlags flags, RedisCommand command, i

public override string CommandAndKey => Command + " " + Channel;

public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(Channel);
public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy)
=> Channel.IsKeyRouted ? serverSelectionStrategy.HashSlot(Channel) : ServerSelectionStrategy.NoSlot;
}

internal abstract class CommandKeyBase : Message
Expand Down
1 change: 1 addition & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2051,3 +2051,4 @@ StackExchange.Redis.IServer.ExecuteAsync(int? database, string! command, System.
[SER001]static StackExchange.Redis.VectorSetAddRequest.Member(StackExchange.Redis.RedisValue element, System.ReadOnlyMemory<float> values, string? attributesJson = null) -> StackExchange.Redis.VectorSetAddRequest!
[SER001]static StackExchange.Redis.VectorSetSimilaritySearchRequest.ByMember(StackExchange.Redis.RedisValue member) -> StackExchange.Redis.VectorSetSimilaritySearchRequest!
[SER001]static StackExchange.Redis.VectorSetSimilaritySearchRequest.ByVector(System.ReadOnlyMemory<float> vector) -> StackExchange.Redis.VectorSetSimilaritySearchRequest!
StackExchange.Redis.RedisChannel.WithKeyRouting() -> StackExchange.Redis.RedisChannel
107 changes: 79 additions & 28 deletions src/StackExchange.Redis/RedisChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,20 @@ internal enum RedisChannelOptions
None = 0,
Pattern = 1 << 0,
Sharded = 1 << 1,
KeyRouted = 1 << 2,
}

// we don't consider Routed for equality - it's an implementation detail, not a fundamental feature
private const RedisChannelOptions EqualityMask = ~RedisChannelOptions.KeyRouted;

internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH;

/// <summary>
/// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios,
/// or to scenarios using <see cref="RedisChannel.WithKeyRouting" />.
/// </summary>
internal bool IsKeyRouted => (Options & RedisChannelOptions.KeyRouted) != 0;

/// <summary>
/// Indicates whether the channel-name is either null or a zero-length value.
/// </summary>
Expand Down Expand Up @@ -51,24 +61,44 @@ public static bool UseImplicitAutoPattern
private static PatternMode s_DefaultPatternMode = PatternMode.Auto;

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription.
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription. In cluster
/// environments, this channel will be freely routed to any applicable server - different client nodes
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
/// very few channels. In non-cluster environments, routing is not a consideration.
/// </summary>
public static RedisChannel Literal(string value) => new(value, RedisChannelOptions.None);

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription. In cluster
/// environments, this channel will be freely routed to any applicable server - different client nodes
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
/// very few channels. In non-cluster environments, routing is not a consideration.
/// </summary>
public static RedisChannel Literal(string value) => new RedisChannel(value, PatternMode.Literal);
public static RedisChannel Literal(byte[] value) => new(value, RedisChannelOptions.None);

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that does not act as a wildcard subscription.
/// In cluster environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
/// a consideration.
/// </summary>
public static RedisChannel Literal(byte[] value) => new RedisChannel(value, PatternMode.Literal);
/// <remarks>Note that channels from <c>Sharded</c> are always routed.</remarks>
public RedisChannel WithKeyRouting() => new(Value, Options | RedisChannelOptions.KeyRouted);

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription.
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription. In cluster
/// environments, this channel will be freely routed to any applicable server - different client nodes
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
/// very few channels. In non-cluster environments, routing is not a consideration.
/// </summary>
public static RedisChannel Pattern(string value) => new RedisChannel(value, PatternMode.Pattern);
public static RedisChannel Pattern(string value) => new(value, RedisChannelOptions.Pattern);

/// <summary>
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription.
/// Creates a new <see cref="RedisChannel"/> that acts as a wildcard subscription. In cluster
/// environments, this channel will be freely routed to any applicable server - different client nodes
/// will generally connect to different servers; this is suitable for distributing pub/sub in scenarios with
/// very few channels. In non-cluster environments, routing is not a consideration.
/// </summary>
public static RedisChannel Pattern(byte[] value) => new RedisChannel(value, PatternMode.Pattern);
public static RedisChannel Pattern(byte[] value) => new(value, RedisChannelOptions.Pattern);

/// <summary>
/// Create a new redis channel from a buffer, explicitly controlling the pattern mode.
Expand All @@ -84,28 +114,45 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt
/// </summary>
/// <param name="value">The string name of the channel to create.</param>
/// <param name="mode">The mode for name matching.</param>
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode)
{
}

/// <summary>
/// Create a new redis channel from a buffer, representing a sharded channel.
/// Create a new redis channel from a buffer, representing a sharded channel. In cluster
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
/// a consideration.
/// </summary>
/// <param name="value">The name of the channel to create.</param>
public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded);
/// <remarks>Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions
/// using sharded channels must also be published with sharded channels (and vice versa).</remarks>
public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted);

/// <summary>
/// Create a new redis channel from a string, representing a sharded channel.
/// Create a new redis channel from a string, representing a sharded channel. In cluster
/// environments, this channel will be routed using similar rules to <see cref="RedisKey"/>, which is suitable
/// for distributing pub/sub in scenarios with lots of channels. In non-cluster environments, routing is not
/// a consideration.
/// </summary>
/// <param name="value">The string name of the channel to create.</param>
public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded);
/// <remarks>Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions
/// using sharded channels must also be published with sharded channels (and vice versa).</remarks>
public static RedisChannel Sharded(string value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted);

internal RedisChannel(byte[]? value, RedisChannelOptions options)
{
Value = value;
Options = options;
}

internal RedisChannel(string? value, RedisChannelOptions options)
{
Value = value is null ? null : Encoding.UTF8.GetBytes(value);
Options = options;
}

private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch
{
PatternMode.Auto => value != null && Array.IndexOf(value, (byte)'*') >= 0,
Expand Down Expand Up @@ -155,15 +202,17 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options)
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
public static bool operator ==(RedisChannel x, RedisChannel y) =>
x.Options == y.Options && RedisValue.Equals(x.Value, y.Value);
(x.Options & EqualityMask) == (y.Options & EqualityMask)
&& RedisValue.Equals(x.Value, y.Value);

/// <summary>
/// Indicate whether two channel names are equal.
/// </summary>
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
public static bool operator ==(string x, RedisChannel y) =>
RedisValue.Equals(x == null ? null : Encoding.UTF8.GetBytes(x), y.Value);
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
RedisValue.Equals(x is null ? null : Encoding.UTF8.GetBytes(x), y.Value);

/// <summary>
/// Indicate whether two channel names are equal.
Expand All @@ -178,7 +227,8 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options)
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
public static bool operator ==(RedisChannel x, string y) =>
RedisValue.Equals(x.Value, y == null ? null : Encoding.UTF8.GetBytes(y));
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
RedisValue.Equals(x.Value, y is null ? null : Encoding.UTF8.GetBytes(y));

/// <summary>
/// Indicate whether two channel names are equal.
Expand All @@ -203,10 +253,11 @@ internal RedisChannel(byte[]? value, RedisChannelOptions options)
/// Indicate whether two channel names are equal.
/// </summary>
/// <param name="other">The <see cref="RedisChannel"/> to compare to.</param>
public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value);
public bool Equals(RedisChannel other) => (Options & EqualityMask) == (other.Options & EqualityMask)
&& RedisValue.Equals(Value, other.Value);

/// <inheritdoc/>
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options;
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)(Options & EqualityMask);

/// <summary>
/// Obtains a string representation of the channel name.
Expand Down Expand Up @@ -266,23 +317,21 @@ public enum PatternMode
[Obsolete("It is preferable to explicitly specify a " + nameof(PatternMode) + ", or use the " + nameof(Literal) + "/" + nameof(Pattern) + " methods", error: false)]
public static implicit operator RedisChannel(string key)
{
if (key == null) return default;
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
if (key is null) return default;
return new RedisChannel(Encoding.UTF8.GetBytes(key), s_DefaultPatternMode);
}

/// <summary>
/// Create a channel name from a <see cref="T:byte[]"/>.
/// Create a channel name from a <c>byte[]</c>.
/// </summary>
/// <param name="key">The byte array to get a channel from.</param>
[Obsolete("It is preferable to explicitly specify a " + nameof(PatternMode) + ", or use the " + nameof(Literal) + "/" + nameof(Pattern) + " methods", error: false)]
public static implicit operator RedisChannel(byte[]? key)
{
if (key == null) return default;
return new RedisChannel(key, s_DefaultPatternMode);
}
=> key is null ? default : new RedisChannel(key, s_DefaultPatternMode);

/// <summary>
/// Obtain the channel name as a <see cref="T:byte[]"/>.
/// Obtain the channel name as a <c>byte[]</c>.
/// </summary>
/// <param name="key">The channel to get a byte[] from.</param>
public static implicit operator byte[]?(RedisChannel key) => key.Value;
Expand All @@ -294,7 +343,7 @@ public static implicit operator RedisChannel(byte[]? key)
public static implicit operator string?(RedisChannel key)
{
var arr = key.Value;
if (arr == null)
if (arr is null)
{
return null;
}
Expand All @@ -303,9 +352,7 @@ public static implicit operator RedisChannel(byte[]? key)
return Encoding.UTF8.GetString(arr);
}
catch (Exception e) when // Only catch exception throwed by Encoding.UTF8.GetString
(e is DecoderFallbackException
|| e is ArgumentException
|| e is ArgumentNullException)
(e is DecoderFallbackException or ArgumentException or ArgumentNullException)
{
return BitConverter.ToString(arr);
}
Expand All @@ -316,8 +363,12 @@ public static implicit operator RedisChannel(byte[]? key)
// giving due consideration to the default pattern mode (UseImplicitAutoPattern)
// (since we don't ship them, we don't need them in release)
[Obsolete("Watch for " + nameof(UseImplicitAutoPattern), error: true)]
// ReSharper disable once UnusedMember.Local
// ReSharper disable once UnusedParameter.Local
private RedisChannel(string value) => throw new NotSupportedException();
[Obsolete("Watch for " + nameof(UseImplicitAutoPattern), error: true)]
// ReSharper disable once UnusedMember.Local
// ReSharper disable once UnusedParameter.Local
private RedisChannel(byte[]? value) => throw new NotSupportedException();
#endif
}
Expand Down
6 changes: 4 additions & 2 deletions src/StackExchange.Redis/RedisDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1871,14 +1871,16 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteSync(msg, ResultProcessor.Int64);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}

public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteAsync(msg, ResultProcessor.Int64);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}

public RedisResult Execute(string command, params object[] args)
Expand Down
14 changes: 7 additions & 7 deletions src/StackExchange.Redis/RedisSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,16 @@ public Subscription(CommandFlags flags)
/// </summary>
internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall)
{
var isPattern = channel.IsPattern;
var isSharded = channel.IsSharded;
var command = action switch
var command = action switch // note that the Routed flag doesn't impact the message here - just the routing
{
SubscriptionAction.Subscribe => channel.Options switch
SubscriptionAction.Subscribe => (channel.Options & ~RedisChannel.RedisChannelOptions.KeyRouted) switch
{
RedisChannel.RedisChannelOptions.None => RedisCommand.SUBSCRIBE,
RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PSUBSCRIBE,
RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SSUBSCRIBE,
_ => Unknown(action, channel.Options),
},
SubscriptionAction.Unsubscribe => channel.Options switch
SubscriptionAction.Unsubscribe => (channel.Options & ~RedisChannel.RedisChannelOptions.KeyRouted) switch
{
RedisChannel.RedisChannelOptions.None => RedisCommand.UNSUBSCRIBE,
RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PUNSUBSCRIBE,
Expand Down Expand Up @@ -384,14 +382,16 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
{
ThrowIfNull(channel);
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteSync(msg, ResultProcessor.Int64);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}

public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
ThrowIfNull(channel);
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteAsync(msg, ResultProcessor.Int64);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}

void ISubscriber.Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags)
Expand Down
Loading
Loading