Skip to content

Commit

Permalink
[Internal] Change Feed: Adds change feed retention policy to containe…
Browse files Browse the repository at this point in the history
…r configuration (#2020)

Adding internal APIs for retention policy
  • Loading branch information
ealsur authored Jan 6, 2021
1 parent 1fe9697 commit 29c960b
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Fluent
{
using System;

/// <summary>
/// <see cref="ChangeFeedPolicy"/> fluent definition.
/// </summary>
internal class ChangeFeedPolicyDefinition
{
private readonly ContainerBuilder parent;
private readonly Action<ChangeFeedPolicy> attachCallback;
private TimeSpan changeFeedPolicyRetention;

internal ChangeFeedPolicyDefinition(
ContainerBuilder parent,
TimeSpan retention,
Action<ChangeFeedPolicy> attachCallback)
{
this.parent = parent ?? throw new ArgumentNullException(nameof(parent));
this.attachCallback = attachCallback ?? throw new ArgumentNullException(nameof(attachCallback));
this.changeFeedPolicyRetention = retention;
}

/// <summary>
/// Applies the current definition to the parent.
/// </summary>
/// <returns>An instance of the parent.</returns>
public ContainerBuilder Attach()
{
ChangeFeedPolicy resolutionPolicy = new ChangeFeedPolicy
{
FullFidelityRetention = this.changeFeedPolicyRetention
};

this.attachCallback(resolutionPolicy);
return this.parent;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal ConflictResolutionDefinition(
/// Defines the path used to resolve LastWrtierWins resolution mode <see cref="ConflictResolutionPolicy"/>.
/// </summary>
/// <param name="conflictResolutionPath"> sets the path which is present in each item in the Azure Cosmos DB service for last writer wins conflict-resolution. <see cref="ConflictResolutionPolicy.ResolutionPath"/>.</param>
/// <returns>An instance of the current <see cref="UniqueKeyDefinition"/>.</returns>
/// <returns>An instance of the current <see cref="ConflictResolutionDefinition"/>.</returns>
public ConflictResolutionDefinition WithLastWriterWinsResolution(string conflictResolutionPath)
{
if (string.IsNullOrEmpty(conflictResolutionPath))
Expand All @@ -46,7 +46,7 @@ public ConflictResolutionDefinition WithLastWriterWinsResolution(string conflict
/// </summary>
/// <param name="conflictResolutionProcedure"> Sets the stored procedure's name to be used for conflict-resolution.</param>
/// <remarks>The stored procedure can be created later on, but needs to honor the name specified here.</remarks>
/// <returns>An instance of the current <see cref="UniqueKeyDefinition"/>.</returns>
/// <returns>An instance of the current <see cref="ConflictResolutionDefinition"/>.</returns>
/// <example>
/// This example below creates a <see cref="Container"/> with a Conflict Resolution policy that uses a stored procedure to resolve conflicts:
/// <code language="c#">
Expand Down
24 changes: 24 additions & 0 deletions Microsoft.Azure.Cosmos/src/Fluent/Settings/ContainerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class ContainerBuilder : ContainerDefinition<ContainerBuilder>
private readonly Uri containerUri;
private UniqueKeyPolicy uniqueKeyPolicy;
private ConflictResolutionPolicy conflictResolutionPolicy;
private ChangeFeedPolicy changeFeedPolicy;

/// <summary>
/// Creates an instance for unit-testing
Expand Down Expand Up @@ -60,6 +61,19 @@ public ConflictResolutionDefinition WithConflictResolution()
(conflictPolicy) => this.AddConflictResolution(conflictPolicy));
}

/// <summary>
/// Defined the change feed policy for this Azure Cosmos container
/// </summary>
/// <param name="retention"> Indicates for how long operation logs have to be retained. <see cref="ChangeFeedPolicy.FullFidelityRetention"/>.</param>
/// <returns>An instance of <see cref="ChangeFeedPolicyDefinition"/>.</returns>
internal ChangeFeedPolicyDefinition WithChangeFeedPolicy(TimeSpan retention)
{
return new ChangeFeedPolicyDefinition(
this,
retention,
(changeFeedPolicy) => this.AddChangeFeedPolicy(changeFeedPolicy));
}

/// <summary>
/// Creates a container with the current fluent definition.
/// </summary>
Expand Down Expand Up @@ -156,6 +170,11 @@ public async Task<ContainerResponse> CreateIfNotExistsAsync(
containerProperties.ConflictResolutionPolicy = this.conflictResolutionPolicy;
}

if (this.changeFeedPolicy != null)
{
containerProperties.ChangeFeedPolicy = this.changeFeedPolicy;
}

return containerProperties;
}

Expand All @@ -180,5 +199,10 @@ private void AddConflictResolution(ConflictResolutionPolicy conflictResolutionPo

this.conflictResolutionPolicy = conflictResolutionPolicy;
}

private void AddChangeFeedPolicy(ChangeFeedPolicy changeFeedPolicy)
{
this.changeFeedPolicy = changeFeedPolicy;
}
}
}
67 changes: 67 additions & 0 deletions Microsoft.Azure.Cosmos/src/Resource/Settings/ChangeFeedPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using Microsoft.Azure.Documents;
using Newtonsoft.Json;

/// <summary>
/// Represents the change feed policy configuration for a container in the Azure Cosmos DB service.
/// </summary>
/// <example>
/// The example below creates a new container with a custom change feed policy.
/// <code language="c#">
/// <![CDATA[
/// ContainerProperties containerProperties = new ContainerProperties("MyCollection", "/country");
/// containerProperties.ChangeFeedPolicy.RetentionDuration = TimeSpan.FromMinutes(5);
///
/// CosmosContainerResponse containerCreateResponse = await client.GetDatabase("dbName").CreateContainerAsync(containerProperties, 5000);
/// ContainerProperties createdContainerProperties = containerCreateResponse.Container;
/// ]]>
/// </code>
/// </example>
/// <seealso cref="ContainerProperties"/>
internal sealed class ChangeFeedPolicy
{
[JsonProperty(PropertyName = Constants.Properties.LogRetentionDuration)]
private int retentionDurationInMinutes = 0;

/// <summary>
/// Gets or sets a value that indicates for how long operation logs have to be retained.
/// </summary>
/// <remarks>
/// Minimum granularity supported is minutes.
/// </remarks>
/// <value>
/// Value is in TimeSpan.
/// </value>
[JsonIgnore]
public TimeSpan FullFidelityRetention
{
get => TimeSpan.FromMinutes(this.retentionDurationInMinutes);
set
{
if (value.Seconds > 0
|| value.Milliseconds > 0)
{
throw new ArgumentOutOfRangeException(nameof(this.FullFidelityRetention), "Retention's granularity is minutes.");
}

if (value.TotalMilliseconds < 0)
{
throw new ArgumentOutOfRangeException(nameof(this.FullFidelityRetention), "Retention cannot be negative.");
}

this.retentionDurationInMinutes = (int)value.TotalMinutes;
}
}

/// <summary>
/// Disables the retention log.
/// </summary>
public static TimeSpan FullFidelityNoRetention => TimeSpan.Zero;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public class ContainerProperties
{
private static readonly char[] partitionKeyTokenDelimeter = new char[] { '/' };

[JsonProperty(PropertyName = Constants.Properties.ChangeFeedPolicy, NullValueHandling = NullValueHandling.Ignore)]
private ChangeFeedPolicy changeFeedPolicyInternal;

[JsonProperty(PropertyName = Constants.Properties.IndexingPolicy, NullValueHandling = NullValueHandling.Ignore)]
private IndexingPolicy indexingPolicyInternal;

Expand Down Expand Up @@ -261,6 +264,27 @@ public IndexingPolicy IndexingPolicy
}
}

/// <summary>
/// Gets the <see cref="ChangeFeedPolicy"/> associated with the container from the Azure Cosmos DB service.
/// </summary>
/// <value>
/// The change feed policy associated with the container.
/// </value>
[JsonIgnore]
internal ChangeFeedPolicy ChangeFeedPolicy
{
get
{
if (this.changeFeedPolicyInternal == null)
{
this.changeFeedPolicyInternal = new ChangeFeedPolicy();
}

return this.changeFeedPolicyInternal;
}
set => this.changeFeedPolicyInternal = value;
}

/// <summary>
/// Gets the <see cref="GeospatialConfig"/> associated with the collection from the Azure Cosmos DB service.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,38 @@ await databaseForConflicts.DefineContainer(containerName, partitionKeyPath)
}
}

[TestMethod]
[Ignore] // Until emulator with support is released
public async Task TestChangeFeedPolicy()
{
Database databaseForChangeFeed = await this.cosmosClient.CreateDatabaseAsync("changeFeedRetentionContainerTest",
cancellationToken: this.cancellationToken);

try
{
string containerName = "changeFeedRetentionContainerTest";
string partitionKeyPath = "/users";
TimeSpan retention = TimeSpan.FromMinutes(10);

ContainerResponse containerResponse =
await databaseForChangeFeed.DefineContainer(containerName, partitionKeyPath)
.WithChangeFeedPolicy(retention)
.Attach()
.CreateAsync();

Assert.AreEqual(HttpStatusCode.Created, containerResponse.StatusCode);
Assert.AreEqual(containerName, containerResponse.Resource.Id);
Assert.AreEqual(partitionKeyPath, containerResponse.Resource.PartitionKey.Paths.First());
ContainerProperties containerSettings = containerResponse.Resource;
Assert.IsNotNull(containerSettings.ChangeFeedPolicy);
Assert.AreEqual(retention.TotalMinutes, containerSettings.ChangeFeedPolicy.FullFidelityRetention.TotalMinutes);
}
finally
{
await databaseForChangeFeed.DeleteAsync();
}
}

[TestMethod]
public async Task WithIndexingPolicy()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ public void ContainerSettingsDefaults()
"Id",
"UniqueKeyPolicy",
"DefaultTimeToLive",
"ChangeFeedPolicy",
"AnalyticalStoreTimeToLiveInSeconds",
"IndexingPolicy",
"GeospatialConfig",
Expand All @@ -374,6 +375,7 @@ public void ContainerSettingsDefaults()
Assert.IsNull(cosmosContainerSettings.DefaultTimeToLive);

Assert.IsNotNull(cosmosContainerSettings.IndexingPolicy);
Assert.IsNotNull(cosmosContainerSettings.ChangeFeedPolicy);
Assert.IsNotNull(cosmosContainerSettings.ConflictResolutionPolicy);
Assert.IsTrue(object.ReferenceEquals(cosmosContainerSettings.IndexingPolicy, cosmosContainerSettings.IndexingPolicy));
Assert.IsNotNull(cosmosContainerSettings.IndexingPolicy.IncludedPaths);
Expand Down Expand Up @@ -609,6 +611,56 @@ public void ConflictSettingsDeSerializeTest()
Assert.AreEqual("Conflict1", conflictSettings.Id);
}

[TestMethod]
public void ChangeFeedPolicySerialization()
{
ContainerProperties containerSettings = new ContainerProperties("TestContainer", "/partitionKey");
string serialization = JsonConvert.SerializeObject(containerSettings);
Assert.IsFalse(serialization.Contains(Constants.Properties.ChangeFeedPolicy), "Change Feed Policy should not be included by default");

TimeSpan desiredTimeSpan = TimeSpan.FromHours(1);
containerSettings.ChangeFeedPolicy = new Cosmos.ChangeFeedPolicy() { FullFidelityRetention = desiredTimeSpan };
string serializationWithValues = JsonConvert.SerializeObject(containerSettings);
Assert.IsTrue(serializationWithValues.Contains(Constants.Properties.ChangeFeedPolicy), "Change Feed Policy should be included");
Assert.IsTrue(serializationWithValues.Contains(Constants.Properties.LogRetentionDuration), "Change Feed Policy retention should be included");

JObject parsed = JObject.Parse(serializationWithValues);
JToken retentionValue = parsed[Constants.Properties.ChangeFeedPolicy][Constants.Properties.LogRetentionDuration];
Assert.AreEqual(JTokenType.Integer, retentionValue.Type, "Change Feed Policy serialized retention should be an integer");
Assert.AreEqual((int)desiredTimeSpan.TotalMinutes, retentionValue.Value<int>(), "Change Feed Policy serialized retention value incorrect");
}

[TestMethod]
public void ChangeFeedPolicySerialization_Disabled()
{
ContainerProperties containerSettings = new ContainerProperties("TestContainer", "/partitionKey");
string serialization = JsonConvert.SerializeObject(containerSettings);
Assert.IsFalse(serialization.Contains(Constants.Properties.ChangeFeedPolicy), "Change Feed Policy should not be included by default");

TimeSpan desiredTimeSpan = TimeSpan.FromHours(1);
containerSettings.ChangeFeedPolicy = new Cosmos.ChangeFeedPolicy() { FullFidelityRetention = Cosmos.ChangeFeedPolicy.FullFidelityNoRetention };
string serializationWithValues = JsonConvert.SerializeObject(containerSettings);
Assert.IsTrue(serializationWithValues.Contains(Constants.Properties.ChangeFeedPolicy), "Change Feed Policy should be included");
Assert.IsTrue(serializationWithValues.Contains(Constants.Properties.LogRetentionDuration), "Change Feed Policy retention should be included");

JObject parsed = JObject.Parse(serializationWithValues);
JToken retentionValue = parsed[Constants.Properties.ChangeFeedPolicy][Constants.Properties.LogRetentionDuration];
Assert.AreEqual(JTokenType.Integer, retentionValue.Type, "Change Feed Policy serialized retention should be an integer");
Assert.AreEqual(0, retentionValue.Value<int>(), "Change Feed Policy serialized retention value incorrect");
}

[TestMethod]
public void ChangeFeedPolicySerialization_InvalidValues()
{
ContainerProperties containerSettings = new ContainerProperties("TestContainer", "/partitionKey");
string serialization = JsonConvert.SerializeObject(containerSettings);
Assert.IsFalse(serialization.Contains(Constants.Properties.ChangeFeedPolicy), "Change Feed Policy should not be included by default");

Assert.ThrowsException<ArgumentOutOfRangeException>(() => new Cosmos.ChangeFeedPolicy() { FullFidelityRetention = TimeSpan.FromSeconds(10) });
Assert.ThrowsException<ArgumentOutOfRangeException>(() => new Cosmos.ChangeFeedPolicy() { FullFidelityRetention = TimeSpan.FromMilliseconds(10) });
Assert.ThrowsException<ArgumentOutOfRangeException>(() => new Cosmos.ChangeFeedPolicy() { FullFidelityRetention = TimeSpan.FromSeconds(-10) });
}

private static T CosmosDeserialize<T>(string payload)
{
using (MemoryStream ms = new MemoryStream())
Expand Down

0 comments on commit 29c960b

Please sign in to comment.