Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -606,10 +606,6 @@ internal sealed class Options : ICloneable
[Option("fail-on-recovery-error", Required = false, HelpText = "Server bootup should fail if errors happen during bootup of AOF and checkpointing")]
public bool? FailOnRecoveryError { get; set; }

[OptionValidation]
[Option("skip-rdb-restore-checksum-validation", Required = false, HelpText = "Skip RDB restore checksum validation")]
public bool? SkipRDBRestoreChecksumValidation { get; set; }

[OptionValidation]
[Option("lua-memory-management-mode", Required = false, HelpText = "Memory management mode for Lua scripts, must be set to Tracked or Managed to impose script limits")]
public LuaMemoryManagementMode LuaMemoryManagementMode { get; set; }
Expand Down Expand Up @@ -956,7 +952,6 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
IndexResizeThreshold = IndexResizeThreshold,
LoadModuleCS = LoadModuleCS,
FailOnRecoveryError = FailOnRecoveryError.GetValueOrDefault(),
SkipRDBRestoreChecksumValidation = SkipRDBRestoreChecksumValidation.GetValueOrDefault(),
LuaOptions = EnableLua.GetValueOrDefault() ? new LuaOptions(LuaMemoryManagementMode, LuaScriptMemoryLimit, LuaScriptTimeoutMs == 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromMilliseconds(LuaScriptTimeoutMs), LuaLoggingMode, LuaAllowedFunctions, logger) : null,
UnixSocketPath = UnixSocketPath,
UnixSocketPermission = unixSocketPermissions,
Expand Down
3 changes: 0 additions & 3 deletions libs/host/defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,6 @@
/* Fails if encounters error during AOF replay or checkpointing */
"FailOnRecoveryError": false,

/* Skips crc64 validation in restore command */
"SkipRDBRestoreChecksumValidation": false,

/* Lua uses the default, unmanaged and untracked, allocator */
"LuaMemoryManagementMode": "Native",

Expand Down
71 changes: 58 additions & 13 deletions libs/server/Resp/HyperLogLog/HyperLogLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,6 @@ public unsafe class HyperLogLog
/// </summary>
public readonly int SparseBytes;

/// <summary>
/// Return bits used for indexing
/// </summary>
public byte PBit => pbit;

/// <summary>
/// Return bits used for clz
/// </summary>
public byte QBit => qbit;

/// <summary>
/// Default hyperloglog instance
/// </summary>
Expand All @@ -113,7 +103,7 @@ public unsafe class HyperLogLog
/// Custom Garnet HyperLogLog Constructor
/// </summary>
/// <param name="pbit"></param>
public HyperLogLog(byte pbit)
private HyperLogLog(byte pbit)
{
this.pbit = pbit;
this.qbit = (byte)(hbit - pbit);
Expand Down Expand Up @@ -217,8 +207,63 @@ private void _set_register(byte* reg, ushort idx, byte val)

private bool IsValidHLLLength(byte* ptr, int length)
{
return (IsSparse(ptr) && SparseInitialLength(1) <= length || length <= SparseSizeMaxCap) ||
(IsDense(ptr) && length == this.DenseBytes);
// Dense HLL must match exact byte count (12,304 bytes)
if (IsDense(ptr))
return length == this.DenseBytes;

// Must be valid HLL type (sparse or dense)
if (!IsSparse(ptr))
return false;

// Sparse length must be within [min initial size, 4096 byte cap]
if (length < SparseInitialLength(1) || length > SparseSizeMaxCap)
return false;

// RLE stream must fit within available payload and be structurally valid
var sparsePayloadBytes = length - SparseHeaderSize;
return GetSparseRLESize(ptr) <= sparsePayloadBytes && IsValidSparseStream(ptr);
}

/// <summary>
/// Validates sparse opcode stream semantics.
/// Ensures each opcode is well-formed, register values are in range,
/// and total sparse coverage matches the expected register count.
/// </summary>
/// <param name="ptr">Pointer to HLL value bytes.</param>
/// <returns>True if sparse stream is structurally valid; otherwise false.</returns>
private bool IsValidSparseStream(byte* ptr)
{
var rleSize = GetSparseRLESize(ptr);
var curr = ptr + SparseHeaderSize;
var end = curr + rleSize;

var coveredRegisters = 0;

while (curr != end)
{
if (IsZeroRange(curr))
{
coveredRegisters += ZeroRangeLen(curr);
}
else
{
var nonZero = GetNonZero(curr);
// Non-zero opcode encodes leading-zero count in [1, qbit + 1].
if (nonZero == 0 || nonZero > (qbit + 1))
return false;

coveredRegisters += 1;
}

// Reject streams that advance beyond logical register space.
if (coveredRegisters > mcnt)
return false;

curr++;
}

// Sparse stream must cover all registers exactly once.
return coveredRegisters == mcnt;
}

/// <summary>
Expand Down
23 changes: 10 additions & 13 deletions libs/server/Resp/KeyAdminCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,18 @@ bool NetworkRESTORE<TGarnetApi>(ref TGarnetApi storageApi)
return true;
}

if (storeWrapper.serverOptions.SkipRDBRestoreChecksumValidation)
{
// crc is calculated over the encoded payload length, payload and the rdb version bytes
// skip's the value type byte and crc64 bytes
var calculatedCrc = new ReadOnlySpan<byte>(Crc64.Hash(valueSpan.Slice(0, valueSpan.Length - 8)));
// crc is calculated over the encoded payload length, payload and the rdb version bytes
// skip's the value type byte and crc64 bytes
var calculatedCrc = new ReadOnlySpan<byte>(Crc64.Hash(valueSpan.Slice(0, valueSpan.Length - 8)));

// skip's rdb version bytes
var payloadCrc = footer[2..];
// skip's rdb version bytes
var payloadCrc = footer[2..];

if (calculatedCrc.SequenceCompareTo(payloadCrc) != 0)
{
while (!RespWriteUtils.TryWriteError("ERR DUMP payload version or checksum are wrong", ref dcurr, dend))
SendAndReset();
return true;
}
if (calculatedCrc.SequenceCompareTo(payloadCrc) != 0)
{
while (!RespWriteUtils.TryWriteError("ERR DUMP payload version or checksum are wrong", ref dcurr, dend))
SendAndReset();
return true;
}

// decode the length of payload
Expand Down
5 changes: 0 additions & 5 deletions libs/server/Servers/ServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,6 @@ public class ServerOptions
/// </summary>
public bool FailOnRecoveryError = false;

/// <summary>
/// Skip RDB restore checksum validation
/// </summary>
public bool SkipRDBRestoreChecksumValidation = false;

/// <summary>
/// Logger
/// </summary>
Expand Down
5 changes: 3 additions & 2 deletions libs/server/Storage/Session/MainStore/HyperLogLogOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public unsafe GarnetStatus HyperLogLogLength<TContext>(ref RawStringInput input,
sectorAlignedMemoryPoolAlignment);
var srcReadBuffer = sectorAlignedMemoryHll1.GetValidPointer();
var dstReadBuffer = sectorAlignedMemoryHll2.GetValidPointer();
var dstMergeBuffer = new SpanByteAndMemory(srcReadBuffer, hllBufferSize);
var srcMergeBuffer = new SpanByteAndMemory(dstReadBuffer, hllBufferSize);
var srcMergeBuffer = new SpanByteAndMemory(srcReadBuffer, hllBufferSize);
var dstMergeBuffer = new SpanByteAndMemory(dstReadBuffer, hllBufferSize);
var isFirst = false;

for (var i = 0; i < input.parseState.Count; i++)
Expand Down Expand Up @@ -227,6 +227,7 @@ public unsafe GarnetStatus HyperLogLogMerge(ref RawStringInput input, out bool e
// Handle case merging source key does not exist
if (status == GarnetStatus.NOTFOUND)
continue;

// Invalid Type
if (*(long*)readBuffer == -1)
{
Expand Down
15 changes: 15 additions & 0 deletions test/Garnet.test.cluster/ClusterTestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public void RegisterCustomTxn(string name, Func<CustomTransactionProcedure> proc
/// <param name="OnDemandCheckpoint"></param>
/// <param name="AofMemorySize"></param>
/// <param name="CommitFrequencyMs"></param>
/// <param name="useAofNullDevice"></param>
/// <param name="DisableStorageTier"></param>
/// <param name="EnableIncrementalSnapshots"></param>
/// <param name="FastCommit"></param>
Expand All @@ -193,11 +194,25 @@ public void RegisterCustomTxn(string name, Func<CustomTransactionProcedure> proc
/// <param name="enableLua"></param>
/// <param name="asyncReplay"></param>
/// <param name="enableDisklessSync"></param>
/// <param name="replicaDisklessSyncDelay"></param>
/// <param name="replicaDisklessSyncFullSyncAofThreshold"></param>
/// <param name="luaMemoryMode"></param>
/// <param name="luaMemoryLimit"></param>
/// <param name="useHostname"></param>
/// <param name="luaTransactionMode"></param>
/// <param name="deviceType"></param>
/// <param name="clusterReplicationReestablishmentTimeout"></param>
/// <param name="aofSizeLimit"></param>
/// <param name="compactionFrequencySecs"></param>
/// <param name="compactionType"></param>
/// <param name="latencyMonitory"></param>
/// <param name="loggingFrequencySecs"></param>
/// <param name="checkpointThrottleFlushDelayMs"></param>
/// <param name="clusterReplicaResumeWithData"></param>
/// <param name="replicaSyncTimeout"></param>
/// <param name="expiredObjectCollectionFrequencySecs"></param>
/// <param name="clusterPreferredEndpointType"></param>
/// <param name="useClusterAnnounceHostname"></param>
public void CreateInstances(
int shards,
bool enableCluster = true,
Expand Down
29 changes: 29 additions & 0 deletions test/Garnet.test.cluster/RedirectTests/BaseCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public abstract class BaseCommand
/// </summary>
public virtual bool RequiresExistingKey => false;

/// <summary>
/// Check if command requires object parameters to avoid ASCII encoding loss.
/// </summary>
public virtual bool RequiresObjectParameters => false;

/// <summary>
/// Command name
/// </summary>
Expand Down Expand Up @@ -80,6 +85,12 @@ public BaseCommand()
/// <returns></returns>
public abstract string[] GetSingleSlotRequest();

/// <summary>
/// Generate a request for this command that references a single slot.
/// </summary>
/// <returns></returns>
public virtual object[] GetSingleSlotObjectRequest() => throw new NotImplementedException();

/// <summary>
/// Generate a request for this command that references at least two slots
/// NOTE: available only for multi-key operations
Expand Down Expand Up @@ -684,6 +695,7 @@ internal class RESTORE : BaseCommand
{
private int counter = -1;

public override bool RequiresObjectParameters => true;
public override bool IsArrayCommand => false;
public override bool ArrayResponse => false;
public override string Command => nameof(RESTORE);
Expand All @@ -705,6 +717,23 @@ public override string[] GetSingleSlotRequest()
return [$"{ssk[0]}-{counter}", "0", Encoding.ASCII.GetString(payload)];
}

public override object[] GetSingleSlotObjectRequest()
{
counter += 1;

var payload = new byte[]
{
0x00, // value type
0x03, // length of payload
0x76, 0x61, 0x6C, // 'v', 'a', 'l'
0x0B, 0x00, // RDB version
0xDB, 0x82, 0x3C, 0x30, 0x38, 0x78, 0x5A, 0x99 // Crc64
};

var ssk = GetSingleSlotKeys;
return [$"{ssk[0]}-{counter}", "0", payload];
}

public override string[] GetCrossSlotRequest() => throw new NotImplementedException();

public override ArraySegment<string>[] SetupSingleSlotRequest() => throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,14 @@ void SERedisOKTest(BaseCommand command)
{
try
{
_ = context.clusterTestUtils.GetServer(requestNodeIndex).Execute(command.Command, command.GetSingleSlotRequest());
if (command.RequiresObjectParameters)
{
_ = context.clusterTestUtils.GetServer(requestNodeIndex).Execute(command.Command, command.GetSingleSlotObjectRequest());
}
else
{
_ = context.clusterTestUtils.GetServer(requestNodeIndex).Execute(command.Command, command.GetSingleSlotRequest());
}
}
catch (Exception ex)
{
Expand All @@ -346,10 +353,13 @@ void GarnetClientSessionOK(BaseCommand command)
var client = context.clusterTestUtils.GetGarnetClientSession(requestNodeIndex);
try
{
if (command.ArrayResponse)
_ = client.ExecuteForArrayAsync(command.GetSingleSlotRequestWithCommand).GetAwaiter().GetResult();
else
_ = client.ExecuteAsync(command.GetSingleSlotRequestWithCommand).GetAwaiter().GetResult();
if (!command.RequiresObjectParameters)
{
if (command.ArrayResponse)
_ = client.ExecuteForArrayAsync(command.GetSingleSlotRequestWithCommand).GetAwaiter().GetResult();
else
_ = client.ExecuteAsync(command.GetSingleSlotRequestWithCommand).GetAwaiter().GetResult();
}
}
catch (Exception ex)
{
Expand Down
Loading
Loading