Skip to content

Commit

Permalink
Change Feed Processor: Fixes LeaseLostException leaks on notification…
Browse files Browse the repository at this point in the history
… APIs for Renew scenarios (#3775)

* Adding cases

* Tests
  • Loading branch information
ealsur authored Mar 23, 2023
1 parent 1acfaa2 commit d0c0578
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,17 @@ await this.leaseUpdater.UpdateLeaseAsync(
if (serverLease.Owner != lease.Owner)
{
DefaultTrace.TraceInformation("Lease with token {0} no need to release lease. The lease was already taken by another host '{1}'.", lease.CurrentLeaseToken, serverLease.Owner);
throw new LeaseLostException(lease);
throw new LeaseLostException(
lease,
CosmosExceptionFactory.Create(
statusCode: HttpStatusCode.PreconditionFailed,
message: $"{lease.CurrentLeaseToken} lease token was taken over by owner '{serverLease.Owner}'",
headers: new Headers(),
stackTrace: default,
trace: NoOpTrace.Singleton,
error: default,
innerException: default),
isGone: false);
}
serverLease.Owner = null;
return serverLease;
Expand Down Expand Up @@ -232,7 +242,17 @@ public override async Task<DocumentServiceLease> RenewAsync(DocumentServiceLease
if (serverLease.Owner != lease.Owner)
{
DefaultTrace.TraceInformation("Lease with token {0} was taken over by owner '{1}'", lease.CurrentLeaseToken, serverLease.Owner);
throw new LeaseLostException(lease);
throw new LeaseLostException(
lease,
CosmosExceptionFactory.Create(
statusCode: HttpStatusCode.PreconditionFailed,
message: $"{lease.CurrentLeaseToken} lease token was taken over by owner '{serverLease.Owner}'",
headers: new Headers(),
stackTrace: default,
trace: NoOpTrace.Singleton,
error: default,
innerException: default),
isGone: false);
}
return serverLease;
}).ConfigureAwait(false);
Expand All @@ -245,7 +265,17 @@ public override async Task<DocumentServiceLease> UpdatePropertiesAsync(DocumentS
if (lease.Owner != this.options.HostName)
{
DefaultTrace.TraceInformation("Lease with token '{0}' was taken over by owner '{1}' before lease properties update", lease.CurrentLeaseToken, lease.Owner);
throw new LeaseLostException(lease);
throw new LeaseLostException(
lease,
CosmosExceptionFactory.Create(
statusCode: HttpStatusCode.PreconditionFailed,
message: $"{lease.CurrentLeaseToken} lease token was taken over by owner '{lease.Owner}'",
headers: new Headers(),
stackTrace: default,
trace: NoOpTrace.Singleton,
error: default,
innerException: default),
isGone: false);
}

return await this.leaseUpdater.UpdateLeaseAsync(
Expand All @@ -257,7 +287,17 @@ public override async Task<DocumentServiceLease> UpdatePropertiesAsync(DocumentS
if (serverLease.Owner != lease.Owner)
{
DefaultTrace.TraceInformation("Lease with token '{0}' was taken over by owner '{1}'", lease.CurrentLeaseToken, serverLease.Owner);
throw new LeaseLostException(lease);
throw new LeaseLostException(
lease,
CosmosExceptionFactory.Create(
statusCode: HttpStatusCode.PreconditionFailed,
message: $"{lease.CurrentLeaseToken} lease token was taken over by owner '{serverLease.Owner}'",
headers: new Headers(),
stackTrace: default,
trace: NoOpTrace.Singleton,
error: default,
innerException: default),
isGone: false);
}
serverLease.Properties = lease.Properties;
return serverLease;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,201 @@ public async Task IfOwnerChangedThrow()
&& innerCosmosException.StatusCode == HttpStatusCode.PreconditionFailed);
}

/// <summary>
/// Verifies that if the renewed read a different Owner from the captured in memory, throws a LeaseLost
/// </summary>
[TestMethod]
public async Task IfOwnerChangedThrowOnRenew()
{
DocumentServiceLeaseStoreManagerOptions options = new DocumentServiceLeaseStoreManagerOptions
{
HostName = Guid.NewGuid().ToString()
};

DocumentServiceLeaseCore lease = new DocumentServiceLeaseCore()
{
LeaseToken = "0",
Owner = Guid.NewGuid().ToString(),
FeedRange = new FeedRangePartitionKeyRange("0")
};

Mock<DocumentServiceLeaseUpdater> mockUpdater = new Mock<DocumentServiceLeaseUpdater>();

Func<Func<DocumentServiceLease, DocumentServiceLease>, bool> validateUpdater = (Func<DocumentServiceLease, DocumentServiceLease> updater) =>
{
// Simulate dirty read from db
DocumentServiceLeaseCore serverLease = new DocumentServiceLeaseCore()
{
LeaseToken = "0",
Owner = Guid.NewGuid().ToString(),
FeedRange = new FeedRangePartitionKeyRange("0")
};
DocumentServiceLease afterUpdateLease = updater(serverLease);
return true;
};

mockUpdater.Setup(c => c.UpdateLeaseAsync(
It.IsAny<DocumentServiceLease>(),
It.IsAny<string>(),
It.IsAny<PartitionKey>(),
It.Is<Func<DocumentServiceLease, DocumentServiceLease>>(f => validateUpdater(f))))
.ReturnsAsync(lease);

ResponseMessage leaseResponse = new ResponseMessage(System.Net.HttpStatusCode.OK)
{
Content = new CosmosJsonDotNetSerializer().ToStream(lease)
};

Mock<ContainerInternal> leaseContainer = new Mock<ContainerInternal>();
leaseContainer.Setup(c => c.ReadItemStreamAsync(
It.IsAny<string>(),
It.IsAny<PartitionKey>(),
It.IsAny<ItemRequestOptions>(),
It.IsAny<CancellationToken>())).ReturnsAsync(leaseResponse);

DocumentServiceLeaseManagerCosmos documentServiceLeaseManagerCosmos = new DocumentServiceLeaseManagerCosmos(
Mock.Of<ContainerInternal>(),
leaseContainer.Object,
mockUpdater.Object,
options,
Mock.Of<RequestOptionsFactory>());

LeaseLostException leaseLost = await Assert.ThrowsExceptionAsync<LeaseLostException>(() => documentServiceLeaseManagerCosmos.RenewAsync(lease));

Assert.IsTrue(leaseLost.InnerException is CosmosException innerCosmosException
&& innerCosmosException.StatusCode == HttpStatusCode.PreconditionFailed);
}

/// <summary>
/// Verifies that if the update properties read a different Owner from the captured in memory, throws a LeaseLost
/// </summary>
[TestMethod]
public async Task IfOwnerChangedThrowOnUpdateProperties()
{
DocumentServiceLeaseCore lease = new DocumentServiceLeaseCore()
{
LeaseToken = "0",
Owner = Guid.NewGuid().ToString(),
FeedRange = new FeedRangePartitionKeyRange("0")
};

DocumentServiceLeaseStoreManagerOptions options = new DocumentServiceLeaseStoreManagerOptions
{
HostName = lease.Owner
};

Mock<DocumentServiceLeaseUpdater> mockUpdater = new Mock<DocumentServiceLeaseUpdater>();

Func<Func<DocumentServiceLease, DocumentServiceLease>, bool> validateUpdater = (Func<DocumentServiceLease, DocumentServiceLease> updater) =>
{
// Simulate dirty read from db
DocumentServiceLeaseCore serverLease = new DocumentServiceLeaseCore()
{
LeaseToken = "0",
Owner = Guid.NewGuid().ToString(),
FeedRange = new FeedRangePartitionKeyRange("0")
};
DocumentServiceLease afterUpdateLease = updater(serverLease);
return true;
};

mockUpdater.Setup(c => c.UpdateLeaseAsync(
It.IsAny<DocumentServiceLease>(),
It.IsAny<string>(),
It.IsAny<PartitionKey>(),
It.Is<Func<DocumentServiceLease, DocumentServiceLease>>(f => validateUpdater(f))))
.ReturnsAsync(lease);

ResponseMessage leaseResponse = new ResponseMessage(System.Net.HttpStatusCode.OK)
{
Content = new CosmosJsonDotNetSerializer().ToStream(lease)
};

Mock<ContainerInternal> leaseContainer = new Mock<ContainerInternal>();
leaseContainer.Setup(c => c.ReadItemStreamAsync(
It.IsAny<string>(),
It.IsAny<PartitionKey>(),
It.IsAny<ItemRequestOptions>(),
It.IsAny<CancellationToken>())).ReturnsAsync(leaseResponse);

DocumentServiceLeaseManagerCosmos documentServiceLeaseManagerCosmos = new DocumentServiceLeaseManagerCosmos(
Mock.Of<ContainerInternal>(),
leaseContainer.Object,
mockUpdater.Object,
options,
Mock.Of<RequestOptionsFactory>());

LeaseLostException leaseLost = await Assert.ThrowsExceptionAsync<LeaseLostException>(() => documentServiceLeaseManagerCosmos.UpdatePropertiesAsync(lease));

Assert.IsTrue(leaseLost.InnerException is CosmosException innerCosmosException
&& innerCosmosException.StatusCode == HttpStatusCode.PreconditionFailed);
}

/// <summary>
/// Verifies that if the update properties read a different Owner from the captured in memory, throws a LeaseLost
/// </summary>
[TestMethod]
public async Task IfOwnerChangedThrowOnRelease()
{
DocumentServiceLeaseStoreManagerOptions options = new DocumentServiceLeaseStoreManagerOptions
{
HostName = Guid.NewGuid().ToString()
};

DocumentServiceLeaseCore lease = new DocumentServiceLeaseCore()
{
LeaseToken = "0",
Owner = Guid.NewGuid().ToString(),
FeedRange = new FeedRangePartitionKeyRange("0")
};

Mock<DocumentServiceLeaseUpdater> mockUpdater = new Mock<DocumentServiceLeaseUpdater>();

Func<Func<DocumentServiceLease, DocumentServiceLease>, bool> validateUpdater = (Func<DocumentServiceLease, DocumentServiceLease> updater) =>
{
// Simulate dirty read from db
DocumentServiceLeaseCore serverLease = new DocumentServiceLeaseCore()
{
LeaseToken = "0",
Owner = Guid.NewGuid().ToString(),
FeedRange = new FeedRangePartitionKeyRange("0")
};
DocumentServiceLease afterUpdateLease = updater(serverLease);
return true;
};

mockUpdater.Setup(c => c.UpdateLeaseAsync(
It.IsAny<DocumentServiceLease>(),
It.IsAny<string>(),
It.IsAny<PartitionKey>(),
It.Is<Func<DocumentServiceLease, DocumentServiceLease>>(f => validateUpdater(f))))
.ReturnsAsync(lease);

ResponseMessage leaseResponse = new ResponseMessage(System.Net.HttpStatusCode.OK)
{
Content = new CosmosJsonDotNetSerializer().ToStream(lease)
};

Mock<ContainerInternal> leaseContainer = new Mock<ContainerInternal>();
leaseContainer.Setup(c => c.ReadItemStreamAsync(
It.IsAny<string>(),
It.IsAny<PartitionKey>(),
It.IsAny<ItemRequestOptions>(),
It.IsAny<CancellationToken>())).ReturnsAsync(leaseResponse);

DocumentServiceLeaseManagerCosmos documentServiceLeaseManagerCosmos = new DocumentServiceLeaseManagerCosmos(
Mock.Of<ContainerInternal>(),
leaseContainer.Object,
mockUpdater.Object,
options,
Mock.Of<RequestOptionsFactory>());

LeaseLostException leaseLost = await Assert.ThrowsExceptionAsync<LeaseLostException>(() => documentServiceLeaseManagerCosmos.ReleaseAsync(lease));

Assert.IsTrue(leaseLost.InnerException is CosmosException innerCosmosException
&& innerCosmosException.StatusCode == HttpStatusCode.PreconditionFailed);
}

/// <summary>
/// When a lease is missing the range information, check that we are adding it
/// </summary>
Expand Down

0 comments on commit d0c0578

Please sign in to comment.