Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Invoke custom commands from custom proc/txn #597

Merged
merged 26 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d5037fb
Invoke custom raw string cmd from custom proc/txn
yrajas Aug 16, 2024
f83460f
Support cmd name matching
yrajas Oct 21, 2024
0115785
API changes.
yrajas Oct 21, 2024
57f0fce
Added custom object command support.
yrajas Oct 22, 2024
bc45d28
Cleanup.
yrajas Oct 22, 2024
c4b57c4
Updated tests.
yrajas Oct 22, 2024
1f57daf
Format fix.
yrajas Oct 22, 2024
4b31426
Added tests.
yrajas Oct 22, 2024
2bc94b4
Unified API. Added test for invalid command.
yrajas Oct 22, 2024
daa4932
Fixed formatting.
yrajas Oct 22, 2024
3e9022c
Merged latest changes.
yrajas Oct 23, 2024
3c6b5a9
Separate parsing to a separate API to avoid parsing in the hot path.
yrajas Nov 1, 2024
4f7699b
Merge branch 'main' into yrajas/proctocmd
yrajas Nov 1, 2024
59d062f
Merge branch 'main' into yrajas/proctocmd
yrajas Nov 11, 2024
ca36414
Renamed to CustomProcedureFactory
yrajas Nov 11, 2024
51f5f42
Merge branch 'main' into yrajas/proctocmd
yrajas Nov 12, 2024
352aeea
Merge branch 'main' into yrajas/proctocmd
yrajas Nov 12, 2024
3263dc8
Made session maps as private.
yrajas Nov 16, 2024
226786c
Merge branch 'main' into yrajas/proctocmd
yrajas Nov 16, 2024
0c5d51d
Merge branch 'main' into yrajas/proctocmd
TalZaccai Nov 18, 2024
315a15d
Adding an additional parse state for custom commands use
TalZaccai Nov 21, 2024
39771cd
merging with latest main
TalZaccai Nov 21, 2024
162926d
Merge branch 'main' into yrajas/proctocmd
yrajas Nov 22, 2024
d398fa1
Merge branch 'main' into yrajas/proctocmd
yrajas Nov 22, 2024
effe25d
Merge branch 'main' into yrajas/proctocmd
yrajas Nov 22, 2024
5b712a6
Merge branch 'main' into yrajas/proctocmd
yrajas Nov 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Unified API. Added test for invalid command.
  • Loading branch information
yrajas committed Oct 31, 2024
commit 2bc94b4fabb2487098df48c9f82c2ec87a44be16
19 changes: 11 additions & 8 deletions libs/server/Custom/CustomFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,19 @@ protected static unsafe ArgSlice GetNextArg(ref CustomProcedureInput procInput,
return GetNextArg(ref procInput.parseState, ref idx);
}

protected void ExecuteCustomRawStringCommand<TGarnetApi>(TGarnetApi garnetApi, string cmd, ArgSlice key, ArgSlice[] input, out ArgSlice output)
where TGarnetApi : IGarnetApi
{
respServerSession.InvokeCustomRawStringCommand(ref garnetApi, cmd, key, input, out output);
}

protected void ExecuteCustomObjectCommand<TGarnetApi>(TGarnetApi garnetApi, string cmd, ArgSlice key, ArgSlice[] input, out ArgSlice output)
/// <summary>
/// Execute custom raw string or object command
/// </summary>
/// <param name="garnetApi"></param>
/// <param name="cmd">Command to execute</param>
/// <param name="key">Key argument</param>
/// <param name="input">Array of arguments to command</param>
/// <param name="output">Output from command</param>
/// <returns>True if command found, else false</returns>
protected bool ExecuteCustomCommand<TGarnetApi>(TGarnetApi garnetApi, string cmd, ArgSlice key, ArgSlice[] input, out ArgSlice output)
where TGarnetApi : IGarnetApi
{
respServerSession.InvokeCustomObjectCommand(ref garnetApi, cmd, key, input, out output);
return respServerSession.InvokeCustomCommand(ref garnetApi, cmd, key, input, out output);
}
}
}
266 changes: 137 additions & 129 deletions libs/server/Custom/CustomRespCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
namespace Garnet.server
{

/// <summary>
/// Server session for RESP protocol - basic commands are in this file
var inputArg = expirationTicks > 0 ? DateTimeOffset.UtcNow.Ticks + expirationTicks : expirationTicks;
var input = new RawStringInput(cmd, ref parseState, 1, -1, inputArg);
/// </summary>
internal sealed unsafe partial class RespServerSession : ServerSessionBase
{
Expand Down Expand Up @@ -82,66 +82,6 @@ private void TryCustomProcedure(CustomProcedure proc, int startIdx = 0)
}
}

public bool InvokeCustomRawStringCommand<TGarnetApi>(ref TGarnetApi storageApi, string cmd, ArgSlice key, ArgSlice[] args, out ArgSlice output)
where TGarnetApi : IGarnetAdvancedApi
{
if (!storeWrapper.customCommandManager.Match(new ReadOnlySpan<byte>(Encoding.UTF8.GetBytes(cmd)), out CustomRawStringCommand customCommand))
{
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERR_GENERIC_UNK_CMD);
return false;
}

var sbKey = key.SpanByte;

var inputArg = customCommand.expirationTicks > 0 ? DateTimeOffset.UtcNow.Ticks + customCommand.expirationTicks : customCommand.expirationTicks;
var sessionParseState = new SessionParseState();
sessionParseState.InitializeWithArguments(args);
var rawStringInput = new RawStringInput(customCommand.GetRespCommand(), ref sessionParseState, arg1: inputArg);

var _output = new SpanByteAndMemory(null);
GarnetStatus status;
if (customCommand.type == CommandType.ReadModifyWrite)
{
status = storageApi.RMW_MainStore(ref sbKey, ref rawStringInput, ref _output);
Debug.Assert(!_output.IsSpanByte);

if (_output.Memory != null)
{
output = scratchBufferManager.FormatScratch(0, _output.AsReadOnlySpan());
_output.Memory.Dispose();
}
else
{
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
}
}
else
{
status = storageApi.Read_MainStore(ref sbKey, ref rawStringInput, ref _output);
Debug.Assert(!_output.IsSpanByte);

if (status == GarnetStatus.OK)
{
if (_output.Memory != null)
{
output = scratchBufferManager.FormatScratch(0, _output.AsReadOnlySpan());
_output.Memory.Dispose();
}
else
{
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
}
}
else
{
Debug.Assert(_output.Memory == null);
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERRNOTFOUND);
}
}

return true;
}

/// <summary>
/// Custom command
/// </summary>
Expand Down Expand Up @@ -190,70 +130,6 @@ private bool TryCustomRawStringCommand<TGarnetApi>(RespCommand cmd, long expirat
return true;
}

public bool InvokeCustomObjectCommand<TGarnetApi>(ref TGarnetApi storageApi, string cmd, ArgSlice key, ArgSlice[] args, out ArgSlice output)
where TGarnetApi : IGarnetAdvancedApi
{
output = default;
if (!storeWrapper.customCommandManager.Match(new ReadOnlySpan<byte>(Encoding.UTF8.GetBytes(cmd)), out CustomObjectCommand customObjCommand))
{
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERR_GENERIC_UNK_CMD);
return false;
}

var keyBytes = key.ToArray();

// Prepare input
var header = new RespInputHeader(customObjCommand.GetRespCommand()) { SubId = customObjCommand.subid };
var sessionParseState = new SessionParseState();
sessionParseState.InitializeWithArguments(args);
var input = new ObjectInput(header, ref sessionParseState);

var _output = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) };
GarnetStatus status;
if (customObjCommand.type == CommandType.ReadModifyWrite)
{
status = storageApi.RMW_ObjectStore(ref keyBytes, ref input, ref _output);
Debug.Assert(!_output.spanByteAndMemory.IsSpanByte);

switch (status)
{
case GarnetStatus.WRONGTYPE:
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERR_WRONG_TYPE);
break;
default:
if (_output.spanByteAndMemory.Memory != null)
output = scratchBufferManager.FormatScratch(0, _output.spanByteAndMemory.AsReadOnlySpan());
else
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
break;
}
}
else
{
status = storageApi.Read_ObjectStore(ref keyBytes, ref input, ref _output);
Debug.Assert(!_output.spanByteAndMemory.IsSpanByte);

switch (status)
{
case GarnetStatus.OK:
if (_output.spanByteAndMemory.Memory != null)
output = scratchBufferManager.FormatScratch(0, _output.spanByteAndMemory.AsReadOnlySpan());
else
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
break;
case GarnetStatus.NOTFOUND:
Debug.Assert(_output.spanByteAndMemory.Memory == null);
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERRNOTFOUND);
break;
case GarnetStatus.WRONGTYPE:
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERR_WRONG_TYPE);
break;
}
}

return true;
}

/// <summary>
/// Custom object command
/// </summary>
Expand All @@ -269,10 +145,10 @@ private bool TryCustomObjectCommand<TGarnetApi>(GarnetObjectType objType, byte s

var output = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) };

GarnetStatus status;
var header = new RespInputHeader(objType) { SubId = subid };
var input = new ObjectInput(header, ref parseState, 1);

if (type == CommandType.ReadModifyWrite)
{
var output = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) };
status = storageApi.RMW_ObjectStore(ref keyBytes, ref input, ref output);
Debug.Assert(!output.spanByteAndMemory.IsSpanByte);

Expand Down Expand Up @@ -319,5 +195,137 @@ private bool TryCustomObjectCommand<TGarnetApi>(GarnetObjectType objType, byte s

return true;
}

public bool InvokeCustomCommand<TGarnetApi>(ref TGarnetApi storageApi, string cmd, ArgSlice key, ArgSlice[] args, out ArgSlice output)
where TGarnetApi : IGarnetAdvancedApi
{
if (storeWrapper.customCommandManager.Match(new ReadOnlySpan<byte>(Encoding.UTF8.GetBytes(cmd)), out CustomRawStringCommand customCommand))
{
return InvokeCustomRawStringCommand(ref storageApi, customCommand, key, args, out output);
}
else if (storeWrapper.customCommandManager.Match(new ReadOnlySpan<byte>(Encoding.UTF8.GetBytes(cmd)), out CustomObjectCommand customObjCommand))
{
return InvokeCustomObjectCommand(ref storageApi, customObjCommand, key, args, out output);
}
else
{
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERR_GENERIC_UNK_CMD);
return false;
}
}

private bool InvokeCustomRawStringCommand<TGarnetApi>(ref TGarnetApi storageApi, CustomRawStringCommand customCommand, ArgSlice key, ArgSlice[] args, out ArgSlice output)
where TGarnetApi : IGarnetAdvancedApi
{
var sbKey = key.SpanByte;

var inputArg = customCommand.expirationTicks > 0 ? DateTimeOffset.UtcNow.Ticks + customCommand.expirationTicks : customCommand.expirationTicks;
var sessionParseState = new SessionParseState();
sessionParseState.InitializeWithArguments(args);
var rawStringInput = new RawStringInput(customCommand.GetRespCommand(), ref sessionParseState, arg1: inputArg);

var _output = new SpanByteAndMemory(null);
GarnetStatus status;
if (customCommand.type == CommandType.ReadModifyWrite)
{
status = storageApi.RMW_MainStore(ref sbKey, ref rawStringInput, ref _output);
Debug.Assert(!_output.IsSpanByte);

if (_output.Memory != null)
{
output = scratchBufferManager.FormatScratch(0, _output.AsReadOnlySpan());
_output.Memory.Dispose();
}
else
{
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
}
}
else
{
status = storageApi.Read_MainStore(ref sbKey, ref rawStringInput, ref _output);
Debug.Assert(!_output.IsSpanByte);

if (status == GarnetStatus.OK)
{
if (_output.Memory != null)
{
output = scratchBufferManager.FormatScratch(0, _output.AsReadOnlySpan());
_output.Memory.Dispose();
}
else
{
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
}
}
else
{
Debug.Assert(_output.Memory == null);
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERRNOTFOUND);
}
}

return true;
}

private bool InvokeCustomObjectCommand<TGarnetApi>(ref TGarnetApi storageApi, CustomObjectCommand customObjCommand, ArgSlice key, ArgSlice[] args, out ArgSlice output)
where TGarnetApi : IGarnetAdvancedApi
{
output = default;

var keyBytes = key.ToArray();

// Prepare input
var header = new RespInputHeader(customObjCommand.GetObjectType()) { SubId = customObjCommand.subid };
var sessionParseState = new SessionParseState();
sessionParseState.InitializeWithArguments(args);
var input = new ObjectInput(header, ref sessionParseState);

var _output = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) };
GarnetStatus status;
if (customObjCommand.type == CommandType.ReadModifyWrite)
{
status = storageApi.RMW_ObjectStore(ref keyBytes, ref input, ref _output);
Debug.Assert(!_output.spanByteAndMemory.IsSpanByte);

switch (status)
{
case GarnetStatus.WRONGTYPE:
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERR_WRONG_TYPE);
break;
default:
if (_output.spanByteAndMemory.Memory != null)
output = scratchBufferManager.FormatScratch(0, _output.spanByteAndMemory.AsReadOnlySpan());
else
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
break;
}
}
else
{
status = storageApi.Read_ObjectStore(ref keyBytes, ref input, ref _output);
Debug.Assert(!_output.spanByteAndMemory.IsSpanByte);

switch (status)
{
case GarnetStatus.OK:
if (_output.spanByteAndMemory.Memory != null)
output = scratchBufferManager.FormatScratch(0, _output.spanByteAndMemory.AsReadOnlySpan());
else
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_OK);
break;
case GarnetStatus.NOTFOUND:
Debug.Assert(_output.spanByteAndMemory.Memory == null);
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERRNOTFOUND);
break;
case GarnetStatus.WRONGTYPE:
output = scratchBufferManager.CreateArgSlice(CmdStrings.RESP_ERR_WRONG_TYPE);
break;
}
}

return true;
}

}
}
2 changes: 1 addition & 1 deletion main/GarnetServer/Extensions/ProcCustomCmd.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public override unsafe bool Execute<TGarnetApi>(TGarnetApi garnetApi, ref Custom
args[0] = GetNextArg(ref procInput, ref offset); // value to set
args[1] = GetNextArg(ref procInput, ref offset); // prefix to match

ExecuteCustomRawStringCommand(garnetApi, "SETIFPM", key, args, out var _output);
ExecuteCustomCommand(garnetApi, "SETIFPM", key, args, out var _output);
return true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion main/GarnetServer/Extensions/TxnCustomCmd.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public override void Main<TGarnetApi>(TGarnetApi api, ref CustomProcedureInput p
args[0] = myDictField;
args[1] = myDictValue;

ExecuteCustomObjectCommand(api, "MYDICTSET", myDictKey, args, out var _output);
ExecuteCustomCommand(api, "MYDICTSET", myDictKey, args, out var _output);

WriteSimpleString(ref output, "OK");
}
Expand Down
Loading