Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed Processor: Adds backward compatibility of lease store #1733

Merged
merged 5 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private async Task ProcessPartitionAsync(PartitionSupervisor partitionSupervisor
catch (Exception e)
{
Extensions.TraceException(e);
DefaultTrace.TraceWarning("Lease with token {0}: processing failed", e, lease.CurrentLeaseToken);
DefaultTrace.TraceWarning("Lease with token {0}: processing failed", lease.CurrentLeaseToken);
}

await this.RemoveLeaseAsync(lease).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ internal sealed class DocumentServiceLeaseCore : DocumentServiceLease
{
private static readonly DateTime UnixStartTime = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);

// Used to detect if the user is migrating from a V2 CFP schema
private bool isMigratingFromV2 = false;

public DocumentServiceLeaseCore()
{
}
Expand Down Expand Up @@ -41,12 +44,23 @@ public DocumentServiceLeaseCore(DocumentServiceLeaseCore other)
[JsonProperty("LeaseToken")]
public string LeaseToken { get; set; }

[JsonProperty("PartitionId")]
[JsonProperty("PartitionId", NullValueHandling = NullValueHandling.Ignore)]
private string PartitionId
{
get
{
if (this.isMigratingFromV2)
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
// If the user migrated the lease from V2 schema, we maintain the PartitionId property for backward compatibility
return this.LeaseToken;
}

return null;
}
set
{
this.LeaseToken = value;
this.isMigratingFromV2 = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json.Linq;

[TestClass]
[TestCategory("ChangeFeed")]
Expand Down Expand Up @@ -64,6 +65,140 @@ public async Task WritesTriggerDelegate_WithLeaseContainer()
CollectionAssert.AreEqual(expectedIds.ToList(), receivedIds);
}

[TestMethod]
public async Task Schema_DefaultsToNoPartitionId()
{
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilder("test", (IReadOnlyCollection<TestClass> docs, CancellationToken token) =>
{
return Task.CompletedTask;
})
.WithInstanceName("random")
.WithLeaseContainer(this.LeaseContainer).Build();

await processor.StartAsync();
// Letting processor initialize
await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);

// Verify that no leases have PartitionId (V2 contract)
using FeedIterator<dynamic> iterator = this.LeaseContainer.GetItemQueryIterator<dynamic>();
while (iterator.HasMoreResults)
{
FeedResponse<dynamic> page = await iterator.ReadNextAsync();
foreach (dynamic lease in page)
{
string leaseId = lease.id;
if (leaseId.Contains(".info") || leaseId.Contains(".lock"))
{
// These are the store initialization marks
continue;
}

Assert.IsNotNull(lease.LeaseToken);
Assert.IsNull(lease.PartitionId);
}
}

await processor.StopAsync();
}

/// <summary>
/// When the user migrates from V2 CFP, the leases contain PartitionId.
/// To allow for backward compatibility (V2 -> V3 -> V2) we need to honor the existence of PartitionId and maintain its value.
/// </summary>
[TestMethod]
public async Task Schema_OnV2MigrationMaintainPartitionId()
{
IEnumerable<int> expectedIds = Enumerable.Range(0, 20);
List<int> receivedIds = new List<int>();
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilder("test", (IReadOnlyCollection<TestClass> docs, CancellationToken token) =>
{
foreach (TestClass doc in docs)
{
receivedIds.Add(int.Parse(doc.id));
}

return Task.CompletedTask;
})
.WithInstanceName("random")
.WithLeaseContainer(this.LeaseContainer).Build();

await processor.StartAsync();
// Letting processor initialize
await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);

// Inserting some documents
foreach (int id in expectedIds.Take(10))
{
await this.Container.CreateItemAsync<dynamic>(new { id = id.ToString() });
}

// Waiting on all notifications to finish
await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedCleanupTime);
await processor.StopAsync();

// At this point we have leases for V3, so we will simulate V2 by manually adding PartitionId and removing LeaseToken
using FeedIterator<JObject> iterator = this.LeaseContainer.GetItemQueryIterator<JObject>();
while (iterator.HasMoreResults)
{
FeedResponse<JObject> page = await iterator.ReadNextAsync();
foreach (JObject lease in page)
{
string leaseId = lease.Value<string>("id");
if (leaseId.Contains(".info") || leaseId.Contains(".lock"))
{
// These are the store initialization marks
continue;
}

// create the PartitionId property
lease.Add("PartitionId", lease.Value<string>("LeaseToken"));

lease.Remove("LeaseToken");

await this.LeaseContainer.UpsertItemAsync<JObject>(lease);
}
}

// Now all leases are V2 leases, create the rest of the documents
foreach (int id in expectedIds.TakeLast(10))
{
await this.Container.CreateItemAsync<dynamic>(new { id = id.ToString() });
}

await processor.StartAsync();
// Letting processor initialize
await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);

// Waiting on all notifications to finish, should be using PartitionId from the V2 lease
await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedCleanupTime);
await processor.StopAsync();

// Verify we processed all items (including when using the V2 leases)
CollectionAssert.AreEqual(expectedIds.ToList(), receivedIds);

// Verify the after-migration leases have both PartitionId and LeaseToken with the same value
using FeedIterator<dynamic> iteratorAfter = this.LeaseContainer.GetItemQueryIterator<dynamic>();
while (iteratorAfter.HasMoreResults)
{
FeedResponse<dynamic> page = await iteratorAfter.ReadNextAsync();
foreach (dynamic lease in page)
{
string leaseId = lease.id;
if (leaseId.Contains(".info") || leaseId.Contains(".lock"))
{
// These are the store initialization marks
continue;
}

Assert.IsNotNull(lease.LeaseToken, "LeaseToken is missing after migration of lease schema");
Assert.IsNotNull(lease.PartitionId, "PartitionId is missing after migration of lease schema");
Assert.AreEqual(lease.LeaseToken, lease.PartitionId, "LeaseToken and PartitionId should be equal after migration");
}
}
}

[TestMethod]
public async Task NotExistentLeaseContainer()
{
Expand Down