Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/Timeouts.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ By default Redis Timeout exception(s) includes useful information, which can hel
|qs | Queue-Awaiting-Response : {int}|There are x operations currently awaiting replies from redis server.|
|aw | Active-Writer: {bool}||
|bw | Backlog-Writer: {enum} | Possible values are Inactive, Started, CheckingForWork, CheckingForTimeout, RecordingTimeout, WritingMessage, Flushing, MarkingInactive, RecordingWriteFailure, RecordingFault, SettingIdle, SpinningDown, Faulted|
|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA|
|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubSMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA|
|ws | Write-State: {enum}| Possible values are Initializing, Idle, Writing, Flushing, Flushed, NA|
|in | Inbound-Bytes : {long}|there are x bytes waiting to be read from the input stream from redis|
|in-pipe | Inbound-Pipe-Bytes: {long}|Bytes waiting to be read|
Expand Down
8 changes: 7 additions & 1 deletion src/StackExchange.Redis/ClientInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,15 @@ public sealed class ClientInfo
public string? Name { get; private set; }

/// <summary>
/// Number of pattern matching subscriptions.
/// Number of pattern-matching subscriptions.
/// </summary>
public int PatternSubscriptionCount { get; private set; }

/// <summary>
/// Number of sharded subscriptions.
/// </summary>
public int ShardedSubscriptionCount { get; private set; }

/// <summary>
/// The port of the client.
/// </summary>
Expand Down Expand Up @@ -236,6 +241,7 @@ internal static bool TryParse(string? input, [NotNullWhen(true)] out ClientInfo[
case "name": client.Name = value; break;
case "sub": client.SubscriptionCount = Format.ParseInt32(value); break;
case "psub": client.PatternSubscriptionCount = Format.ParseInt32(value); break;
case "ssub": client.ShardedSubscriptionCount = Format.ParseInt32(value); break;
case "multi": client.TransactionCommandLength = Format.ParseInt32(value); break;
case "cmd": client.LastCommand = value; break;
case "flags":
Expand Down
6 changes: 4 additions & 2 deletions src/StackExchange.Redis/CommandMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public sealed class CommandMap

RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!

RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE,

RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,

Expand All @@ -57,7 +57,9 @@ public sealed class CommandMap

RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!

RedisCommand.PSUBSCRIBE, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE,

RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,

RedisCommand.SCRIPT,

Expand Down
6 changes: 6 additions & 0 deletions src/StackExchange.Redis/Enums/RedisCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,16 @@ internal enum RedisCommand
SORT,
SORT_RO,
SPOP,
SPUBLISH,
SRANDMEMBER,
SREM,
STRLEN,
SUBSCRIBE,
SUNION,
SUNIONSTORE,
SSCAN,
SSUBSCRIBE,
SUNSUBSCRIBE,
SWAPDB,
SYNC,

Expand Down Expand Up @@ -447,10 +450,13 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.SMEMBERS:
case RedisCommand.SMISMEMBER:
case RedisCommand.SORT_RO:
case RedisCommand.SPUBLISH:
case RedisCommand.SRANDMEMBER:
case RedisCommand.SSUBSCRIBE:
case RedisCommand.STRLEN:
case RedisCommand.SUBSCRIBE:
case RedisCommand.SUNION:
case RedisCommand.SUNSUBSCRIBE:
case RedisCommand.SSCAN:
case RedisCommand.SYNC:
case RedisCommand.TIME:
Expand Down
3 changes: 3 additions & 0 deletions src/StackExchange.Redis/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,9 @@ internal static bool RequiresDatabase(RedisCommand command)
case RedisCommand.SLAVEOF:
case RedisCommand.SLOWLOG:
case RedisCommand.SUBSCRIBE:
case RedisCommand.SPUBLISH:
case RedisCommand.SSUBSCRIBE:
case RedisCommand.SUNSUBSCRIBE:
case RedisCommand.SWAPDB:
case RedisCommand.SYNC:
case RedisCommand.TIME:
Expand Down
33 changes: 23 additions & 10 deletions src/StackExchange.Redis/PhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal sealed partial class PhysicalConnection : IDisposable

private const int DefaultRedisDatabaseCount = 16;

private static readonly CommandBytes message = "message", pmessage = "pmessage";
private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage";

private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
Expand Down Expand Up @@ -384,7 +384,7 @@ public void RecordConnectionFailed(
bool isInitialConnect = false,
IDuplexPipe? connectingPipe = null)
{
bool weAskedForThis = false;
bool weAskedForThis;
Exception? outerException = innerException;
IdentifyFailureType(innerException, ref failureType);
var bridge = BridgeCouldBeNull;
Expand Down Expand Up @@ -1644,9 +1644,9 @@ private void MatchResult(in RawResult result)

// out of band message does not match to a queued message
var items = result.GetItems();
if (items.Length >= 3 && items[0].IsEqual(message))
if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage)))
{
_readStatus = ReadStatus.PubSubMessage;
_readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;

// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
var configChanged = muxer.ConfigurationChangedChannel;
Expand All @@ -1668,8 +1668,17 @@ private void MatchResult(in RawResult result)
}

// invoke the handlers
var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
Trace("MESSAGE: " + channel);
RedisChannel channel;
if (items[0].IsEqual(message))
{
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
Trace("MESSAGE: " + channel);
}
else
{
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded);
Trace("SMESSAGE: " + channel);
}
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[2], out var payload))
Expand All @@ -1690,27 +1699,30 @@ private void MatchResult(in RawResult result)
{
_readStatus = ReadStatus.PubSubPMessage;

var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

Trace("PMESSAGE: " + channel);
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[3], out var payload))
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payload);
}
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payloads);
}
}
return; // AND STOP PROCESSING!
}

// if it didn't look like "[p]message", then we still need to process the pending queue
// if it didn't look like "[p|s]message", then we still need to process the pending queue
}
Trace("Matching result...");

Expand Down Expand Up @@ -2110,6 +2122,7 @@ internal enum ReadStatus
MatchResult,
PubSubMessage,
PubSubPMessage,
PubSubSMessage,
Reconfigure,
InvokePubSub,
ResponseSequenceCheck, // high-integrity mode only
Expand Down
5 changes: 5 additions & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,7 @@ StackExchange.Redis.RedisChannel
StackExchange.Redis.RedisChannel.Equals(StackExchange.Redis.RedisChannel other) -> bool
StackExchange.Redis.RedisChannel.IsNullOrEmpty.get -> bool
StackExchange.Redis.RedisChannel.IsPattern.get -> bool
StackExchange.Redis.RedisChannel.IsSharded.get -> bool
StackExchange.Redis.RedisChannel.PatternMode
StackExchange.Redis.RedisChannel.PatternMode.Auto = 0 -> StackExchange.Redis.RedisChannel.PatternMode
StackExchange.Redis.RedisChannel.PatternMode.Literal = 1 -> StackExchange.Redis.RedisChannel.PatternMode
Expand Down Expand Up @@ -1893,4 +1894,8 @@ virtual StackExchange.Redis.RedisResult.Length.get -> int
virtual StackExchange.Redis.RedisResult.this[int index].get -> StackExchange.Redis.RedisResult!
StackExchange.Redis.ConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void
StackExchange.Redis.IConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void
StackExchange.Redis.RedisFeatures.ShardedPubSub.get -> bool
static StackExchange.Redis.RedisChannel.Sharded(byte[]? value) -> StackExchange.Redis.RedisChannel
static StackExchange.Redis.RedisChannel.Sharded(string! value) -> StackExchange.Redis.RedisChannel
StackExchange.Redis.ClientInfo.ShardedSubscriptionCount.get -> int
StackExchange.Redis.ConfigurationOptions.SetUserPfxCertificate(string! userCertificatePath, string? password = null) -> void
7 changes: 4 additions & 3 deletions src/StackExchange.Redis/RawResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,21 @@ public bool MoveNext()
}
public ReadOnlySequence<byte> Current { get; private set; }
}
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode)
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.RedisChannelOptions options)
{
switch (Resp2TypeBulkString)
{
case ResultType.SimpleString:
case ResultType.BulkString:
if (channelPrefix == null)
{
return new RedisChannel(GetBlob(), mode);
return new RedisChannel(GetBlob(), options);
}
if (StartsWith(channelPrefix))
{
byte[] copy = Payload.Slice(channelPrefix.Length).ToArray();
return new RedisChannel(copy, mode);

return new RedisChannel(copy, options);
}
return default;
default:
Expand Down
52 changes: 42 additions & 10 deletions src/StackExchange.Redis/RedisChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,18 @@ namespace StackExchange.Redis
public readonly struct RedisChannel : IEquatable<RedisChannel>
{
internal readonly byte[]? Value;
internal readonly bool _isPatternBased;

internal readonly RedisChannelOptions Options;

[Flags]
internal enum RedisChannelOptions
{
None = 0,
Pattern = 1 << 0,
Sharded = 1 << 1,
}

internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH;

/// <summary>
/// Indicates whether the channel-name is either null or a zero-length value.
Expand All @@ -19,7 +30,12 @@ namespace StackExchange.Redis
/// <summary>
/// Indicates whether this channel represents a wildcard pattern (see <c>PSUBSCRIBE</c>).
/// </summary>
public bool IsPattern => _isPatternBased;
public bool IsPattern => (Options & RedisChannelOptions.Pattern) != 0;

/// <summary>
/// Indicates whether this channel represents a shard channel (see <c>SSUBSCRIBE</c>).
/// </summary>
public bool IsSharded => (Options & RedisChannelOptions.Sharded) != 0;

internal bool IsNull => Value == null;

Expand Down Expand Up @@ -59,19 +75,35 @@ public static bool UseImplicitAutoPattern
/// </summary>
/// <param name="value">The name of the channel to create.</param>
/// <param name="mode">The mode for name matching.</param>
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode)) { }
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None)
{
}

/// <summary>
/// Create a new redis channel from a string, explicitly controlling the pattern mode.
/// </summary>
/// <param name="value">The string name of the channel to create.</param>
/// <param name="mode">The mode for name matching.</param>
public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { }
public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode)
{
}

/// <summary>
/// Create a new redis channel from a buffer, representing a sharded channel.
/// </summary>
/// <param name="value">The name of the channel to create.</param>
public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded);

/// <summary>
/// Create a new redis channel from a string, representing a sharded channel.
/// </summary>
/// <param name="value">The string name of the channel to create.</param>
public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded);

private RedisChannel(byte[]? value, bool isPatternBased)
internal RedisChannel(byte[]? value, RedisChannelOptions options)
{
Value = value;
_isPatternBased = isPatternBased;
Options = options;
}

private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch
Expand Down Expand Up @@ -123,7 +155,7 @@ private RedisChannel(byte[]? value, bool isPatternBased)
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
public static bool operator ==(RedisChannel x, RedisChannel y) =>
x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value);
x.Options == y.Options && RedisValue.Equals(x.Value, y.Value);

/// <summary>
/// Indicate whether two channel names are equal.
Expand Down Expand Up @@ -171,10 +203,10 @@ private RedisChannel(byte[]? value, bool isPatternBased)
/// Indicate whether two channel names are equal.
/// </summary>
/// <param name="other">The <see cref="RedisChannel"/> to compare to.</param>
public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value);
public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value);

/// <inheritdoc/>
public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0);
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options;

/// <summary>
/// Obtains a string representation of the channel name.
Expand Down Expand Up @@ -203,7 +235,7 @@ internal RedisChannel Clone()
return this;
}
var copy = (byte[])Value.Clone(); // defensive array copy
return new RedisChannel(copy, _isPatternBased);
return new RedisChannel(copy, Options);
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions src/StackExchange.Redis/RedisDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1575,14 +1575,14 @@ public Task<LCSMatchResult> StringLongestCommonSubsequenceWithMatchesAsync(Redis
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteSync(msg, ResultProcessor.Int64);
}

public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteAsync(msg, ResultProcessor.Int64);
}

Expand Down
5 changes: 5 additions & 0 deletions src/StackExchange.Redis/RedisFeatures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ public RedisFeatures(Version version)
/// </summary>
public bool SetVaradicAddRemove => Version.IsAtLeast(v2_4_0);

/// <summary>
/// Are <see href="https://redis.io/commands/ssubscribe/">SSUBSCRIBE</see> and <see href="https://redis.io/commands/spublish/">SPUBLISH</see> available?
/// </summary>
public bool ShardedPubSub => Version.IsAtLeast(v7_0_0_rc1);

/// <summary>
/// Are <see href="https://redis.io/commands/zpopmin/">ZPOPMIN</see> and <see href="https://redis.io/commands/zpopmax/">ZPOPMAX</see> available?
/// </summary>
Expand Down
Loading
Loading