Skip to content

Commit

Permalink
Client Encryption: Adds ReadMany API support for encryption package. (A…
Browse files Browse the repository at this point in the history
…zure#2572)

This PR adds ReadMany API support for encryption package.
  • Loading branch information
kr-santosh authored Jul 13, 2021
1 parent c4545be commit db5f2c9
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -991,15 +991,41 @@ public override Task<ResponseMessage> ReadManyItemsStreamAsync(
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
return this.ReadManyItemsHelperAsync(
items,
readManyRequestOptions,
cancellationToken);
}

public override Task<FeedResponse<T>> ReadManyItemsAsync<T>(
public override async Task<FeedResponse<T>> ReadManyItemsAsync<T>(
IReadOnlyList<(string id, PartitionKey partitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
ResponseMessage responseMessage = await this.ReadManyItemsHelperAsync(
items,
readManyRequestOptions,
cancellationToken);

return this.ResponseFactory.CreateItemFeedResponse<T>(responseMessage);
}

private async Task<ResponseMessage> ReadManyItemsHelperAsync(
IReadOnlyList<(string id, PartitionKey partitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
ResponseMessage responseMessage = await this.container.ReadManyItemsStreamAsync(
items,
readManyRequestOptions,
cancellationToken);

Stream decryptedContent = await EncryptionProcessor.DeserializeAndDecryptResponseAsync(
responseMessage.Content,
this.Encryptor,
cancellationToken);

return new DecryptedResponseMessage(responseMessage, decryptedContent);
}

private async Task<List<T>> DecryptChangeFeedDocumentsAsync<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,6 @@ internal static async Task<Stream> DeserializeAndDecryptResponseAsync(
CancellationToken cancellationToken)
{
JObject contentJObj = EncryptionProcessor.BaseSerializer.FromStream<JObject>(content);
JArray result = new JArray();

if (!(contentJObj.SelectToken(Constants.DocumentsResourcePropertyName) is JArray documents))
{
Expand All @@ -583,7 +582,6 @@ internal static async Task<Stream> DeserializeAndDecryptResponseAsync(
{
if (!(value is JObject document))
{
result.Add(value);
continue;
}

Expand All @@ -595,25 +593,12 @@ internal static async Task<Stream> DeserializeAndDecryptResponseAsync(
encryptor,
diagnosticsContext,
cancellationToken);

result.Add(decryptedDocument);
}
}

JObject decryptedResponse = new JObject();
foreach (JProperty property in contentJObj.Properties())
{
if (property.Name.Equals(Constants.DocumentsResourcePropertyName))
{
decryptedResponse.Add(property.Name, (JToken)result);
}
else
{
decryptedResponse.Add(property.Name, property.Value);
}
}

return EncryptionProcessor.BaseSerializer.ToStream(decryptedResponse);
// the contents of contentJObj get decrypted in place for MDE algorithm model, and for legacy model _ei property is removed
// and corresponding decrypted properties are added back in the documents.
return EncryptionProcessor.BaseSerializer.ToStream(contentJObj);
}
}
}
170 changes: 110 additions & 60 deletions Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace Microsoft.Azure.Cosmos.Encryption
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Data.Encryption.Cryptography;
using Newtonsoft.Json.Linq;

internal sealed class EncryptionContainer : Container
Expand Down Expand Up @@ -857,15 +856,23 @@ public override Task<ResponseMessage> ReadManyItemsStreamAsync(
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
return this.ReadManyItemsHelperAsync(
items,
readManyRequestOptions,
cancellationToken);
}

public override Task<FeedResponse<T>> ReadManyItemsAsync<T>(
public override async Task<FeedResponse<T>> ReadManyItemsAsync<T>(
IReadOnlyList<(string id, PartitionKey partitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
ResponseMessage responseMessage = await this.ReadManyItemsHelperAsync(
items,
readManyRequestOptions,
cancellationToken);

return this.ResponseFactory.CreateItemFeedResponse<T>(responseMessage);
}

public async Task<EncryptionSettings> GetOrUpdateEncryptionSettingsFromCacheAsync(
Expand All @@ -879,6 +886,59 @@ public async Task<EncryptionSettings> GetOrUpdateEncryptionSettingsFromCacheAsyn
cancellationToken: cancellationToken);
}

internal async Task<Stream> DeserializeAndDecryptResponseAsync(
Stream content,
EncryptionSettings encryptionSettings,
CancellationToken cancellationToken)
{
if (!encryptionSettings.PropertiesToEncrypt.Any())
{
return content;
}

JObject contentJObj = EncryptionProcessor.BaseSerializer.FromStream<JObject>(content);

if (!(contentJObj.SelectToken(Constants.DocumentsResourcePropertyName) is JArray documents))
{
throw new InvalidOperationException("Feed Response body contract was violated. Feed response did not have an array of Documents. ");
}

foreach (JToken value in documents)
{
if (value is not JObject document)
{
continue;
}

try
{
JObject decryptedDocument = await EncryptionProcessor.DecryptAsync(
document,
encryptionSettings,
cancellationToken);
}

// we cannot rely currently on a specific exception, this is due to the fact that the run time issue can be variable,
// we can hit issue with either Json serialization say an item was not encrypted but the policy shows it as encrypted,
// or we could hit a MicrosoftDataEncryptionException from MDE lib etc.
catch (Exception)
{
// most likely the encryption policy has changed.
encryptionSettings = await this.GetOrUpdateEncryptionSettingsFromCacheAsync(
obsoleteEncryptionSettings: encryptionSettings,
cancellationToken: cancellationToken);

JObject decryptedDocument = await EncryptionProcessor.DecryptAsync(
document,
encryptionSettings,
cancellationToken);
}
}

// the contents get decrypted in place by DecryptAsync.
return EncryptionProcessor.BaseSerializer.ToStream(contentJObj);
}

/// <summary>
/// Returns a cloned copy of the passed RequestOptions if passed else creates a new ItemRequestOptions.
/// </summary>
Expand Down Expand Up @@ -926,7 +986,7 @@ private async Task<ResponseMessage> CreateItemHelperAsync(

ItemRequestOptions clonedRequestOptions = requestOptions;

// only clone it on the first try.
// Clone(once) the request options since we modify it to set AddRequestHeaders to add additional headers.
if (!isRetry)
{
clonedRequestOptions = GetClonedItemRequestOptions(requestOptions);
Expand Down Expand Up @@ -1001,7 +1061,7 @@ private async Task<ResponseMessage> ReadItemHelperAsync(

ItemRequestOptions clonedRequestOptions = requestOptions;

// only clone it on the first try.
// Clone(once) the request options since we modify it to set AddRequestHeaders to add additional headers.
if (!isRetry)
{
clonedRequestOptions = GetClonedItemRequestOptions(requestOptions);
Expand Down Expand Up @@ -1075,7 +1135,7 @@ private async Task<ResponseMessage> ReplaceItemHelperAsync(

ItemRequestOptions clonedRequestOptions = requestOptions;

// only clone it on the first try.
// Clone(once) the request options since we modify it to set AddRequestHeaders to add additional headers.
if (!isRetry)
{
clonedRequestOptions = GetClonedItemRequestOptions(requestOptions);
Expand Down Expand Up @@ -1151,7 +1211,7 @@ private async Task<ResponseMessage> UpsertItemHelperAsync(

ItemRequestOptions clonedRequestOptions = requestOptions;

// only clone it on the first try.
// Clone(once) the request options since we modify it to set AddRequestHeaders to add additional headers.
if (!isRetry)
{
clonedRequestOptions = GetClonedItemRequestOptions(requestOptions);
Expand Down Expand Up @@ -1269,75 +1329,65 @@ private async Task<List<T>> DecryptChangeFeedDocumentsAsync<T>(
return decryptedItems;
}

internal async Task<Stream> DeserializeAndDecryptResponseAsync(
Stream content,
EncryptionSettings encryptionSettings,
CancellationToken cancellationToken)
private async Task<ResponseMessage> ReadManyItemsHelperAsync(
IReadOnlyList<(string id, PartitionKey partitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default,
bool isRetry = false)
{
EncryptionSettings encryptionSettings = await this.GetOrUpdateEncryptionSettingsFromCacheAsync(
obsoleteEncryptionSettings: null,
cancellationToken: cancellationToken);

if (!encryptionSettings.PropertiesToEncrypt.Any())
{
return content;
return await this.container.ReadManyItemsStreamAsync(
items,
readManyRequestOptions,
cancellationToken);
}

JObject contentJObj = EncryptionProcessor.BaseSerializer.FromStream<JObject>(content);
JArray results = new JArray();
ReadManyRequestOptions clonedRequestOptions = readManyRequestOptions;

if (!(contentJObj.SelectToken(Constants.DocumentsResourcePropertyName) is JArray documents))
{
throw new InvalidOperationException("Feed Response body contract was violated. Feed response did not have an array of Documents. ");
}

foreach (JToken value in documents)
// Clone(once) the request options since we modify it to set AddRequestHeaders to add additional headers.
if (!isRetry)
{
if (value is not JObject document)
if (readManyRequestOptions != null)
{
results.Add(value);
continue;
clonedRequestOptions = (ReadManyRequestOptions)readManyRequestOptions.ShallowCopy();
}

try
else
{
JObject decryptedDocument = await EncryptionProcessor.DecryptAsync(
document,
encryptionSettings,
cancellationToken);

results.Add(decryptedDocument);
clonedRequestOptions = new ReadManyRequestOptions();
}
}

// we cannot rely currently on a specific exception, this is due to the fact that the run time issue can be variable,
// we can hit issue with either Json serialization say an item was not encrypted but the policy shows it as encrypted,
// or we could hit a MicrosoftDataEncryptionException from MDE lib etc.
catch (Exception)
{
// most likely the encryption policy has changed.
encryptionSettings = await this.GetOrUpdateEncryptionSettingsFromCacheAsync(
obsoleteEncryptionSettings: encryptionSettings,
cancellationToken: cancellationToken);

JObject decryptedDocument = await EncryptionProcessor.DecryptAsync(
document,
encryptionSettings,
cancellationToken);
encryptionSettings.SetRequestHeaders(clonedRequestOptions);

results.Add(decryptedDocument);
}
}
ResponseMessage responseMessage = await this.container.ReadManyItemsStreamAsync(
items,
clonedRequestOptions,
cancellationToken);

JObject decryptedResponse = new JObject();
foreach (JProperty property in contentJObj.Properties())
if (!isRetry &&
responseMessage.StatusCode == HttpStatusCode.BadRequest &&
string.Equals(responseMessage.Headers.Get(Constants.SubStatusHeader), Constants.IncorrectContainerRidSubStatus))
{
if (property.Name.Equals(Constants.DocumentsResourcePropertyName))
{
decryptedResponse.Add(property.Name, (JToken)results);
}
else
{
decryptedResponse.Add(property.Name, property.Value);
}
// get the latest encryption settings.
await this.GetOrUpdateEncryptionSettingsFromCacheAsync(
obsoleteEncryptionSettings: encryptionSettings,
cancellationToken: cancellationToken);

return await this.ReadManyItemsHelperAsync(
items,
clonedRequestOptions,
cancellationToken,
isRetry: true);
}

return EncryptionProcessor.BaseSerializer.ToStream(decryptedResponse);
Stream decryptedContent = await this.DeserializeAndDecryptResponseAsync(responseMessage.Content, encryptionSettings, cancellationToken);

return new DecryptedResponseMessage(responseMessage, decryptedContent);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Microsoft.Azure.Cosmos.Encryption.EmulatorTests
using static Microsoft.Azure.Cosmos.Encryption.EmulatorTests.LegacyEncryptionTests;
using EncryptionKeyWrapMetadata = Custom.EncryptionKeyWrapMetadata;
using DataEncryptionKey = Custom.DataEncryptionKey;
using Newtonsoft.Json.Linq;

[TestClass]
public class MdeCustomEncryptionTests
Expand Down Expand Up @@ -348,6 +349,45 @@ public async Task ValidateCachingOfProtectedDataEncryptionKey()
Assert.AreEqual(1, unwrapcount);
}

[TestMethod]
public async Task EncryptionReadManyItemAsync()
{
TestDoc testDoc = await MdeCustomEncryptionTests.CreateItemAsync(MdeCustomEncryptionTests.encryptionContainer, MdeCustomEncryptionTests.dekId, TestDoc.PathsToEncrypt);

TestDoc testDoc2 = await MdeCustomEncryptionTests.CreateItemAsync(MdeCustomEncryptionTests.encryptionContainer, MdeCustomEncryptionTests.dekId, TestDoc.PathsToEncrypt);

List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>
{
(testDoc.Id, new PartitionKey(testDoc.PK)),
(testDoc2.Id, new PartitionKey(testDoc2.PK))
};

FeedResponse<TestDoc> response = await encryptionContainer.ReadManyItemsAsync<TestDoc>(itemList);

Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);
Assert.AreEqual(2, response.Count);
VerifyExpectedDocResponse(testDoc, response.Resource.ElementAt(0));
VerifyExpectedDocResponse(testDoc2, response.Resource.ElementAt(1));

// stream test.
ResponseMessage responseStream = await encryptionContainer.ReadManyItemsStreamAsync(itemList);

Assert.IsTrue(responseStream.IsSuccessStatusCode);
Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);

JObject contentJObjects = TestCommon.FromStream<JObject>(responseStream.Content);

if (contentJObjects.SelectToken(Constants.DocumentsResourcePropertyName) is JArray documents)
{
VerifyExpectedDocResponse(testDoc, documents.ElementAt(0).ToObject<TestDoc>());
VerifyExpectedDocResponse(testDoc2, documents.ElementAt(1).ToObject<TestDoc>());
}
else
{
Assert.Fail("ResponseMessage from ReadManyItemsStreamAsync did not have a valid response. ");
}
}

[TestMethod]
public async Task EncryptionCreateItem()
{
Expand Down
Loading

0 comments on commit db5f2c9

Please sign in to comment.