Skip to content

Commit

Permalink
Optimization Serialize Performance When Key Expired
Browse files Browse the repository at this point in the history
  • Loading branch information
InCerryGit committed Nov 25, 2022
1 parent aa5ca5c commit 635d92d
Show file tree
Hide file tree
Showing 14 changed files with 740 additions and 87 deletions.
7 changes: 7 additions & 0 deletions src/FasterKv.Cache.Core/Abstractions/ISystemClock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ namespace FasterKv.Cache.Core.Abstractions;
public interface ISystemClock
{
DateTimeOffset Now();

long NowUnixTimestamp();
}

public sealed class DefaultSystemClock : ISystemClock
Expand All @@ -13,4 +15,9 @@ public DateTimeOffset Now()
{
return DateTimeOffset.Now;
}

public long NowUnixTimestamp()
{
return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
}
53 changes: 49 additions & 4 deletions src/FasterKv.Cache.Core/Abstractions/ValueWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Buffers;
using FasterKv.Cache.Core.Serializers;

namespace FasterKv.Cache.Core;

Expand Down Expand Up @@ -29,9 +30,31 @@ public ValueWrapper(T? data, long? expiryTime = null)
/// <summary>
/// HasExpired
/// </summary>
/// <param name="now">Now</param>
/// <param name="nowTimestamp">Now</param>
/// <returns>value has expired</returns>
public bool HasExpired(DateTimeOffset now) => now.ToUnixTimeMilliseconds() > ExpiryTime;
public bool HasExpired(long nowTimestamp) => nowTimestamp > ExpiryTime;

/// <summary>
/// Get FasterKvSerializerFlags
/// </summary>
/// <param name="nowTimestamp"></param>
/// <returns></returns>
internal FasterKvSerializerFlags GetFlags(long nowTimestamp)
{
var flags = FasterKvSerializerFlags.None;
if (ExpiryTime is not null)
{
flags |= FasterKvSerializerFlags.HasExpiryTime;
}

// don't serializer expired value body
if (Data is not null && HasExpired(nowTimestamp) == false)
{
flags |= FasterKvSerializerFlags.HasBody;
}

return flags;
}
}

internal sealed class ValueWrapper
Expand Down Expand Up @@ -63,9 +86,31 @@ public ValueWrapper(object? data, long? expiryTime = null)
/// <summary>
/// HasExpired
/// </summary>
/// <param name="now">Now</param>
/// <param name="nowTimestamp">Now</param>
/// <returns>value has expired</returns>
public bool HasExpired(DateTimeOffset now) => now.ToUnixTimeMilliseconds() > ExpiryTime;
public bool HasExpired(long nowTimestamp) => nowTimestamp > ExpiryTime;

/// <summary>
/// Get FasterKvSerializerFlags
/// </summary>
/// <param name="nowTimestamp"></param>
/// <returns></returns>
internal FasterKvSerializerFlags GetFlags(long nowTimestamp)
{
var flags = FasterKvSerializerFlags.None;
if (ExpiryTime is not null)
{
flags |= FasterKvSerializerFlags.HasExpiryTime;
}

// don't serializer expired value body
if (Data is not null && HasExpired(nowTimestamp) == false)
{
flags |= FasterKvSerializerFlags.HasBody;
}

return flags;
}

/// <summary>
/// Get TValue From Data or DataBytes
Expand Down
8 changes: 4 additions & 4 deletions src/FasterKv.Cache.Core/FasterKvCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public FasterKvCache(string name,
var serializer = new SerializerSettings<string, ValueWrapper>
{
keySerializer = () => new StringSerializer(),
valueSerializer = () => new FasterKvSerializer(_valueSerializer)
valueSerializer = () => new FasterKvSerializer(_valueSerializer, _systemClock)
};

_logSettings = options.GetLogSettings(name);
Expand Down Expand Up @@ -95,7 +95,7 @@ public FasterKvCache(string name,
return default;
}

if (result.output.HasExpired(_systemClock.Now()))
if (result.output.HasExpired(_systemClock.NowUnixTimestamp()))
{
Delete(key);
return default;
Expand Down Expand Up @@ -139,7 +139,7 @@ public void Set<TValue>(string key, TValue value, TimeSpan expiryTime)
return default;
}

if (result.output.HasExpired(_systemClock.Now()))
if (result.output.HasExpired(_systemClock.NowUnixTimestamp()))
{
await DeleteAsync(key, token);
return default;
Expand Down Expand Up @@ -234,7 +234,7 @@ private async Task ExpiryScanLoop()
context.FinalizeRead(out result.status, out result.output);
}

if (result.status.Found && result.output.HasExpired(_systemClock.Now()))
if (result.status.Found && result.output.HasExpired(_systemClock.NowUnixTimestamp()))
{
sessionWrap.Session.Delete(key);
}
Expand Down
8 changes: 4 additions & 4 deletions src/FasterKv.Cache.Core/FasterKvStore.TValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public FasterKvCache(string name,
var serializer = new SerializerSettings<string, ValueWrapper<TValue>>
{
keySerializer = () => new StringSerializer(),
valueSerializer = () => new FasterKvSerializer<TValue>(valueSerializer)
valueSerializer = () => new FasterKvSerializer<TValue>(valueSerializer, _systemClock)
};

_logSettings = options.GetLogSettings(name);
Expand Down Expand Up @@ -88,7 +88,7 @@ public FasterKvCache(string name,
context.FinalizeRead(out result.status, out result.output);
}

if (result.output.HasExpired(_systemClock.Now()))
if (result.output.HasExpired(_systemClock.NowUnixTimestamp()))
{
Delete(key);
return default;
Expand Down Expand Up @@ -133,7 +133,7 @@ public void Set(string key, TValue value, TimeSpan expiryTime)
using var scopeSession = GetSessionWrap();
var result = (await scopeSession.Session.ReadAsync(ref key, token: token)).Complete();

if (result.output.HasExpired(_systemClock.Now()))
if (result.output.HasExpired(_systemClock.NowUnixTimestamp()))
{
await DeleteAsync(key, token);
return default;
Expand Down Expand Up @@ -220,7 +220,7 @@ private async Task ExpiryScanLoop()
context.FinalizeRead(out result.status, out result.output);
}

if (result.status.Found && result.output.HasExpired(_systemClock.Now()))
if (result.status.Found && result.output.HasExpired(_systemClock.NowUnixTimestamp()))
{
sessionWrap.Session.Delete(key);
}
Expand Down
69 changes: 41 additions & 28 deletions src/FasterKv.Cache.Core/Serializers/FasterKvSerializer.TValue.cs
Original file line number Diff line number Diff line change
@@ -1,61 +1,74 @@
using System.Buffers;
using FASTER.core;
using FasterKv.Cache.Core.Abstractions;

namespace FasterKv.Cache.Core.Serializers;

internal sealed class FasterKvSerializer<TValue> : BinaryObjectSerializer<ValueWrapper<TValue>>
{
private readonly ISystemClock _systemClock;
private readonly IFasterKvCacheSerializer _serializer;

public FasterKvSerializer(IFasterKvCacheSerializer serializer)
public FasterKvSerializer(IFasterKvCacheSerializer serializer, ISystemClock systemClock)
{
_serializer = serializer.ArgumentNotNull();
_systemClock = systemClock.ArgumentNotNull();
}

public override void Deserialize(out ValueWrapper<TValue> obj)
{
obj = new ValueWrapper<TValue>();
var etNullFlag = reader.ReadByte();
if (etNullFlag == 1)
var flags = (FasterKvSerializerFlags)reader.ReadByte();
if ((flags & FasterKvSerializerFlags.HasExpiryTime) == FasterKvSerializerFlags.HasExpiryTime)
{
obj.ExpiryTime = reader.ReadInt64();
}

var dataLength = reader.ReadInt32();
var buffer = ArrayPool<byte>.Shared.Rent(dataLength);
try
if ((flags & FasterKvSerializerFlags.HasBody) == FasterKvSerializerFlags.HasBody)
{
_ = reader.Read(buffer, 0, dataLength);
obj.Data = _serializer.Deserialize<TValue>(buffer, dataLength);
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
var dataLength = reader.ReadInt32();
if (obj.HasExpired(_systemClock.NowUnixTimestamp()))
{
reader.BaseStream.Position += dataLength;
}
else
{
var buffer = ArrayPool<byte>.Shared.Rent(dataLength);
try
{
_ = reader.Read(buffer, 0, dataLength);
obj.Data = _serializer.Deserialize<TValue>(buffer, dataLength);
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}

}

public override void Serialize(ref ValueWrapper<TValue> obj)
{
if (obj.ExpiryTime is null)
{
// write Expiry Time is null flag
writer.Write((byte)0);
}
else
var flags = obj.GetFlags(_systemClock.NowUnixTimestamp());
writer.Write((byte)flags);
if ((flags & FasterKvSerializerFlags.HasExpiryTime) == FasterKvSerializerFlags.HasExpiryTime)
{
writer.Write((byte)1);
writer.Write(obj.ExpiryTime.Value);
writer.Write(obj.ExpiryTime!.Value);
}

var beforePos = writer.BaseStream.Position;
var dataPos = writer.BaseStream.Position = writer.BaseStream.Position += sizeof(int);
_serializer.Serialize(writer.BaseStream, obj.Data);
var afterPos = writer.BaseStream.Position;
if ((flags & FasterKvSerializerFlags.HasBody) == FasterKvSerializerFlags.HasBody)
{
var beforePos = writer.BaseStream.Position;
var dataPos = writer.BaseStream.Position = writer.BaseStream.Position += sizeof(int);
_serializer.Serialize(writer.BaseStream, obj.Data);
var afterPos = writer.BaseStream.Position;

var length = (int)(afterPos - dataPos);
writer.BaseStream.Position = beforePos;
var length = (int)(afterPos - dataPos);
writer.BaseStream.Position = beforePos;

writer.Write(length);
writer.BaseStream.Position = afterPos;
writer.Write(length);
writer.BaseStream.Position = afterPos;
}
}
}
68 changes: 45 additions & 23 deletions src/FasterKv.Cache.Core/Serializers/FasterKvSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,53 +1,75 @@
using System.Buffers;
using System;
using System.Buffers;
using FASTER.core;
using FasterKv.Cache.Core.Abstractions;

namespace FasterKv.Cache.Core.Serializers;

internal sealed class FasterKvSerializer : BinaryObjectSerializer<ValueWrapper>
{
private readonly ISystemClock _systemClock;
private readonly IFasterKvCacheSerializer _serializer;

public FasterKvSerializer(IFasterKvCacheSerializer serializer)
public FasterKvSerializer(IFasterKvCacheSerializer serializer, ISystemClock systemClock)
{
_systemClock = systemClock.ArgumentNotNull();
_serializer = serializer.ArgumentNotNull();
}

public override void Deserialize(out ValueWrapper obj)
{
obj = new ValueWrapper();
var etNullFlag = reader.ReadByte();
if (etNullFlag == 1)
var flags = (FasterKvSerializerFlags)reader.ReadByte();
if ((flags & FasterKvSerializerFlags.HasExpiryTime) == FasterKvSerializerFlags.HasExpiryTime)
{
obj.ExpiryTime = reader.ReadInt64();
}

obj.DataByteLength = reader.ReadInt32();
obj.DataBytes = ArrayPool<byte>.Shared.Rent(obj.DataByteLength);
_ = reader.Read(obj.DataBytes, 0, obj.DataByteLength);
if ((flags & FasterKvSerializerFlags.HasBody) == FasterKvSerializerFlags.HasBody)
{
obj.DataByteLength = reader.ReadInt32();
if (obj.HasExpired(_systemClock.NowUnixTimestamp()))
{
reader.BaseStream.Position += obj.DataByteLength;
obj.DataByteLength = 0;
}
else
{
obj.DataBytes = ArrayPool<byte>.Shared.Rent(obj.DataByteLength);
_ = reader.Read(obj.DataBytes, 0, obj.DataByteLength);
}
}
}

public override void Serialize(ref ValueWrapper obj)
{
if (obj.ExpiryTime is null)
{
// write Expiry Time is null flag
writer.Write((byte)0);
}
else
var flags = obj.GetFlags(_systemClock.NowUnixTimestamp());
writer.Write((byte)flags);
if ((flags & FasterKvSerializerFlags.HasExpiryTime) == FasterKvSerializerFlags.HasExpiryTime)
{
writer.Write((byte)1);
writer.Write(obj.ExpiryTime.Value);
writer.Write(obj.ExpiryTime!.Value);
}

var beforePos = writer.BaseStream.Position;
var dataPos = writer.BaseStream.Position = writer.BaseStream.Position += sizeof(int);
_serializer.Serialize(writer.BaseStream, obj.Data);
var afterPos = writer.BaseStream.Position;
if ((flags & FasterKvSerializerFlags.HasBody) == FasterKvSerializerFlags.HasBody)
{
var beforePos = writer.BaseStream.Position;
var dataPos = writer.BaseStream.Position = writer.BaseStream.Position += sizeof(int);
_serializer.Serialize(writer.BaseStream, obj.Data);
var afterPos = writer.BaseStream.Position;

var length = (int)(afterPos - dataPos);
writer.BaseStream.Position = beforePos;
var length = (int)(afterPos - dataPos);
writer.BaseStream.Position = beforePos;

writer.Write(length);
writer.BaseStream.Position = afterPos;
writer.Write(length);
writer.BaseStream.Position = afterPos;
}
}
}

[Flags]
internal enum FasterKvSerializerFlags : byte
{
None = 0,
HasExpiryTime = 1 << 0,
HasBody = 1 << 1
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>

<IsPackable>false</IsPackable>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<LangVersion>11</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="Moq" Version="4.18.2" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Loading

0 comments on commit 635d92d

Please sign in to comment.