Samples: Adds sample to demonstrate reencryption using Always Encrypted Cosmos SDK. (Azure#2825)

Sample/driver that demonstrates how encrypted data in Cosmos DB can be re-encrypted using the Always Encrypted Cosmos DB SDK (client-side encryption). It can be used to change or rotate data encryption keys or to change the client encryption policy.

Initially, re-encryption is not supported while the source containers are receiving active changes, so the flag IsFFChangeFeedSupported (in Constants.cs) is set to false. Full Fidelity change feed is in preview and has to be enabled on an account before it can be used. It allows re-encryption to be carried out while the source container is still receiving changes. The flag can be set to true once the feature is available or enabled on the account.
kr-santosh authored Jan 7, 2022
1 parent 1aa7582 commit ec8b8bb
Showing 16 changed files with 1,564 additions and 0 deletions.
6 changes: 6 additions & 0 deletions Microsoft.Azure.Cosmos.Samples/Usage/Cosmos.Samples.Usage.sln
@@ -47,6 +47,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Geospatial", "Geospatial\Ge
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SystemTextJson", "SystemTextJson\SystemTextJson.csproj", "{C0C25501-9C49-4B6F-BB7D-40E4FB47CE7B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ReEncryption", "ReEncryption\ReEncryption.csproj", "{AD6DEC94-EF8E-4C42-8A99-B98E101F933D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -141,6 +143,10 @@ Global
{C0C25501-9C49-4B6F-BB7D-40E4FB47CE7B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C0C25501-9C49-4B6F-BB7D-40E4FB47CE7B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C0C25501-9C49-4B6F-BB7D-40E4FB47CE7B}.Release|Any CPU.Build.0 = Release|Any CPU
{AD6DEC94-EF8E-4C42-8A99-B98E101F933D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AD6DEC94-EF8E-4C42-8A99-B98E101F933D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AD6DEC94-EF8E-4C42-8A99-B98E101F933D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AD6DEC94-EF8E-4C42-8A99-B98E101F933D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
435 changes: 435 additions & 0 deletions Microsoft.Azure.Cosmos.Samples/Usage/ReEncryption/Program.cs

Large diffs are not rendered by default.

97 changes: 97 additions & 0 deletions Microsoft.Azure.Cosmos.Samples/Usage/ReEncryption/README.md
@@ -0,0 +1,97 @@
# Microsoft Azure Cosmos DB Reencryption Tool

The Microsoft Azure Cosmos DB Reencryption Tool is sample (driver) code that demonstrates how encrypted data in Cosmos DB can be re-encrypted using the Always Encrypted Cosmos DB SDK (client-side encryption).
It can be used to change or rotate data encryption keys or to change the client encryption policy. It can also be used to migrate your data from a regular container to an encryption container.
The sample code shows how to use the APIs that help with re-encryption. The core library for these APIs is located in the ReEncryptionSupport folder within the same project.

## IMPORTANT

* The sample code only demonstrates how the exposed APIs can be used to re-encrypt data. It is meant to be used only as a tutorial for understanding the APIs described below. It does not cover cases such as retries on failure, or optimizations such as parallelizing the individual re-encryption tasks.

* The tool currently does not support re-encryption while the source container is receiving active changes.

* There is no Service Level Agreement for this tool.


## Getting Started

## How to use the sample

The sample creates a separate re-encryption task for each feed range. It saves the continuation token if a task is interrupted, or if the user stops the re-encryption and decides to continue it later.
The user can resume from an existing continuation token (bookmark) read from the last checkpoint, which was saved earlier in a file. Once the re-encryption activity has finished, the user can discard the file.
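
The checkpoint file described above could be handled along these lines. This is a minimal sketch, not the sample's actual implementation: `CheckpointStore`, its `Save` and `Load` methods and the JSON file layout are hypothetical names chosen for illustration. It assumes each feed range is identified by a string key (for example, its serialized form) and that the caller already holds the latest continuation token for that range.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;

// Hypothetical helper (not part of the sample): keeps one continuation token per feed range in a JSON file.
public static class CheckpointStore
{
    // Saves the tokens. The data is written to a temporary file first and then moved into place,
    // so an interrupted save does not leave a half-written checkpoint behind.
    public static void Save(string path, Dictionary<string, string> tokensByFeedRange)
    {
        string tempPath = path + ".tmp";
        File.WriteAllText(tempPath, JsonSerializer.Serialize(tokensByFeedRange));
        File.Move(tempPath, path, overwrite: true);
    }

    // Loads the saved tokens. Returns an empty dictionary when there is no checkpoint,
    // which means every feed range starts from the beginning.
    public static Dictionary<string, string> Load(string path)
    {
        if (!File.Exists(path))
        {
            return new Dictionary<string, string>();
        }

        return JsonSerializer.Deserialize<Dictionary<string, string>>(File.ReadAllText(path))
            ?? new Dictionary<string, string>();
    }
}
```

A driver would call `Load` at startup, pass the stored token (if any) for each feed range when it creates that range's task, call `Save` after each processed batch, and delete the file once every feed range has finished.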



![ReadMe_Image01.png (1098×403) (github.com)](./Images/ReadMe_Image01.png)



The following fields in the sample code pick up the required details from the application settings file (AppSettings.json).

* SrcDatabase - Database containing the containers that need to be re-encrypted with a new data encryption key or that require a change in encryption policy.

* SrcContainer - Id of the container whose data needs to be re-encrypted or whose encryption policy needs to change (for example, to add more paths to be encrypted). This can also be a regular container that you want to migrate to an encryption container.

* DstContainer - Destination container (must be created in advance) with the new encryption policy set. This container will hold the re-encrypted data.

If you only want to change the data encryption key, you can use the same policy, but you have to change the key for each of the policy paths. The new keys must be created before they are used in the policy, and the destination container
must be created with the new policy before you use it in this sample/driver code.


### Under the hood and available APIs for re-encryption.

- Uses the change feed APIs (pull model) to read data from the source container. As the data is read, the Always Encrypted Cosmos DB SDK decrypts it (if the source container has encrypted data). The data is then
written to the destination container using bulk operations, with the SDK handling the encryption. Make sure bulk execution is enabled on the client (`CosmosClientOptions.AllowBulkExecution = true`).

- The core library exposes an iterator-model API: an extension method on the container that returns a re-encryption iterator.

```csharp
/// <summary>
/// Gets an iterator for reencrypting the data.
/// The source container should have no data changes during reEncryption operation or should have changefeed full fidelity enabled.
/// </summary>
/// <param name="container"> Source container object. </param>
/// <param name="destinationContainerName"> Destination Container configured with new policy or key. </param>
/// <param name="checkIfWritesHaveStoppedCb"> Callback to check if writes have stopped. The called function should return true if writes have stopped. If Full Fidelity change feed is not enabled, return true by default. </param>
/// <param name="changeFeedRequestOptions"> (Optional) Request options. </param>
/// <param name="sourceFeedRange"> (Optional) The range to start from. </param>
/// <param name="continuationToken"> (Optional) continuationToken: The continuation to resume from. </param>
/// <param name="cancellationToken"> (Optional) System.Threading.CancellationToken representing request cancellation. </param>
/// <returns> Returns a ReEncryption Iterator. </returns>
public static async Task<ReEncryptionIterator> GetReEncryptionIteratorAsync(
    this Container container,
    string destinationContainerName,
    CosmosClient encryptionCosmosClient,
    Func<bool> checkIfWritesHaveStoppedCb,
    ChangeFeedRequestOptions changeFeedRequestOptions = null,
    FeedRange sourceFeedRange = null,
    string continuationToken = null,
    CancellationToken cancellationToken = default)
```

This API is called on the source container object, and you pass the destination container name. The method also accepts a callback delegate, which the core library calls to check whether the source container has stopped receiving
writes. The core library uses the result to set the HasMoreResults flag on the iterator, which indicates whether there are further results/changes that need to be re-encrypted.
In addition to these parameters, there are a few optional parameters. The "sourceFeedRange" parameter restricts the iterator to the changes in a specific feed range, so you can parallelize re-encryption by running a
separate task for each feed range, which can improve performance. The "continuationToken" parameter acts as a bookmark: pass it to the iterator to pick up where an earlier re-encryption activity left off.

- The iterator has an EncryptNextAsync method, which is the core function that does the re-encryption: it reads from the source container and writes the data to the destination container. The ReEncryptionResponseMessage class
provides a ReEncryptionBulkOperationResponse, which includes the total number of documents that were successfully re-encrypted and a list of all failures, with the failed documents, the corresponding exceptions and other important information.

```csharp
/// <summary>
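/// Reads the next set of changes from the source container, re-encrypts them and writes them to the destination container.
/// </summary>
/// <param name="cancellationToken"> (Optional) System.Threading.CancellationToken representing request cancellation. </param>
/// <returns> Response message. </returns>
public async Task<ReEncryptionResponseMessage> EncryptNextAsync(
    CancellationToken cancellationToken = default)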
```
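
The loop implied by HasMoreResults and EncryptNextAsync, with one task per feed range, might look roughly like the sketch below. To keep it self-contained, it does not reference the Cosmos SDK or the core library: `IReEncryptionIterator`, `ReEncryptionDriver` and `FakeIterator` are hypothetical stand-ins, and the stand-in `EncryptNextAsync` returns a plain document count instead of a ReEncryptionResponseMessage. Only the two member names come from the text above.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the two iterator members named in this README.
public interface IReEncryptionIterator
{
    bool HasMoreResults { get; }

    // Assumption: returns the number of documents handled in this batch.
    Task<int> EncryptNextAsync(CancellationToken cancellationToken = default);
}

public static class ReEncryptionDriver
{
    // Drains a single iterator and returns how many documents it reported.
    public static async Task<long> DrainAsync(IReEncryptionIterator iterator, CancellationToken cancellationToken = default)
    {
        long total = 0;
        while (iterator.HasMoreResults)
        {
            cancellationToken.ThrowIfCancellationRequested();
            total += await iterator.EncryptNextAsync(cancellationToken);
        }

        return total;
    }

    // Runs one drain task per iterator (one iterator per feed range) and sums the results.
    public static async Task<long> DrainAllAsync(IEnumerable<IReEncryptionIterator> iterators, CancellationToken cancellationToken = default)
    {
        long[] totals = await Task.WhenAll(iterators.Select(iterator => DrainAsync(iterator, cancellationToken)));
        return totals.Sum();
    }
}

// Fake iterator for demonstration: each queued number is the size of one batch.
public sealed class FakeIterator : IReEncryptionIterator
{
    private readonly Queue<int> batches;

    public FakeIterator(params int[] batchSizes) => this.batches = new Queue<int>(batchSizes);

    public bool HasMoreResults => this.batches.Count > 0;

    public Task<int> EncryptNextAsync(CancellationToken cancellationToken = default) =>
        Task.FromResult(this.batches.Dequeue());
}
```

With the fakes, `ReEncryptionDriver.DrainAllAsync(new[] { new FakeIterator(2, 3), new FakeIterator(5) })` completes with a total of 10. In real code, each iterator would instead come from GetReEncryptionIteratorAsync called with a different "sourceFeedRange", and the loop body would inspect the returned response for failures before continuing.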







@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <AssemblyName>Cosmos.Samples.ReEncryption</AssemblyName>
    <RootNamespace>Cosmos.Samples.ReEncryption</RootNamespace>
    <LangVersion>latest</LangVersion>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.Azure.Cosmos.Encryption" Version="1.0.0-previewV18" />
    <PackageReference Include="Microsoft.Data.Encryption.AzureKeyVaultProvider" Version="0.2.0-pre" />
    <PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
    <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.2.0" />
    <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" />
    <PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
  </ItemGroup>
  <ItemGroup>
    <None Include="AppSettings.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
  </ItemGroup>
</Project>
@@ -0,0 +1,21 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Cosmos.Samples.ReEncryption
{
    internal static class Constants
    {
        public const string DocumentsResourcePropertyName = "Documents";
        public const string DocumentIdPropertyName = "id";
        public const string DocumentRidPropertyName = "_rid";

        // Full Fidelity Change feed is in preview. We do not support this currently.
        public const bool IsFFChangeFeedSupported = false;

        public const string LsnPropertyName = "_lsn";
        public const string MetadataPropertyName = "_metadata";
        public const string OperationTypePropertyName = "operationType";
        public const string PreviousImagePropertyName = "previousImage";
    }
}
@@ -0,0 +1,207 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Cosmos.Samples.ReEncryption
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Cosmos;
    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;

    internal class ReEncryptionBulkOperationBuilder
    {
        private readonly Container container;
        private readonly string partitionKey;
        private readonly ReEncryptionJsonSerializer reEncryptionJsonSerializer;

        public ReEncryptionBulkOperationBuilder(
            Container container,
            string partitionKeyPath)
        {
            this.container = container ?? throw new ArgumentNullException(nameof(container));
            this.partitionKey = string.IsNullOrEmpty(partitionKeyPath) ? throw new ArgumentNullException(nameof(partitionKeyPath)) : partitionKeyPath[1..];
            this.reEncryptionJsonSerializer = new ReEncryptionJsonSerializer(
                new JsonSerializerSettings()
                {
                    DateParseHandling = DateParseHandling.None,
                });
        }

        /// <summary>
        /// Builds a bulk operation request and executes the operation.
        /// </summary>
        /// <param name="response"> Response containing documents read from the change feed. </param>
        /// <param name="cancellationToken"> Cancellation token. </param>
        /// <returns> ReEncryptionBulkOperationResponse. </returns>
        public async Task<ReEncryptionBulkOperationResponse<JObject>> ExecuteAsync(ResponseMessage response, CancellationToken cancellationToken)
        {
            Dictionary<string, List<JObject>> changeFeedChangesBatcher = this.PopulateChangeFeedChanges(response);

            if (!changeFeedChangesBatcher.Any())
            {
                throw new InvalidOperationException("PopulateChangeFeedChanges returned empty list of changes. ");
            }

            ReEncryptionBulkOperationResponse<JObject> bulkOperationResponse = null;
            List<JObject> bulkOperationList = this.GetChangesForBulkOperations(changeFeedChangesBatcher);

            if (bulkOperationList.Count > 0)
            {
                ReEncryptionBulkOperations<JObject> bulkOperations = new ReEncryptionBulkOperations<JObject>(bulkOperationList.Count);

                foreach (JObject document in bulkOperationList)
                {
                    JObject metadata = document.GetValue(Constants.MetadataPropertyName).ToObject<JObject>();
                    string operationType = metadata.GetValue(Constants.OperationTypePropertyName).ToString();
                    if (operationType.Equals("delete"))
                    {
                        // Null-conditional access so that a missing previous image reaches the check below
                        // instead of failing with a NullReferenceException.
                        JObject previousImage = metadata.GetValue(Constants.PreviousImagePropertyName)?.ToObject<JObject>();

                        if (previousImage == null)
                        {
                            throw new InvalidOperationException("Missing previous image for document with delete operation type. ");
                        }

                        // The partition key value is read as a top-level property and converted to a string.
                        string id = previousImage.GetValue(Constants.DocumentIdPropertyName).ToString();
                        string pkvalue = previousImage.GetValue(this.partitionKey).ToString();

                        bulkOperations.Tasks.Add(this.container.DeleteItemAsync<JObject>(
                            id,
                            new PartitionKey(pkvalue),
                            cancellationToken: cancellationToken).CaptureReEncryptionOperationResponseAsync(document));
                    }
                    else
                    {
                        // Strip the change feed bookkeeping properties before writing the document.
                        document.Remove(Constants.MetadataPropertyName);
                        document.Remove(Constants.LsnPropertyName);
                        bulkOperations.Tasks.Add(this.container.UpsertItemAsync(
                            item: document,
                            new PartitionKey(document.GetValue(this.partitionKey).ToString()),
                            cancellationToken: cancellationToken).CaptureReEncryptionOperationResponseAsync(document));
                    }
                }

                bulkOperationResponse = await bulkOperations.ExecuteAsync();
            }

            return bulkOperationResponse;
        }

        /// <summary>
        /// Iterates over all the changes for each document. If there are multiple changes for the same document,
        /// only the last change is picked up.
        /// </summary>
        /// <param name="changeFeedChangesBatcher"> Dictionary of change lists keyed by document resource id (_rid). </param>
        /// <returns> List of documents. </returns>
        private List<JObject> GetChangesForBulkOperations(Dictionary<string, List<JObject>> changeFeedChangesBatcher)
        {
            List<JObject> bulkOperationList = new List<JObject>();
            foreach (KeyValuePair<string, List<JObject>> keyValuePairOps in changeFeedChangesBatcher)
            {
                // Keep only the last operation when there are multiple changes for the same document.
                List<JObject> changes = keyValuePairOps.Value;
                if (changes.Count > 0)
                {
                    bulkOperationList.Add(changes[changes.Count - 1]);
                }
            }

            return bulkOperationList;
        }

        /// <summary>
        /// Builds a dictionary that groups the changes by document resource id (_rid).
        /// If a document has multiple changes, they are kept in order in the list (chain) stored under that id,
        /// which serves as the key.
        /// </summary>
        /// <param name="response"> Response containing documents read from the change feed. </param>
        /// <returns> Dictionary of change lists keyed by document resource id (_rid). </returns>
        private Dictionary<string, List<JObject>> PopulateChangeFeedChanges(ResponseMessage response)
        {
            JObject contentJObj = this.reEncryptionJsonSerializer.FromStream<JObject>(response.Content);
            if (!(contentJObj.SelectToken(Constants.DocumentsResourcePropertyName) is JArray documents))
            {
                throw new InvalidOperationException("Feed Response body contract was violated. Feed response did not have an array of Documents. ");
            }

            Dictionary<string, List<JObject>> changeFeedChangesBatcher = new Dictionary<string, List<JObject>>();
            if (documents.Count == 0)
            {
                return changeFeedChangesBatcher;
            }

            foreach (JToken value in documents)
            {
                if (value is not JObject document)
                {
                    continue;
                }

                // Null-conditional access so that a missing property reaches the check below
                // instead of failing with a NullReferenceException.
                JObject metadata = document.GetValue(Constants.MetadataPropertyName)?.ToObject<JObject>();

                if (metadata == null)
                {
                    throw new InvalidOperationException("Metadata property missing in the document. ");
                }

                string operationType = metadata.GetValue(Constants.OperationTypePropertyName).ToString();

                // For deletes, the _rid is read from the previous image; otherwise from the document itself.
                JObject ridSource = document;
                if (operationType.Equals("delete"))
                {
                    ridSource = metadata.GetValue(Constants.PreviousImagePropertyName)?.ToObject<JObject>();
                    if (ridSource == null)
                    {
                        throw new InvalidOperationException("Missing previous image for document with delete operation type. ");
                    }
                }

                string rid = ridSource.GetValue(Constants.DocumentRidPropertyName).ToString();

                // Append the change to the existing chain for this _rid, creating the chain on first use.
                if (!changeFeedChangesBatcher.TryGetValue(rid, out List<JObject> operations))
                {
                    operations = new List<JObject>();
                    changeFeedChangesBatcher.Add(rid, operations);
                }

                operations.Add(document);
            }

            return changeFeedChangesBatcher;
        }
    }
}