Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client Encryption: Adds ReadMany API support for encryption package. #2572

Merged
merged 9 commits into from
Jul 13, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -991,15 +991,41 @@ public override Task<ResponseMessage> ReadManyItemsStreamAsync(
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
return this.ReadManyItemsHelperAsync(
items,
readManyRequestOptions,
cancellationToken);
}

public override Task<FeedResponse<T>> ReadManyItemsAsync<T>(
public override async Task<FeedResponse<T>> ReadManyItemsAsync<T>(
IReadOnlyList<(string id, PartitionKey partitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
ResponseMessage responseMessage = await this.ReadManyItemsHelperAsync(
items,
readManyRequestOptions,
cancellationToken);

return this.ResponseFactory.CreateItemFeedResponse<T>(responseMessage);
}

private async Task<ResponseMessage> ReadManyItemsHelperAsync(
IReadOnlyList<(string id, PartitionKey partitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
ResponseMessage responseMessage = await this.container.ReadManyItemsStreamAsync(
items,
readManyRequestOptions,
cancellationToken);

Stream decryptedContent = await EncryptionProcessor.DeserializeAndDecryptResponseAsync(
responseMessage.Content,
this.Encryptor,
cancellationToken);

return new DecryptedResponseMessage(responseMessage, decryptedContent);
}

private async Task<List<T>> DecryptChangeFeedDocumentsAsync<T>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,6 @@ internal static async Task<Stream> DeserializeAndDecryptResponseAsync(
CancellationToken cancellationToken)
{
JObject contentJObj = EncryptionProcessor.BaseSerializer.FromStream<JObject>(content);
JArray result = new JArray();

if (!(contentJObj.SelectToken(Constants.DocumentsResourcePropertyName) is JArray documents))
{
Expand All @@ -583,7 +582,6 @@ internal static async Task<Stream> DeserializeAndDecryptResponseAsync(
{
if (!(value is JObject document))
{
result.Add(value);
continue;
}

Expand All @@ -595,25 +593,12 @@ internal static async Task<Stream> DeserializeAndDecryptResponseAsync(
encryptor,
diagnosticsContext,
cancellationToken);

result.Add(decryptedDocument);
}
}

JObject decryptedResponse = new JObject();
foreach (JProperty property in contentJObj.Properties())
{
if (property.Name.Equals(Constants.DocumentsResourcePropertyName))
{
decryptedResponse.Add(property.Name, (JToken)result);
}
else
{
decryptedResponse.Add(property.Name, property.Value);
}
}

return EncryptionProcessor.BaseSerializer.ToStream(decryptedResponse);
// the contents of contentJObj get decrypted in place for MDE algorithm model, and for legacy model _ei property is removed
// and corresponding decrypted properties are added back in the documents.
return EncryptionProcessor.BaseSerializer.ToStream(contentJObj);
}
}
}
170 changes: 110 additions & 60 deletions Microsoft.Azure.Cosmos.Encryption/src/EncryptionContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace Microsoft.Azure.Cosmos.Encryption
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Data.Encryption.Cryptography;
using Newtonsoft.Json.Linq;

internal sealed class EncryptionContainer : Container
Expand Down Expand Up @@ -857,15 +856,23 @@ public override Task<ResponseMessage> ReadManyItemsStreamAsync(
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
return this.ReadManyItemsHelperAsync(
items,
readManyRequestOptions,
cancellationToken);
}

public override Task<FeedResponse<T>> ReadManyItemsAsync<T>(
public override async Task<FeedResponse<T>> ReadManyItemsAsync<T>(
IReadOnlyList<(string id, PartitionKey partitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
ResponseMessage responseMessage = await this.ReadManyItemsHelperAsync(
items,
readManyRequestOptions,
cancellationToken);

return this.ResponseFactory.CreateItemFeedResponse<T>(responseMessage);
}

public async Task<EncryptionSettings> GetOrUpdateEncryptionSettingsFromCacheAsync(
Expand All @@ -879,6 +886,59 @@ public async Task<EncryptionSettings> GetOrUpdateEncryptionSettingsFromCacheAsyn
cancellationToken: cancellationToken);
}

internal async Task<Stream> DeserializeAndDecryptResponseAsync(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

internal

private

Copy link
Contributor Author

@kr-santosh kr-santosh Jul 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used by EncryptionFeedIterator... just moved the code above private methods..

Stream content,
EncryptionSettings encryptionSettings,
CancellationToken cancellationToken)
{
if (!encryptionSettings.PropertiesToEncrypt.Any())
{
return content;
}

JObject contentJObj = EncryptionProcessor.BaseSerializer.FromStream<JObject>(content);

if (!(contentJObj.SelectToken(Constants.DocumentsResourcePropertyName) is JArray documents))
{
throw new InvalidOperationException("Feed Response body contract was violated. Feed response did not have an array of Documents. ");
}

foreach (JToken value in documents)
{
if (value is not JObject document)
{
continue;
}

try
{
JObject decryptedDocument = await EncryptionProcessor.DecryptAsync(
document,
encryptionSettings,
cancellationToken);
}

// we cannot rely currently on a specific exception, this is due to the fact that the run time issue can be variable,
// we can hit issue with either Json serialization say an item was not encrypted but the policy shows it as encrypted,
// or we could hit a MicrosoftDataEncryptionException from MDE lib etc.
catch (Exception)
j82w marked this conversation as resolved.
Show resolved Hide resolved
{
// most likely the encryption policy has changed.
encryptionSettings = await this.GetOrUpdateEncryptionSettingsFromCacheAsync(
obsoleteEncryptionSettings: encryptionSettings,
cancellationToken: cancellationToken);

JObject decryptedDocument = await EncryptionProcessor.DecryptAsync(
document,
encryptionSettings,
cancellationToken);
}
}

// the contents get decrypted in place by DecryptAsync.
return EncryptionProcessor.BaseSerializer.ToStream(contentJObj);
}

/// <summary>
/// Returns a cloned copy of the passed RequestOptions if passed else creates a new ItemRequestOptions.
/// </summary>
Expand Down Expand Up @@ -926,7 +986,7 @@ private async Task<ResponseMessage> CreateItemHelperAsync(

ItemRequestOptions clonedRequestOptions = requestOptions;

// only clone it on the first try.
// Clone(once) the request options since we modify it to set AddRequestHeaders to add additional headers.
if (!isRetry)
{
clonedRequestOptions = GetClonedItemRequestOptions(requestOptions);
Expand Down Expand Up @@ -1001,7 +1061,7 @@ private async Task<ResponseMessage> ReadItemHelperAsync(

ItemRequestOptions clonedRequestOptions = requestOptions;

// only clone it on the first try.
// Clone(once) the request options since we modify it to set AddRequestHeaders to add additional headers.
if (!isRetry)
{
clonedRequestOptions = GetClonedItemRequestOptions(requestOptions);
Expand Down Expand Up @@ -1075,7 +1135,7 @@ private async Task<ResponseMessage> ReplaceItemHelperAsync(

ItemRequestOptions clonedRequestOptions = requestOptions;

// only clone it on the first try.
// Clone(once) the request options since we modify it to set AddRequestHeaders to add additional headers.
if (!isRetry)
{
clonedRequestOptions = GetClonedItemRequestOptions(requestOptions);
Expand Down Expand Up @@ -1151,7 +1211,7 @@ private async Task<ResponseMessage> UpsertItemHelperAsync(

ItemRequestOptions clonedRequestOptions = requestOptions;

// only clone it on the first try.
// Clone(once) the request options since we modify it to set AddRequestHeaders to add additional headers.
if (!isRetry)
{
clonedRequestOptions = GetClonedItemRequestOptions(requestOptions);
Expand Down Expand Up @@ -1269,75 +1329,65 @@ private async Task<List<T>> DecryptChangeFeedDocumentsAsync<T>(
return decryptedItems;
}

internal async Task<Stream> DeserializeAndDecryptResponseAsync(
Stream content,
EncryptionSettings encryptionSettings,
CancellationToken cancellationToken)
private async Task<ResponseMessage> ReadManyItemsHelperAsync(
IReadOnlyList<(string id, PartitionKey partitionKey)> items,
ReadManyRequestOptions readManyRequestOptions = null,
CancellationToken cancellationToken = default,
bool isRetry = false)
{
EncryptionSettings encryptionSettings = await this.GetOrUpdateEncryptionSettingsFromCacheAsync(
obsoleteEncryptionSettings: null,
cancellationToken: cancellationToken);

if (!encryptionSettings.PropertiesToEncrypt.Any())
{
return content;
return await this.container.ReadManyItemsStreamAsync(
items,
readManyRequestOptions,
cancellationToken);
}

JObject contentJObj = EncryptionProcessor.BaseSerializer.FromStream<JObject>(content);
JArray results = new JArray();
ReadManyRequestOptions clonedRequestOptions = readManyRequestOptions;

if (!(contentJObj.SelectToken(Constants.DocumentsResourcePropertyName) is JArray documents))
{
throw new InvalidOperationException("Feed Response body contract was violated. Feed response did not have an array of Documents. ");
}

foreach (JToken value in documents)
// Clone(once) the request options since we modify it to set AddRequestHeaders to add additional headers.
if (!isRetry)
{
if (value is not JObject document)
if (readManyRequestOptions != null)
{
results.Add(value);
continue;
clonedRequestOptions = (ReadManyRequestOptions)readManyRequestOptions.ShallowCopy();
}

try
else
{
JObject decryptedDocument = await EncryptionProcessor.DecryptAsync(
document,
encryptionSettings,
cancellationToken);

results.Add(decryptedDocument);
clonedRequestOptions = new ReadManyRequestOptions();
}
}

// we cannot rely currently on a specific exception, this is due to the fact that the run time issue can be variable,
// we can hit issue with either Json serialization say an item was not encrypted but the policy shows it as encrypted,
// or we could hit a MicrosoftDataEncryptionException from MDE lib etc.
catch (Exception)
{
// most likely the encryption policy has changed.
encryptionSettings = await this.GetOrUpdateEncryptionSettingsFromCacheAsync(
obsoleteEncryptionSettings: encryptionSettings,
cancellationToken: cancellationToken);

JObject decryptedDocument = await EncryptionProcessor.DecryptAsync(
document,
encryptionSettings,
cancellationToken);
encryptionSettings.SetRequestHeaders(clonedRequestOptions);

results.Add(decryptedDocument);
}
}
ResponseMessage responseMessage = await this.container.ReadManyItemsStreamAsync(
items,
clonedRequestOptions,
cancellationToken);

JObject decryptedResponse = new JObject();
foreach (JProperty property in contentJObj.Properties())
if (!isRetry &&
responseMessage.StatusCode == HttpStatusCode.BadRequest &&
string.Equals(responseMessage.Headers.Get(Constants.SubStatusHeader), Constants.IncorrectContainerRidSubStatus))
{
if (property.Name.Equals(Constants.DocumentsResourcePropertyName))
{
decryptedResponse.Add(property.Name, (JToken)results);
}
else
{
decryptedResponse.Add(property.Name, property.Value);
}
// get the latest encryption settings.
await this.GetOrUpdateEncryptionSettingsFromCacheAsync(
obsoleteEncryptionSettings: encryptionSettings,
cancellationToken: cancellationToken);

return await this.ReadManyItemsHelperAsync(
items,
clonedRequestOptions,
cancellationToken,
isRetry: true);
}

return EncryptionProcessor.BaseSerializer.ToStream(decryptedResponse);
Stream decryptedContent = await this.DeserializeAndDecryptResponseAsync(responseMessage.Content, encryptionSettings, cancellationToken);

return new DecryptedResponseMessage(responseMessage, decryptedContent);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Microsoft.Azure.Cosmos.Encryption.EmulatorTests
using static Microsoft.Azure.Cosmos.Encryption.EmulatorTests.LegacyEncryptionTests;
using EncryptionKeyWrapMetadata = Custom.EncryptionKeyWrapMetadata;
using DataEncryptionKey = Custom.DataEncryptionKey;
using Newtonsoft.Json.Linq;

[TestClass]
public class MdeCustomEncryptionTests
Expand Down Expand Up @@ -348,6 +349,45 @@ public async Task ValidateCachingOfProtectedDataEncryptionKey()
Assert.AreEqual(1, unwrapcount);
}

[TestMethod]
public async Task EncryptionReadManyItemAsync()
{
TestDoc testDoc = await MdeCustomEncryptionTests.CreateItemAsync(MdeCustomEncryptionTests.encryptionContainer, MdeCustomEncryptionTests.dekId, TestDoc.PathsToEncrypt);

TestDoc testDoc2 = await MdeCustomEncryptionTests.CreateItemAsync(MdeCustomEncryptionTests.encryptionContainer, MdeCustomEncryptionTests.dekId, TestDoc.PathsToEncrypt);

List<(string, PartitionKey)> itemList = new List<(string, PartitionKey)>
{
(testDoc.Id, new PartitionKey(testDoc.PK)),
(testDoc2.Id, new PartitionKey(testDoc2.PK))
};

FeedResponse<TestDoc> response = await encryptionContainer.ReadManyItemsAsync<TestDoc>(itemList);

Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);
Assert.AreEqual(2, response.Count);
VerifyExpectedDocResponse(testDoc, response.Resource.ElementAt(0));
VerifyExpectedDocResponse(testDoc2, response.Resource.ElementAt(1));

// stream test.
ResponseMessage responseStream = await encryptionContainer.ReadManyItemsStreamAsync(itemList);

Assert.IsTrue(responseStream.IsSuccessStatusCode);
Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);

JObject contentJObjects = TestCommon.FromStream<JObject>(responseStream.Content);

if (contentJObjects.SelectToken(Constants.DocumentsResourcePropertyName) is JArray documents)
{
VerifyExpectedDocResponse(testDoc, documents.ElementAt(0).ToObject<TestDoc>());
VerifyExpectedDocResponse(testDoc2, documents.ElementAt(1).ToObject<TestDoc>());
}
else
{
Assert.Fail("ResponseMessage from ReadManyItemsStreamAsync did not have a valid response. ");
}
}

[TestMethod]
public async Task EncryptionCreateItem()
{
Expand Down
Loading