From 251a6e4037237365f542703d39a64a8b53e457db Mon Sep 17 00:00:00 2001 From: Santosh Kulkarni <66682828+kr-santosh@users.noreply.github.com> Date: Thu, 30 Sep 2021 22:06:32 +0530 Subject: [PATCH] Client Encryption Bulk: Adds the ability to pass container rid to SDK header for Bulk operations for container recreate scenarios (#2404) * Add support to inject/append custom header via RequestOptions. * Update DotNetSDKAPI.json * Update CosmosHeaderTests.cs * Changes as per review comments. * Update CosmosHeaderTests.cs * Provide shallow copy function. * Fix allows headers required by enc package to be passed during bulk operation. * Move to PREVIEW * Fixes. * Fixes. * Update BatchAsyncBatcher.cs * Update BatchAsyncContainerExecutor.cs * using constants * Update BatchAsyncBatcher.cs * fixed names * Update PartitionKeyRangeServerBatchRequest.cs * Updated documentation. * Fixes as per review comments. * Update PartitionKeyRangeServerBatchRequestTests.cs * fixes. Co-authored-by: j82w --- .../src/Batch/BatchAsyncBatcher.cs | 10 ++ .../src/Batch/BatchAsyncContainerExecutor.cs | 35 +++++- .../src/Batch/ItemBatchOperationContext.cs | 4 + .../PartitionKeyRangeServerBatchRequest.cs | 23 +++- .../Batch/CosmosItemBulkTests.cs | 113 +++++++++++++++++- .../CosmosReadManyItemsTests.cs | 8 +- ...artitionKeyRangeServerBatchRequestTests.cs | 8 ++ 7 files changed, 191 insertions(+), 10 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs index 987264f6bb..1776abe926 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs @@ -38,6 +38,8 @@ internal class BatchAsyncBatcher private readonly CosmosClientContext clientContext; private long currentSize = 0; private bool dispatched = false; + private bool isClientEncrypted = false; + private string intendedCollectionRidValue; public bool IsEmpty => this.batchOperations.Count == 0; @@ -86,6 
+88,12 @@ public virtual bool TryAdd(ItemBatchOperation operation) throw new ArgumentNullException(nameof(operation.Context)); } + if (operation.Context.IsClientEncrypted && !this.isClientEncrypted) + { + this.isClientEncrypted = true; + this.intendedCollectionRidValue = operation.Context.IntendedCollectionRidValue; + } + if (this.batchOperations.Count == this.maxBatchOperationCount) { DefaultTrace.TraceInformation($"Batch is full - Max operation count {this.maxBatchOperationCount} reached."); @@ -224,6 +232,8 @@ internal virtual async Task AddAsync( trace, cancellationToken).ConfigureAwait(false); BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId); + ItemBatchOperationContext context = new ItemBatchOperationContext( resolvedPartitionKeyRangeId, trace, BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, operation.OperationType, this.retryOptions)); + + if (itemRequestOptions != null && itemRequestOptions.AddRequestHeaders != null) + { + // get the header value if any, passed by the encryption package. + Headers encryptionHeaders = new Headers(); + itemRequestOptions.AddRequestHeaders?.Invoke(encryptionHeaders); + + // make sure we set the Intended Collection Rid header when we have encrypted payload. + // This primarily would allow CosmosDB Encryption package to detect change in container referenced by a Client + // and prevent creating data with wrong Encryption Policy. 
+ if (encryptionHeaders.TryGetValue(HttpConstants.HttpHeaders.IsClientEncrypted, out string encrypted)) + { + context.IsClientEncrypted = bool.Parse(encrypted); + + if (context.IsClientEncrypted && encryptionHeaders.TryGetValue(WFConstants.BackendHeaders.IntendedCollectionRid, out string ridValue)) + { + context.IntendedCollectionRidValue = ridValue; + } + } + } + operation.AttachContext(context); streamer.Add(operation); return await context.OperationTask; @@ -176,9 +198,16 @@ private static bool ValidateOperationEPK( return true; } - private static void AddHeadersToRequestMessage(RequestMessage requestMessage, string partitionKeyRangeId) + private static void AddHeadersToRequestMessage(RequestMessage requestMessage, PartitionKeyRangeServerBatchRequest partitionKeyRangeServerBatchRequest) { - requestMessage.Headers.PartitionKeyRangeId = partitionKeyRangeId; + requestMessage.Headers.PartitionKeyRangeId = partitionKeyRangeServerBatchRequest.PartitionKeyRangeId; + + if (partitionKeyRangeServerBatchRequest.IsClientEncrypted) + { + requestMessage.Headers.Add(HttpConstants.HttpHeaders.IsClientEncrypted, partitionKeyRangeServerBatchRequest.IsClientEncrypted.ToString()); + requestMessage.Headers.Add(WFConstants.BackendHeaders.IntendedCollectionRid, partitionKeyRangeServerBatchRequest.IntendedCollectionRidValue); + } + requestMessage.Headers.Add(HttpConstants.HttpHeaders.ShouldBatchContinueOnError, bool.TrueString); requestMessage.Headers.Add(HttpConstants.HttpHeaders.IsBatchAtomic, bool.FalseString); requestMessage.Headers.Add(HttpConstants.HttpHeaders.IsBatchRequest, bool.TrueString); @@ -247,7 +276,7 @@ private async Task ExecuteAsync( cosmosContainerCore: this.cosmosContainer, feedRange: null, streamPayload: serverRequestPayload, - requestEnricher: requestMessage => BatchAsyncContainerExecutor.AddHeadersToRequestMessage(requestMessage, serverRequest.PartitionKeyRangeId), + requestEnricher: requestMessage => 
BatchAsyncContainerExecutor.AddHeadersToRequestMessage(requestMessage, serverRequest), trace: trace, cancellationToken: cancellationToken).ConfigureAwait(false); diff --git a/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs b/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs index e4316eace4..7be48aa672 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs @@ -18,6 +18,10 @@ internal class ItemBatchOperationContext : IDisposable { public string PartitionKeyRangeId { get; private set; } + public bool IsClientEncrypted { get; set; } + + public string IntendedCollectionRidValue { get; set; } + public BatchAsyncBatcher CurrentBatcher { get; set; } public Task OperationTask => this.taskCompletionSource.Task; diff --git a/Microsoft.Azure.Cosmos/src/Batch/PartitionKeyRangeServerBatchRequest.cs b/Microsoft.Azure.Cosmos/src/Batch/PartitionKeyRangeServerBatchRequest.cs index e4c83b2782..b19dc817d9 100644 --- a/Microsoft.Azure.Cosmos/src/Batch/PartitionKeyRangeServerBatchRequest.cs +++ b/Microsoft.Azure.Cosmos/src/Batch/PartitionKeyRangeServerBatchRequest.cs @@ -14,17 +14,23 @@ internal sealed class PartitionKeyRangeServerBatchRequest : ServerBatchRequest /// Initializes a new instance of the class. /// /// The partition key range id associated with all requests. + /// If the operation has Encrypted data. + /// Intended Collection Rid value. /// Maximum length allowed for the request body. /// Maximum number of operations allowed in the request. /// Serializer to serialize user provided objects to JSON. 
public PartitionKeyRangeServerBatchRequest( string partitionKeyRangeId, + bool isClientEncrypted, + string intendedCollectionRidValue, int maxBodyLength, int maxOperationCount, CosmosSerializerCore serializerCore) : base(maxBodyLength, maxOperationCount, serializerCore) { this.PartitionKeyRangeId = partitionKeyRangeId; + this.IsClientEncrypted = isClientEncrypted; + this.IntendedCollectionRidValue = intendedCollectionRidValue; } /// @@ -32,6 +38,10 @@ public PartitionKeyRangeServerBatchRequest( /// public string PartitionKeyRangeId { get; } + public bool IsClientEncrypted { get; } + + public string IntendedCollectionRidValue { get; } + /// /// Creates an instance of . /// In case of direct mode requests, all the operations are expected to belong to the same PartitionKeyRange. @@ -43,6 +53,8 @@ public PartitionKeyRangeServerBatchRequest( /// Maximum number of operations allowed in the request. /// Whether to stop adding operations to the request once there is non-continuity in the operation indexes. /// Serializer to serialize user provided objects to JSON. + /// Indicates if the request has encrypted data. + /// The intended collection Rid value. /// representing request cancellation. /// A newly created instance of and the overflow ItemBatchOperation not being processed. 
public static async Task>> CreateAsync( @@ -52,9 +64,18 @@ public static async Task pendingOperations = await request.CreateBodyStreamAsync(operations, cancellationToken, ensureContinuousOperationIndexes); return new Tuple>(request, pendingOperations); } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Batch/CosmosItemBulkTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Batch/CosmosItemBulkTests.cs index 028b4ad95b..3927d038c2 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Batch/CosmosItemBulkTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Batch/CosmosItemBulkTests.cs @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using System.Collections.Generic; using System.Net; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Tracing; using Microsoft.VisualStudio.TestTools.UnitTesting; using Newtonsoft.Json.Linq; @@ -121,6 +122,114 @@ public async Task CreateItemAsync_WithBulk() } } + [TestMethod] + public async Task CreateItemAsyncValidateIntendedCollRid_WithBulk() + { + Container container = await this.database.CreateContainerAsync(Guid.NewGuid().ToString(), "/pk", 10000); + + List>> tasks = new List>>(); + + ContainerInlineCore containerInternal = (ContainerInlineCore)container; + + string rid = await containerInternal.GetCachedRIDAsync(forceRefresh: false, NoOpTrace.Singleton, cancellationToken: default); + + // case 1. use wrong rid by using a stale rid. + ItemRequestOptions itemRequestOptions = new ItemRequestOptions() + { + AddRequestHeaders = (headers) => + { + headers[Documents.HttpConstants.HttpHeaders.IsClientEncrypted] = bool.TrueString; + headers[Documents.WFConstants.BackendHeaders.IntendedCollectionRid] = rid; + } + }; + + // delete the container. + using (await this.database.GetContainer(container.Id).DeleteContainerStreamAsync()) + { } + + // recreate with same id. 
+ await this.database.CreateContainerAsync(container.Id, "/pk", 10000); + + + for (int i = 0; i < 2; i++) + { + tasks.Add(ExecuteCreateAsync(container, CreateItem(i.ToString()), itemRequestOptions)); + } + + try + { + await Task.WhenAll(tasks); + Assert.Fail("Bulk execution should have failed. "); + } + catch(CosmosException ex) + { + if(ex.StatusCode == HttpStatusCode.Created || ex.SubStatusCode != 1024) + { + Assert.Fail("Bulk execution should have failed with these specific status codes. "); + } + } + + // case 2. + tasks.Clear(); + + // should ignore if the item is not encrypted. + itemRequestOptions = new ItemRequestOptions() + { + AddRequestHeaders = (headers) => + { + headers[Documents.HttpConstants.HttpHeaders.IsClientEncrypted] = bool.FalseString; + headers[Documents.WFConstants.BackendHeaders.IntendedCollectionRid] = rid; + } + }; + + for (int i = 0; i < 2; i++) + { + tasks.Add(ExecuteCreateAsync(container, CreateItem(i.ToString()), itemRequestOptions)); + } + + await Task.WhenAll(tasks); + + for (int i = 0; i < 2; i++) + { + Task> task = tasks[i]; + ItemResponse result = await task; + Assert.IsTrue(result.Headers.RequestCharge > 0); + Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString())); + Assert.AreEqual(HttpStatusCode.Created, result.StatusCode); + } + + // case 3. + tasks.Clear(); + + // use the correct rid. 
+ rid = await containerInternal.GetCachedRIDAsync(forceRefresh: false, NoOpTrace.Singleton, cancellationToken: default); + + itemRequestOptions = new ItemRequestOptions() + { + AddRequestHeaders = (headers) => + { + headers[Documents.HttpConstants.HttpHeaders.IsClientEncrypted] = bool.TrueString; + headers[Documents.WFConstants.BackendHeaders.IntendedCollectionRid] = rid; + } + }; + + for (int i = 3; i < 8; i++) + { + tasks.Add(ExecuteCreateAsync(container, CreateItem(i.ToString()), itemRequestOptions)); + } + + await Task.WhenAll(tasks); + + for (int i = 0; i < 5; i++) + { + Task> task = tasks[i]; + ItemResponse result = await task; + Assert.IsTrue(result.Headers.RequestCharge > 0); + Assert.IsFalse(string.IsNullOrEmpty(result.Diagnostics.ToString())); + Assert.AreEqual(HttpStatusCode.Created, result.StatusCode); + } + } + [TestMethod] public async Task CreateItemJObjectWithoutPK_WithBulk() { @@ -528,9 +637,9 @@ private async Task CreateLargeItemStreamWithBulk(int appxItemSize) } } - private static Task> ExecuteCreateAsync(Container container, ToDoActivity item) + private static Task> ExecuteCreateAsync(Container container, ToDoActivity item, ItemRequestOptions itemRequestOptions = null) { - return container.CreateItemAsync(item, new PartitionKey(item.pk)); + return container.CreateItemAsync(item, new PartitionKey(item.pk), itemRequestOptions); } private static Task> ExecuteCreateAsync(Container container, JObject item) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs index 5e3975f828..4a406c29d1 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosReadManyItemsTests.cs @@ -271,8 +271,8 @@ public async Task ReadManyTestWithIncorrectIntendedContainerRid() { AddRequestHeaders = 
(headers) => { - headers["x-ms-cosmos-is-client-encrypted"] = bool.TrueString; - headers["x-ms-cosmos-intended-collection-rid"] = "iCoRrecTrID="; + headers[Documents.HttpConstants.HttpHeaders.IsClientEncrypted] = bool.TrueString; + headers[Documents.WFConstants.BackendHeaders.IntendedCollectionRid] = "iCoRrecTrID="; } }; @@ -307,8 +307,8 @@ public async Task ReadManyTestWithIncorrectIntendedContainerRid() { AddRequestHeaders = (headers) => { - headers["x-ms-cosmos-is-client-encrypted"] = bool.TrueString; - headers["x-ms-cosmos-intended-collection-rid"] = rid; + headers[Documents.HttpConstants.HttpHeaders.IsClientEncrypted] = bool.TrueString; + headers[Documents.WFConstants.BackendHeaders.IntendedCollectionRid] = rid; } }; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/PartitionKeyRangeServerBatchRequestTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/PartitionKeyRangeServerBatchRequestTests.cs index bfbbed3ec5..e2ddfd3e13 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/PartitionKeyRangeServerBatchRequestTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Batch/PartitionKeyRangeServerBatchRequestTests.cs @@ -37,6 +37,8 @@ public async Task FitsAllOperations() 2, false, MockCosmosUtil.Serializer, + isClientEncrypted: false, + intendedCollectionRidValue: null, default(CancellationToken)); Assert.AreEqual(operations.Count, request.Operations.Count); @@ -65,6 +67,8 @@ public async Task OverflowsBasedOnCount() 1, false, MockCosmosUtil.Serializer, + isClientEncrypted: false, + intendedCollectionRidValue: null, default(CancellationToken)); Assert.AreEqual(1, request.Operations.Count); @@ -96,6 +100,8 @@ public async Task OverflowsBasedOnCount_WithOffset() 1, false, MockCosmosUtil.Serializer, + isClientEncrypted: false, + intendedCollectionRidValue: null, default(CancellationToken)); Assert.AreEqual(1, request.Operations.Count); @@ -170,6 +176,8 @@ private static async 
Task