Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed Processor: Fixes noisy error notification when lease is stolen by other host #3124

Merged
merged 5 commits into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public override async Task AddOrUpdateLeaseAsync(DocumentServiceLease lease)
}
catch (Exception ex)
{
await this.RemoveLeaseAsync(lease).ConfigureAwait(false);
await this.RemoveLeaseAsync(lease: lease, wasAcquired: false).ConfigureAwait(false);
await this.monitor.NotifyErrorAsync(lease.CurrentLeaseToken, ex);
throw;
}
Expand Down Expand Up @@ -98,7 +98,7 @@ private async Task LoadLeasesAsync()
await Task.WhenAll(addLeaseTasks.ToArray()).ConfigureAwait(false);
}

private async Task RemoveLeaseAsync(DocumentServiceLease lease)
private async Task RemoveLeaseAsync(DocumentServiceLease lease, bool wasAcquired)
{
if (!this.currentlyOwnedPartitions.TryRemove(lease.CurrentLeaseToken, out TaskCompletionSource<bool> worker))
{
Expand All @@ -111,6 +111,15 @@ private async Task RemoveLeaseAsync(DocumentServiceLease lease)

await this.monitor.NotifyLeaseReleaseAsync(lease.CurrentLeaseToken);
}
catch (LeaseLostException)
{
if (wasAcquired)
{
await this.monitor.NotifyLeaseReleaseAsync(lease.CurrentLeaseToken);
}

DefaultTrace.TraceVerbose("Lease with token {0}: taken by another host during release");
}
catch (Exception ex)
{
await this.monitor.NotifyErrorAsync(lease.CurrentLeaseToken, ex);
Expand Down Expand Up @@ -142,7 +151,7 @@ private async Task ProcessPartitionAsync(PartitionSupervisor partitionSupervisor
DefaultTrace.TraceWarning("Lease with token {0}: processing failed", lease.CurrentLeaseToken);
}

await this.RemoveLeaseAsync(lease).ConfigureAwait(false);
await this.RemoveLeaseAsync(lease: lease, wasAcquired: true).ConfigureAwait(false);
}

private async Task HandlePartitionGoneAsync(DocumentServiceLease lease, string lastContinuationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,53 @@ public async Task Controller_ShouldNotify_IfProcessingFails()

Mock.Get(this.leaseManager)
.Verify(manager => manager.ReleaseAsync(this.lease), Times.Once);

this.healthMonitor
.Verify(m => m.NotifyLeaseReleaseAsync(this.lease.CurrentLeaseToken), Times.Once);
}

[TestMethod]
public async Task Controller_ShouldNotify_IfProcessingFails_EvenOnLeaseLost()
{
Mock.Get(this.partitionProcessor)
.Reset();

Mock<PartitionSupervisor> supervisor = new Mock<PartitionSupervisor>();

Exception exception = new NotImplementedException();

ManualResetEvent manualResetEvent = new ManualResetEvent(false);

// Fail on Release
Mock.Get(this.leaseManager)
.Setup(manager => manager.ReleaseAsync(this.lease))
.ThrowsAsync(new Exceptions.LeaseLostException());

supervisor
.Setup(s => s.RunAsync(It.IsAny<CancellationToken>()))
.Callback((CancellationToken ct) =>
{
manualResetEvent.Set();
throw exception;
});

Mock.Get(this.partitionSupervisorFactory)
.Setup(f => f.Create(this.lease))
.Returns(supervisor.Object);

await this.sut.AddOrUpdateLeaseAsync(this.lease).ConfigureAwait(false);

bool timeout = manualResetEvent.WaitOne(100);
Assert.IsTrue(timeout, "Partition supervisor not started");

this.healthMonitor
.Verify(m => m.NotifyErrorAsync(this.lease.CurrentLeaseToken, exception), Times.Once);

Mock.Get(this.leaseManager)
.Verify(manager => manager.ReleaseAsync(this.lease), Times.Once);

this.healthMonitor
.Verify(m => m.NotifyLeaseReleaseAsync(this.lease.CurrentLeaseToken), Times.Once);
}

[TestMethod]
Expand Down Expand Up @@ -328,6 +375,80 @@ public async Task AddLease_ShouldntRunObserver_IfLeaseAcquireThrows()
.Verify(m => m.NotifyLeaseReleaseAsync(this.lease.CurrentLeaseToken), Times.Once);
}

[TestMethod]
public async Task AddLease_ShouldNotify_IfLeaseReleaseThrowsUnknown()
{
Mock.Get(this.partitionProcessor)
.Reset();

Mock.Get(this.leaseManager)
.Reset();

// Fail on Acquire to trigger Release
Mock.Get(this.leaseManager)
.Setup(manager => manager.AcquireAsync(this.lease))
.ThrowsAsync(new NullReferenceException());

// Fail on Release
Mock.Get(this.leaseManager)
.Setup(manager => manager.ReleaseAsync(this.lease))
.ThrowsAsync(new ArgumentException());

await Assert.ThrowsExceptionAsync<NullReferenceException>(() => this.sut.AddOrUpdateLeaseAsync(this.lease)).ConfigureAwait(false);

Mock.Get(this.partitionProcessor)
.Verify(processor => processor.RunAsync(It.IsAny<CancellationToken>()), Times.Never);

// Notify the Acquire error
this.healthMonitor
.Verify(m => m.NotifyErrorAsync(this.lease.CurrentLeaseToken, It.Is<Exception>(ex => ex is NullReferenceException)), Times.Once);

// Notify the Release error
this.healthMonitor
.Verify(m => m.NotifyErrorAsync(this.lease.CurrentLeaseToken, It.Is<Exception>(ex => ex is ArgumentException)), Times.Once);

this.healthMonitor
.Verify(m => m.NotifyLeaseAcquireAsync(this.lease.CurrentLeaseToken), Times.Never);

this.healthMonitor
.Verify(m => m.NotifyLeaseReleaseAsync(this.lease.CurrentLeaseToken), Times.Never);
}

[TestMethod]
public async Task AddLease_ShouldNotify_IfLeaseReleaseThrowsLeaseLost()
{
Mock.Get(this.partitionProcessor)
.Reset();

Mock.Get(this.leaseManager)
.Reset();

// Fail on Acquire to trigger Release
Mock.Get(this.leaseManager)
.Setup(manager => manager.AcquireAsync(this.lease))
.ThrowsAsync(new NullReferenceException());

// Fail on Release
Mock.Get(this.leaseManager)
.Setup(manager => manager.ReleaseAsync(this.lease))
.ThrowsAsync(new Exceptions.LeaseLostException());

await Assert.ThrowsExceptionAsync<NullReferenceException>(() => this.sut.AddOrUpdateLeaseAsync(this.lease)).ConfigureAwait(false);

Mock.Get(this.partitionProcessor)
.Verify(processor => processor.RunAsync(It.IsAny<CancellationToken>()), Times.Never);

// Notify the Acquire error
this.healthMonitor
.Verify(m => m.NotifyErrorAsync(this.lease.CurrentLeaseToken, It.Is<Exception>(ex => ex is NullReferenceException)), Times.Once);

this.healthMonitor
.Verify(m => m.NotifyLeaseAcquireAsync(this.lease.CurrentLeaseToken), Times.Never);

this.healthMonitor
.Verify(m => m.NotifyLeaseReleaseAsync(this.lease.CurrentLeaseToken), Times.Never);
}

[TestMethod]
public async Task Shutdown_ShouldNotify_Monitor()
{
Expand Down