Skip to content

Commit

Permalink
RavenDB-17054 We need to get the write lock when accessing compressio…
Browse files Browse the repository at this point in the history
…n.buffers during ZeroCompressionBuffer() call. In RavenDB we call it on our own if a database is encrypted from the tx merger and an index instance so we need to ensure it won't be called concurrently.
  • Loading branch information
arekpalinski authored and ppekrol committed Aug 5, 2021
1 parent 9e4c8f8 commit ed5815d
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 5 deletions.
41 changes: 36 additions & 5 deletions src/Voron/Impl/Journal/WriteAheadJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1946,7 +1946,7 @@ private AbstractPager CreateCompressionPager(long initialSize)
private CompressionAccelerationStats _lastCompressionAccelerationInfo = new CompressionAccelerationStats();
private readonly bool _is32Bit;

public void ReduceSizeOfCompressionBufferIfNeeded(bool forceReduce = false)
private void ReduceSizeOfCompressionBufferIfNeeded(bool forceReduce = false)
{
var maxSize = _env.Options.MaxScratchBufferSize;
if (ShouldReduceSizeOfCompressionPager(maxSize, forceReduce) == false)
Expand All @@ -1973,16 +1973,32 @@ public void ReduceSizeOfCompressionBufferIfNeeded(bool forceReduce = false)
_lastCompressionBufferReduceCheck = DateTime.UtcNow;

_compressionPager.Dispose();

_forTestingPurposes?.OnReduceSizeOfCompressionBufferIfNeeded_RightAfterDisposingCompressionPager?.Invoke();

_compressionPager = CreateCompressionPager(maxSize);
}

public void ZeroCompressionBuffer(IPagerLevelTransactionState tx)
{
var compressionBufferSize = _compressionPager.NumberOfAllocatedPages * Constants.Storage.PageSize;
_compressionPager.EnsureMapped(tx, 0, checked((int)_compressionPager.NumberOfAllocatedPages));
var pagePointer = _compressionPager.AcquirePagePointer(tx, 0);
var lockTaken = false;

if (Monitor.IsEntered(_writeLock) == false)
Monitor.Enter(_writeLock, ref lockTaken);

Sodium.sodium_memzero(pagePointer, (UIntPtr)compressionBufferSize);
try
{
var compressionBufferSize = _compressionPager.NumberOfAllocatedPages * Constants.Storage.PageSize;
_compressionPager.EnsureMapped(tx, 0, checked((int)_compressionPager.NumberOfAllocatedPages));
var pagePointer = _compressionPager.AcquirePagePointer(tx, 0);

Sodium.sodium_memzero(pagePointer, (UIntPtr)compressionBufferSize);
}
finally
{
if (lockTaken)
Monitor.Exit(_writeLock);
}
}

private bool ShouldReduceSizeOfCompressionPager(long maxSize, bool forceReduce)
Expand Down Expand Up @@ -2021,6 +2037,21 @@ public void TryReduceSizeOfCompressionBufferIfNeeded()
Monitor.Exit(_writeLock);
}
}

private TestingStuff _forTestingPurposes;

internal TestingStuff ForTestingPurposesOnly()
{
if (_forTestingPurposes != null)
return _forTestingPurposes;

return _forTestingPurposes = new TestingStuff();
}

internal class TestingStuff
{
internal Action OnReduceSizeOfCompressionBufferIfNeeded_RightAfterDisposingCompressionPager;
}
}

public unsafe struct CompressedPagesResult
Expand Down
66 changes: 66 additions & 0 deletions test/SlowTests/Voron/Issues/RavenDB_17054.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using System.Threading;
using FastTests.Voron;
using Sparrow.Server;
using Voron;
using Xunit;
using Xunit.Abstractions;

namespace SlowTests.Voron.Issues
{
public class RavenDB_17054 : StorageTest
{
public RavenDB_17054(ITestOutputHelper output) : base(output)
{
}

protected override void Configure(StorageEnvironmentOptions options)
{
options.Encryption.MasterKey = Sodium.GenerateRandomBuffer((int)Sodium.crypto_aead_xchacha20poly1305_ietf_keybytes());
options.MaxScratchBufferSize = 65536 - 1; // to make ShouldReduceSizeOfCompressionPager() return true
options.Encryption.RegisterForJournalCompressionHandler();
}

[Fact]
public void WeNeedToTakeWriteLockWhenZeroCompressionBufferIsCalled()
{
var testingActionWasCalled = false;

Exception startTransactionException = null;

var zeroCompressionBufferThread = new Thread(() =>
{
try
{
using (var tx = Env.WriteTransaction())
{
Env.Journal.ZeroCompressionBuffer(tx.LowLevelTransaction); // this can be called by an index e.g. when forceMemoryCleanup is true
}
}
catch (Exception e)
{
startTransactionException = e;
}
});

Env.Journal.ForTestingPurposesOnly().OnReduceSizeOfCompressionBufferIfNeeded_RightAfterDisposingCompressionPager += () =>
{
testingActionWasCalled = true;
zeroCompressionBufferThread.Start();
Thread.Sleep(1000); // give the thread more time to ensure that ZeroCompressionBuffer will wait for _writeLock
};

// in RavenDB we call TryReduceSizeOfCompressionBufferIfNeeded when a database is idle, then we call IndexStore?.RunIdleOperations(mode)
// although an index can still run ZeroCompressionBuffer concurrently e.g. when forceMemoryCleanup is true
Env.Journal.TryReduceSizeOfCompressionBufferIfNeeded();

Assert.True(testingActionWasCalled);

Assert.True(zeroCompressionBufferThread.Join(TimeSpan.FromSeconds(30)), "zeroCompressionBufferThread.Join(TimeSpan.FromSeconds(30))");

Assert.Null(startTransactionException);
}
}
}

0 comments on commit ed5815d

Please sign in to comment.