diff --git a/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs
index d7241314b8..554330b058 100644
--- a/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs
+++ b/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs
@@ -1,894 +1,894 @@
-//------------------------------------------------------------
-// Copyright (c) Microsoft Corporation. All rights reserved.
-//------------------------------------------------------------
-
-namespace Microsoft.Azure.Cosmos
-{
- using System;
- using System.Diagnostics;
- using System.IO;
- using System.Linq;
- using System.Net;
- using System.Threading;
- using System.Threading.Tasks;
- using Microsoft.Azure.Cosmos.Fluent;
- using Microsoft.Azure.Cosmos.Tracing;
- using Microsoft.Azure.Documents;
-
-#if PREVIEW
- using System.Collections.Generic;
-#endif
-
- ///
- /// Operations for reading or deleting an existing database.
- ///
- /// for or creating new databases, and reading/querying all databases; use `client.Databases`.
- ///
- internal abstract class DatabaseCore : DatabaseInternal
- {
- protected DatabaseCore(
- CosmosClientContext clientContext,
- string databaseId)
- {
- this.Id = databaseId;
- this.ClientContext = clientContext;
- this.LinkUri = clientContext.CreateLink(
- parentLink: null,
- uriPathSegment: Paths.DatabasesPathSegment,
- id: databaseId);
- }
-
- public override string Id { get; }
-
- public override CosmosClient Client => this.ClientContext.Client;
-
- internal override string LinkUri { get; }
-
- internal override CosmosClientContext ClientContext { get; }
-
- public async Task ReadAsync(
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- ResponseMessage response = await this.ReadStreamAsync(
- requestOptions: requestOptions,
- trace: trace,
- cancellationToken: cancellationToken);
-
- return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this, response);
- }
-
- public async Task DeleteAsync(
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- ResponseMessage response = await this.DeleteStreamAsync(
- requestOptions: requestOptions,
- trace: trace,
- cancellationToken: cancellationToken);
-
- return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this, response);
- }
-
- public async Task ReadThroughputAsync(
- ITrace trace,
- CancellationToken cancellationToken)
- {
- ThroughputResponse response = await this.ReadThroughputIfExistsAsync(null, cancellationToken);
- return response.Resource?.Throughput;
- }
-
- public async Task ReadThroughputAsync(
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- string rid = await this.GetRIDAsync(cancellationToken);
- CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
- return await cosmosOffers.ReadThroughputAsync(
- targetRID: rid,
- requestOptions: requestOptions,
- cancellationToken: cancellationToken);
- }
-
- internal override async Task ReadThroughputIfExistsAsync(
- RequestOptions requestOptions,
- CancellationToken cancellationToken)
- {
- string rid = await this.GetRIDAsync(cancellationToken);
- CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
- return await cosmosOffers.ReadThroughputIfExistsAsync(targetRID: rid, requestOptions: requestOptions, cancellationToken: cancellationToken);
- }
-
- public async Task ReplaceThroughputAsync(
- int throughput,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- string rid = await this.GetRIDAsync(cancellationToken);
- CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
- return await cosmosOffers.ReplaceThroughputAsync(
- targetRID: rid,
- throughput: throughput,
- requestOptions: requestOptions,
- cancellationToken: cancellationToken);
- }
-
- internal override async Task ReplaceThroughputIfExistsAsync(
- int throughput,
- RequestOptions requestOptions,
- CancellationToken cancellationToken)
- {
- string rid = await this.GetRIDAsync(cancellationToken);
- CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
- return await cosmosOffers.ReplaceThroughputIfExistsAsync(
- targetRID: rid,
- throughput: throughput,
- requestOptions: requestOptions,
- cancellationToken: cancellationToken);
- }
-
- public Task CreateContainerStreamAsync(
- ContainerProperties containerProperties,
- ThroughputProperties throughputProperties,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- if (containerProperties == null)
- {
- throw new ArgumentNullException(nameof(containerProperties));
- }
-
- this.ValidateContainerProperties(containerProperties);
-
- return this.ProcessCollectionCreateAsync(
- streamPayload: this.ClientContext.SerializerCore.ToStream(containerProperties),
- throughputProperties: throughputProperties,
- requestOptions: requestOptions,
- trace: trace,
- cancellationToken: cancellationToken);
- }
-
- public async Task CreateContainerAsync(
- ContainerProperties containerProperties,
- ThroughputProperties throughputProperties,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken = default)
- {
- if (containerProperties == null)
- {
- throw new ArgumentNullException(nameof(containerProperties));
- }
-
- this.ValidateContainerProperties(containerProperties);
-
- ResponseMessage response = await this.ProcessCollectionCreateAsync(
- streamPayload: this.ClientContext.SerializerCore.ToStream(containerProperties),
- throughputProperties: throughputProperties,
- requestOptions: requestOptions,
- trace: trace,
- cancellationToken: cancellationToken);
-
- return this.ClientContext.ResponseFactory.CreateContainerResponse(this.GetContainer(containerProperties.Id), response);
- }
-
- public async Task CreateContainerIfNotExistsAsync(
- ContainerProperties containerProperties,
- ThroughputProperties throughputProperties,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- if (containerProperties == null)
- {
- throw new ArgumentNullException(nameof(containerProperties));
- }
-
- this.ValidateContainerProperties(containerProperties);
-
- double totalRequestCharge = 0;
- ContainerCore container = (ContainerCore)this.GetContainer(containerProperties.Id);
- using (ResponseMessage readResponse = await container.ReadContainerStreamAsync(
- requestOptions: requestOptions,
- trace: trace,
- cancellationToken: cancellationToken))
- {
- totalRequestCharge = readResponse.Headers.RequestCharge;
-
- if (readResponse.StatusCode != HttpStatusCode.NotFound)
- {
- ContainerResponse retrivedContainerResponse = this.ClientContext.ResponseFactory.CreateContainerResponse(
- container,
- readResponse);
-
- if (containerProperties.PartitionKey.Kind != Documents.PartitionKind.MultiHash)
- {
- if (!retrivedContainerResponse.Resource.PartitionKeyPath.Equals(containerProperties.PartitionKeyPath))
- {
- throw new ArgumentException(
- string.Format(
- ClientResources.PartitionKeyPathConflict,
- containerProperties.PartitionKeyPath,
- containerProperties.Id,
- retrivedContainerResponse.Resource.PartitionKeyPath),
- nameof(containerProperties.PartitionKey));
- }
- }
-#if PREVIEW
- else
- {
- IReadOnlyList retrivedPartitionKeyPaths = retrivedContainerResponse.Resource.PartitionKeyPaths;
- IReadOnlyList receivedPartitionKeyPaths = containerProperties.PartitionKeyPaths;
-
- if (retrivedPartitionKeyPaths.Count != receivedPartitionKeyPaths.Count || !Enumerable.SequenceEqual(retrivedPartitionKeyPaths, receivedPartitionKeyPaths))
- {
- throw new ArgumentException(
- string.Format(
- ClientResources.PartitionKeyPathConflict,
- string.Join(",", containerProperties.PartitionKeyPaths),
- containerProperties.Id,
- string.Join(",", retrivedContainerResponse.Resource.PartitionKeyPaths)),
- nameof(containerProperties.PartitionKey));
- }
- }
-#endif
- return retrivedContainerResponse;
- }
- }
-
- this.ValidateContainerProperties(containerProperties);
- using (ResponseMessage createResponse = await this.CreateContainerStreamAsync(
- containerProperties,
- throughputProperties,
- requestOptions,
- trace,
- cancellationToken))
- {
- totalRequestCharge += createResponse.Headers.RequestCharge;
- createResponse.Headers.RequestCharge = totalRequestCharge;
-
- if (createResponse.StatusCode != HttpStatusCode.Conflict)
- {
- return this.ClientContext.ResponseFactory.CreateContainerResponse(container, createResponse);
- }
- }
-
- // This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
- // so for the remaining ones we should do a Read instead of throwing Conflict exception
- using (ResponseMessage readResponseAfterCreate = await container.ReadContainerStreamAsync(
- requestOptions: requestOptions,
- trace: trace,
- cancellationToken: cancellationToken))
- {
- totalRequestCharge += readResponseAfterCreate.Headers.RequestCharge;
- readResponseAfterCreate.Headers.RequestCharge = totalRequestCharge;
-
- return this.ClientContext.ResponseFactory.CreateContainerResponse(container, readResponseAfterCreate);
- }
- }
-
- public async Task ReplaceThroughputAsync(
- ThroughputProperties throughputProperties,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken = default)
- {
- string rid = await this.GetRIDAsync(cancellationToken);
- CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
- return await cosmosOffers.ReplaceThroughputPropertiesAsync(
- targetRID: rid,
- throughputProperties: throughputProperties,
- requestOptions: requestOptions,
- cancellationToken: cancellationToken);
- }
-
- internal override async Task ReplaceThroughputPropertiesIfExistsAsync(
- ThroughputProperties throughputProperties,
- RequestOptions requestOptions,
- CancellationToken cancellationToken)
- {
- string rid = await this.GetRIDAsync(cancellationToken);
- CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
- return await cosmosOffers.ReplaceThroughputPropertiesIfExistsAsync(
- targetRID: rid,
- throughputProperties: throughputProperties,
- requestOptions: requestOptions,
- cancellationToken: cancellationToken);
- }
-
- public Task ReadStreamAsync(
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- return this.ProcessResourceOperationStreamAsync(
- streamPayload: null,
- operationType: OperationType.Read,
- linkUri: this.LinkUri,
- resourceType: ResourceType.Database,
- requestOptions: requestOptions,
- trace: trace,
- cancellationToken: cancellationToken);
- }
-
- public Task DeleteStreamAsync(
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- return this.ProcessResourceOperationStreamAsync(
- streamPayload: null,
- operationType: OperationType.Delete,
- linkUri: this.LinkUri,
- resourceType: ResourceType.Database,
- requestOptions: requestOptions,
- trace: trace,
- cancellationToken: cancellationToken);
- }
-
- public async Task CreateContainerAsync(
- ContainerProperties containerProperties,
- int? throughput,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- if (containerProperties == null)
- {
- throw new ArgumentNullException(nameof(containerProperties));
- }
-
- this.ValidateContainerProperties(containerProperties);
-
- ResponseMessage response = await this.ProcessCollectionCreateAsync(
- streamPayload: this.ClientContext.SerializerCore.ToStream(containerProperties),
- throughput: throughput,
- requestOptions: requestOptions,
- trace: trace,
- cancellationToken: cancellationToken);
-
- return this.ClientContext.ResponseFactory.CreateContainerResponse(this.GetContainer(containerProperties.Id), response);
- }
-
- public Task CreateContainerAsync(
- string id,
- string partitionKeyPath,
- int? throughput,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- if (string.IsNullOrEmpty(id))
- {
- throw new ArgumentNullException(nameof(id));
- }
-
- if (string.IsNullOrEmpty(partitionKeyPath))
- {
- throw new ArgumentNullException(nameof(partitionKeyPath));
- }
-
- ContainerProperties containerProperties = new ContainerProperties(id, partitionKeyPath);
-
- return this.CreateContainerAsync(
- containerProperties,
- throughput,
- requestOptions,
- trace,
- cancellationToken);
- }
-
- public Task CreateContainerIfNotExistsAsync(
- ContainerProperties containerProperties,
- int? throughput,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- if (containerProperties == null)
- {
- throw new ArgumentNullException(nameof(containerProperties));
- }
-
- return this.CreateContainerIfNotExistsAsync(
- containerProperties,
- ThroughputProperties.CreateManualThroughput(throughput),
- requestOptions,
- trace,
- cancellationToken);
- }
-
- public Task CreateContainerIfNotExistsAsync(
- string id,
- string partitionKeyPath,
- int? throughput,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- if (string.IsNullOrEmpty(id))
- {
- throw new ArgumentNullException(nameof(id));
- }
-
- if (string.IsNullOrEmpty(partitionKeyPath))
- {
- throw new ArgumentNullException(nameof(partitionKeyPath));
- }
-
- ContainerProperties containerProperties = new ContainerProperties(id, partitionKeyPath);
- return this.CreateContainerIfNotExistsAsync(containerProperties, throughput, requestOptions, trace, cancellationToken);
- }
-
- public override Container GetContainer(string id)
- {
- if (string.IsNullOrEmpty(id))
- {
- throw new ArgumentNullException(nameof(id));
- }
-
- return new ContainerInlineCore(
- this.ClientContext,
- this,
- id);
- }
-
- public Task CreateContainerStreamAsync(
- ContainerProperties containerProperties,
- int? throughput,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- if (containerProperties == null)
- {
- throw new ArgumentNullException(nameof(containerProperties));
- }
-
- this.ValidateContainerProperties(containerProperties);
-
- Stream streamPayload = this.ClientContext.SerializerCore.ToStream(containerProperties);
- return this.ProcessCollectionCreateAsync(
- streamPayload,
- throughput,
- requestOptions,
- trace,
- cancellationToken);
- }
-
- public async Task CreateUserAsync(
- string id,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- if (string.IsNullOrEmpty(id))
- {
- throw new ArgumentNullException(nameof(id));
- }
-
- UserProperties userProperties = new UserProperties(id);
-
- ResponseMessage response = await this.CreateUserStreamAsync(
- userProperties: userProperties,
- requestOptions: requestOptions,
- trace: trace,
- cancellationToken: cancellationToken);
-
- return this.ClientContext.ResponseFactory.CreateUserResponse(this.GetUser(userProperties.Id), response);
- }
-
- public override User GetUser(string id)
- {
- if (string.IsNullOrEmpty(id))
- {
- throw new ArgumentNullException(nameof(id));
- }
-
- return new UserInlineCore(
- this.ClientContext,
- this,
- id);
- }
-
- public Task CreateUserStreamAsync(
- UserProperties userProperties,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- if (userProperties == null)
- {
- throw new ArgumentNullException(nameof(userProperties));
- }
-
- this.ClientContext.ValidateResource(userProperties.Id);
-
- Stream streamPayload = this.ClientContext.SerializerCore.ToStream(userProperties);
- return this.ProcessUserCreateAsync(
- streamPayload: streamPayload,
- requestOptions: requestOptions,
- trace: trace,
- cancellationToken: cancellationToken);
- }
-
- public async Task UpsertUserAsync(
- string id,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- if (string.IsNullOrEmpty(id))
- {
- throw new ArgumentNullException(nameof(id));
- }
-
- this.ClientContext.ValidateResource(id);
-
- ResponseMessage response = await this.ProcessUserUpsertAsync(
- streamPayload: this.ClientContext.SerializerCore.ToStream(new UserProperties(id)),
- requestOptions: requestOptions,
- trace: trace,
- cancellationToken: cancellationToken);
-
- return this.ClientContext.ResponseFactory.CreateUserResponse(this.GetUser(id), response);
- }
-
- public override FeedIterator GetContainerQueryStreamIterator(
- string queryText = null,
- string continuationToken = null,
- QueryRequestOptions requestOptions = null)
- {
- QueryDefinition queryDefinition = null;
- if (queryText != null)
- {
- queryDefinition = new QueryDefinition(queryText);
- }
-
- return this.GetContainerQueryStreamIterator(
- queryDefinition,
- continuationToken,
- requestOptions);
- }
-
- public override FeedIterator GetContainerQueryIterator(
- string queryText = null,
- string continuationToken = null,
- QueryRequestOptions requestOptions = null)
- {
- QueryDefinition queryDefinition = null;
- if (queryText != null)
- {
- queryDefinition = new QueryDefinition(queryText);
- }
-
- return this.GetContainerQueryIterator(
- queryDefinition,
- continuationToken,
- requestOptions);
- }
-
- public override FeedIterator GetContainerQueryStreamIterator(
- QueryDefinition queryDefinition,
- string continuationToken = null,
- QueryRequestOptions requestOptions = null)
- {
- return new FeedIteratorCore(
- clientContext: this.ClientContext,
- resourceLink: this.LinkUri,
- resourceType: ResourceType.Collection,
- queryDefinition: queryDefinition,
- continuationToken: continuationToken,
- options: requestOptions);
- }
-
- public override FeedIterator GetContainerQueryIterator(
- QueryDefinition queryDefinition,
- string continuationToken = null,
- QueryRequestOptions requestOptions = null)
- {
- if (!(this.GetContainerQueryStreamIterator(
- queryDefinition,
- continuationToken,
- requestOptions) is FeedIteratorInternal containerStreamIterator))
- {
- // This class should inherit from DatabaseInteral to avoid the downcasting hacks.
- throw new InvalidOperationException($"Expected FeedIteratorInternal.");
- }
-
- return new FeedIteratorCore(
- containerStreamIterator,
- (response) => this.ClientContext.ResponseFactory.CreateQueryFeedResponse(
- responseMessage: response,
- resourceType: ResourceType.Collection));
- }
-
- public override FeedIterator GetUserQueryIterator(
- QueryDefinition queryDefinition,
- string continuationToken = null,
- QueryRequestOptions requestOptions = null)
- {
- if (!(this.GetUserQueryStreamIterator(
- queryDefinition,
- continuationToken,
- requestOptions) is FeedIteratorInternal userStreamIterator))
- {
- // This class should inherit from DatabaseInteral to avoid the downcasting hacks.
- throw new InvalidOperationException($"Expected FeedIteratorInternal.");
- }
-
- return new FeedIteratorCore(
- userStreamIterator,
- (response) => this.ClientContext.ResponseFactory.CreateQueryFeedResponse(
- responseMessage: response,
- resourceType: ResourceType.User));
- }
-
- public override FeedIterator GetUserQueryStreamIterator(
- QueryDefinition queryDefinition,
- string continuationToken = null,
- QueryRequestOptions requestOptions = null)
- {
- return new FeedIteratorCore(
- clientContext: this.ClientContext,
- resourceLink: this.LinkUri,
- resourceType: ResourceType.User,
- queryDefinition: queryDefinition,
- continuationToken: continuationToken,
- options: requestOptions);
- }
-
- public override FeedIterator GetUserQueryIterator(
- string queryText = null,
- string continuationToken = null,
- QueryRequestOptions requestOptions = null)
- {
- QueryDefinition queryDefinition = null;
- if (queryText != null)
- {
- queryDefinition = new QueryDefinition(queryText);
- }
-
- return this.GetUserQueryIterator(
- queryDefinition,
- continuationToken,
- requestOptions);
- }
-
- public override FeedIterator GetUserQueryStreamIterator(
- string queryText = null,
- string continuationToken = null,
- QueryRequestOptions requestOptions = null)
- {
- QueryDefinition queryDefinition = null;
- if (queryText != null)
- {
- queryDefinition = new QueryDefinition(queryText);
- }
-
- return this.GetUserQueryStreamIterator(
- queryDefinition,
- continuationToken,
- requestOptions);
- }
-
- public override ContainerBuilder DefineContainer(
- string name,
- string partitionKeyPath)
- {
- return new ContainerBuilder(this, name, partitionKeyPath);
- }
-
- public override ClientEncryptionKey GetClientEncryptionKey(string id)
- {
- if (string.IsNullOrEmpty(id))
- {
- throw new ArgumentNullException(nameof(id));
- }
-
- return new ClientEncryptionKeyInlineCore(
- this.ClientContext,
- this,
- id);
- }
-
- public override FeedIterator GetClientEncryptionKeyQueryIterator(
- QueryDefinition queryDefinition,
- string continuationToken = null,
- QueryRequestOptions requestOptions = null)
- {
- if (!(this.GetClientEncryptionKeyQueryStreamIterator(
- queryDefinition: queryDefinition,
- continuationToken: continuationToken,
- requestOptions: requestOptions) is FeedIteratorInternal cekStreamIterator))
- {
- throw new InvalidOperationException($"Expected FeedIteratorInternal.");
- }
-
- return new FeedIteratorCore(
- cekStreamIterator,
- (responseMessage) =>
- {
- FeedResponse results = this.ClientContext.ResponseFactory.CreateQueryFeedResponse(responseMessage, ResourceType.ClientEncryptionKey);
- return results;
- });
- }
-
- private FeedIterator GetClientEncryptionKeyQueryStreamIterator(
- QueryDefinition queryDefinition,
- string continuationToken = null,
- QueryRequestOptions requestOptions = null)
- {
- return new FeedIteratorCore(
- clientContext: this.ClientContext,
- resourceLink: this.LinkUri,
- resourceType: ResourceType.ClientEncryptionKey,
- queryDefinition: queryDefinition,
- continuationToken: continuationToken,
- options: requestOptions);
- }
-
- public async Task CreateClientEncryptionKeyAsync(
- ITrace trace,
- ClientEncryptionKeyProperties clientEncryptionKeyProperties,
- RequestOptions requestOptions = null,
- CancellationToken cancellationToken = default)
- {
- Stream streamPayload = this.ClientContext.SerializerCore.ToStream(clientEncryptionKeyProperties);
- ResponseMessage responseMessage = await this.CreateClientEncryptionKeyStreamAsync(
- trace: trace,
- streamPayload: streamPayload,
- requestOptions: requestOptions,
- cancellationToken: cancellationToken);
-
- ClientEncryptionKeyResponse cekResponse = this.ClientContext.ResponseFactory.CreateClientEncryptionKeyResponse(
- this.GetClientEncryptionKey(clientEncryptionKeyProperties.Id),
- responseMessage);
-
- Debug.Assert(cekResponse.Resource != null);
-
- return cekResponse;
- }
-
- private void ValidateContainerProperties(ContainerProperties containerProperties)
- {
- containerProperties.ValidateRequiredProperties();
- this.ClientContext.ValidateResource(containerProperties.Id);
- }
-
- private Task ProcessCollectionCreateAsync(
- Stream streamPayload,
- ThroughputProperties throughputProperties,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- return this.ClientContext.ProcessResourceOperationStreamAsync(
- resourceUri: this.LinkUri,
- resourceType: ResourceType.Collection,
- operationType: OperationType.Create,
- cosmosContainerCore: null,
- feedRange: null,
- streamPayload: streamPayload,
- requestOptions: requestOptions,
- requestEnricher: (httpRequestMessage) => httpRequestMessage.AddThroughputPropertiesHeader(throughputProperties),
- trace: trace,
- cancellationToken: cancellationToken);
- }
-
- private Task ProcessCollectionCreateAsync(
- Stream streamPayload,
- int? throughput,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- return this.ClientContext.ProcessResourceOperationStreamAsync(
- resourceUri: this.LinkUri,
- resourceType: ResourceType.Collection,
- operationType: OperationType.Create,
- cosmosContainerCore: null,
- feedRange: null,
- streamPayload: streamPayload,
- requestOptions: requestOptions,
- requestEnricher: (httpRequestMessage) => httpRequestMessage.AddThroughputHeader(throughput),
- trace: trace,
- cancellationToken: cancellationToken);
- }
-
- private Task ProcessUserCreateAsync(
- Stream streamPayload,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- return this.ClientContext.ProcessResourceOperationStreamAsync(
- resourceUri: this.LinkUri,
- resourceType: ResourceType.User,
- operationType: OperationType.Create,
- cosmosContainerCore: null,
- feedRange: null,
- streamPayload: streamPayload,
- requestOptions: requestOptions,
- requestEnricher: null,
- trace: trace,
- cancellationToken: cancellationToken);
- }
-
- private Task ProcessUserUpsertAsync(
- Stream streamPayload,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- return this.ClientContext.ProcessResourceOperationStreamAsync(
- resourceUri: this.LinkUri,
- resourceType: ResourceType.User,
- operationType: OperationType.Upsert,
- cosmosContainerCore: null,
- feedRange: null,
- streamPayload: streamPayload,
- requestOptions: requestOptions,
- requestEnricher: null,
- trace: trace,
- cancellationToken: cancellationToken);
- }
-
- internal override async Task GetRIDAsync(CancellationToken cancellationToken)
- {
- DatabaseResponse databaseResponse = await this.ReadAsync(cancellationToken: cancellationToken);
- return databaseResponse?.Resource?.ResourceId;
- }
-
- private Task CreateClientEncryptionKeyStreamAsync(
- ITrace trace,
- Stream streamPayload,
- RequestOptions requestOptions = null,
- CancellationToken cancellationToken = default)
- {
- if (streamPayload == null)
- {
- throw new ArgumentNullException(nameof(streamPayload));
- }
-
- return this.ClientContext.ProcessResourceOperationStreamAsync(
- resourceUri: this.LinkUri,
- resourceType: ResourceType.ClientEncryptionKey,
- operationType: OperationType.Create,
- cosmosContainerCore: null,
- feedRange: null,
- streamPayload: streamPayload,
- requestOptions: requestOptions,
- requestEnricher: null,
- trace: trace,
- cancellationToken: cancellationToken);
- }
-
- private Task ProcessResourceOperationStreamAsync(
- Stream streamPayload,
- OperationType operationType,
- string linkUri,
- ResourceType resourceType,
- RequestOptions requestOptions,
- ITrace trace,
- CancellationToken cancellationToken)
- {
- return this.ClientContext.ProcessResourceOperationStreamAsync(
- resourceUri: linkUri,
- resourceType: resourceType,
- operationType: operationType,
- cosmosContainerCore: null,
- feedRange: null,
- streamPayload: streamPayload,
- requestOptions: requestOptions,
- requestEnricher: null,
- trace: trace,
- cancellationToken: cancellationToken);
- }
- }
-}
+//------------------------------------------------------------
+// Copyright (c) Microsoft Corporation. All rights reserved.
+//------------------------------------------------------------
+
+namespace Microsoft.Azure.Cosmos
+{
+ using System;
+ using System.Diagnostics;
+ using System.IO;
+ using System.Linq;
+ using System.Net;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Microsoft.Azure.Cosmos.Fluent;
+ using Microsoft.Azure.Cosmos.Tracing;
+ using Microsoft.Azure.Documents;
+
+#if PREVIEW
+ using System.Collections.Generic;
+#endif
+
+ ///
+ /// Operations for reading or deleting an existing database.
+ ///
+ /// for or creating new databases, and reading/querying all databases; use `client.Databases`.
+ ///
+ internal abstract class DatabaseCore : DatabaseInternal
+ {
+ protected DatabaseCore(
+ CosmosClientContext clientContext,
+ string databaseId)
+ {
+ this.Id = databaseId;
+ this.ClientContext = clientContext;
+ this.LinkUri = clientContext.CreateLink(
+ parentLink: null,
+ uriPathSegment: Paths.DatabasesPathSegment,
+ id: databaseId);
+ }
+
+ public override string Id { get; }
+
+ public override CosmosClient Client => this.ClientContext.Client;
+
+ internal override string LinkUri { get; }
+
+ internal override CosmosClientContext ClientContext { get; }
+
+ public async Task ReadAsync(
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ ResponseMessage response = await this.ReadStreamAsync(
+ requestOptions: requestOptions,
+ trace: trace,
+ cancellationToken: cancellationToken);
+
+ return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this, response);
+ }
+
+ public async Task DeleteAsync(
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ ResponseMessage response = await this.DeleteStreamAsync(
+ requestOptions: requestOptions,
+ trace: trace,
+ cancellationToken: cancellationToken);
+
+ return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this, response);
+ }
+
+ public async Task ReadThroughputAsync(
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ ThroughputResponse response = await this.ReadThroughputIfExistsAsync(null, cancellationToken);
+ return response.Resource?.Throughput;
+ }
+
+ public async Task ReadThroughputAsync(
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ string rid = await this.GetRIDAsync(cancellationToken);
+ CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
+ return await cosmosOffers.ReadThroughputAsync(
+ targetRID: rid,
+ requestOptions: requestOptions,
+ cancellationToken: cancellationToken);
+ }
+
+ internal override async Task ReadThroughputIfExistsAsync(
+ RequestOptions requestOptions,
+ CancellationToken cancellationToken)
+ {
+ string rid = await this.GetRIDAsync(cancellationToken);
+ CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
+ return await cosmosOffers.ReadThroughputIfExistsAsync(targetRID: rid, requestOptions: requestOptions, cancellationToken: cancellationToken);
+ }
+
+ public async Task ReplaceThroughputAsync(
+ int throughput,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ string rid = await this.GetRIDAsync(cancellationToken);
+ CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
+ return await cosmosOffers.ReplaceThroughputAsync(
+ targetRID: rid,
+ throughput: throughput,
+ requestOptions: requestOptions,
+ cancellationToken: cancellationToken);
+ }
+
+ internal override async Task ReplaceThroughputIfExistsAsync(
+ int throughput,
+ RequestOptions requestOptions,
+ CancellationToken cancellationToken)
+ {
+ string rid = await this.GetRIDAsync(cancellationToken);
+ CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
+ return await cosmosOffers.ReplaceThroughputIfExistsAsync(
+ targetRID: rid,
+ throughput: throughput,
+ requestOptions: requestOptions,
+ cancellationToken: cancellationToken);
+ }
+
+ public Task CreateContainerStreamAsync(
+ ContainerProperties containerProperties,
+ ThroughputProperties throughputProperties,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ if (containerProperties == null)
+ {
+ throw new ArgumentNullException(nameof(containerProperties));
+ }
+
+ this.ValidateContainerProperties(containerProperties);
+
+ return this.ProcessCollectionCreateAsync(
+ streamPayload: this.ClientContext.SerializerCore.ToStream(containerProperties),
+ throughputProperties: throughputProperties,
+ requestOptions: requestOptions,
+ trace: trace,
+ cancellationToken: cancellationToken);
+ }
+
+ public async Task CreateContainerAsync(
+ ContainerProperties containerProperties,
+ ThroughputProperties throughputProperties,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken = default)
+ {
+ if (containerProperties == null)
+ {
+ throw new ArgumentNullException(nameof(containerProperties));
+ }
+
+ this.ValidateContainerProperties(containerProperties);
+
+ ResponseMessage response = await this.ProcessCollectionCreateAsync(
+ streamPayload: this.ClientContext.SerializerCore.ToStream(containerProperties),
+ throughputProperties: throughputProperties,
+ requestOptions: requestOptions,
+ trace: trace,
+ cancellationToken: cancellationToken);
+
+ return this.ClientContext.ResponseFactory.CreateContainerResponse(this.GetContainer(containerProperties.Id), response);
+ }
+
+ public async Task CreateContainerIfNotExistsAsync(
+ ContainerProperties containerProperties,
+ ThroughputProperties throughputProperties,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ if (containerProperties == null)
+ {
+ throw new ArgumentNullException(nameof(containerProperties));
+ }
+
+ this.ValidateContainerProperties(containerProperties);
+
+ double totalRequestCharge = 0;
+ ContainerCore container = (ContainerCore)this.GetContainer(containerProperties.Id);
+ using (ResponseMessage readResponse = await container.ReadContainerStreamAsync(
+ requestOptions: requestOptions,
+ trace: trace,
+ cancellationToken: cancellationToken))
+ {
+ totalRequestCharge = readResponse.Headers.RequestCharge;
+
+ if (readResponse.StatusCode != HttpStatusCode.NotFound)
+ {
+ ContainerResponse retrivedContainerResponse = this.ClientContext.ResponseFactory.CreateContainerResponse(
+ container,
+ readResponse);
+
+ if (containerProperties.PartitionKey.Kind != Documents.PartitionKind.MultiHash)
+ {
+ if (!retrivedContainerResponse.Resource.PartitionKeyPath.Equals(containerProperties.PartitionKeyPath))
+ {
+ throw new ArgumentException(
+ string.Format(
+ ClientResources.PartitionKeyPathConflict,
+ containerProperties.PartitionKeyPath,
+ containerProperties.Id,
+ retrivedContainerResponse.Resource.PartitionKeyPath),
+ nameof(containerProperties.PartitionKey));
+ }
+ }
+#if PREVIEW
+ else
+ {
+ IReadOnlyList retrivedPartitionKeyPaths = retrivedContainerResponse.Resource.PartitionKeyPaths;
+ IReadOnlyList receivedPartitionKeyPaths = containerProperties.PartitionKeyPaths;
+
+ if (retrivedPartitionKeyPaths.Count != receivedPartitionKeyPaths.Count || !Enumerable.SequenceEqual(retrivedPartitionKeyPaths, receivedPartitionKeyPaths))
+ {
+ throw new ArgumentException(
+ string.Format(
+ ClientResources.PartitionKeyPathConflict,
+ string.Join(",", containerProperties.PartitionKeyPaths),
+ containerProperties.Id,
+ string.Join(",", retrivedContainerResponse.Resource.PartitionKeyPaths)),
+ nameof(containerProperties.PartitionKey));
+ }
+ }
+#endif
+ return retrivedContainerResponse;
+ }
+ }
+
+ this.ValidateContainerProperties(containerProperties);
+ using (ResponseMessage createResponse = await this.CreateContainerStreamAsync(
+ containerProperties,
+ throughputProperties,
+ requestOptions,
+ trace,
+ cancellationToken))
+ {
+ totalRequestCharge += createResponse.Headers.RequestCharge;
+ createResponse.Headers.RequestCharge = totalRequestCharge;
+
+ if (createResponse.StatusCode != HttpStatusCode.Conflict)
+ {
+ return this.ClientContext.ResponseFactory.CreateContainerResponse(container, createResponse);
+ }
+ }
+
+ // This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
+ // so for the remaining ones we should do a Read instead of throwing Conflict exception
+ using (ResponseMessage readResponseAfterCreate = await container.ReadContainerStreamAsync(
+ requestOptions: requestOptions,
+ trace: trace,
+ cancellationToken: cancellationToken))
+ {
+ totalRequestCharge += readResponseAfterCreate.Headers.RequestCharge;
+ readResponseAfterCreate.Headers.RequestCharge = totalRequestCharge;
+
+ return this.ClientContext.ResponseFactory.CreateContainerResponse(container, readResponseAfterCreate);
+ }
+ }
+
+ public async Task ReplaceThroughputAsync(
+ ThroughputProperties throughputProperties,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken = default)
+ {
+ string rid = await this.GetRIDAsync(cancellationToken);
+ CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
+ return await cosmosOffers.ReplaceThroughputPropertiesAsync(
+ targetRID: rid,
+ throughputProperties: throughputProperties,
+ requestOptions: requestOptions,
+ cancellationToken: cancellationToken);
+ }
+
+ internal override async Task ReplaceThroughputPropertiesIfExistsAsync(
+ ThroughputProperties throughputProperties,
+ RequestOptions requestOptions,
+ CancellationToken cancellationToken)
+ {
+ string rid = await this.GetRIDAsync(cancellationToken);
+ CosmosOffers cosmosOffers = new CosmosOffers(this.ClientContext);
+ return await cosmosOffers.ReplaceThroughputPropertiesIfExistsAsync(
+ targetRID: rid,
+ throughputProperties: throughputProperties,
+ requestOptions: requestOptions,
+ cancellationToken: cancellationToken);
+ }
+
+ public Task ReadStreamAsync(
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ return this.ProcessResourceOperationStreamAsync(
+ streamPayload: null,
+ operationType: OperationType.Read,
+ linkUri: this.LinkUri,
+ resourceType: ResourceType.Database,
+ requestOptions: requestOptions,
+ trace: trace,
+ cancellationToken: cancellationToken);
+ }
+
+ public Task DeleteStreamAsync(
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ return this.ProcessResourceOperationStreamAsync(
+ streamPayload: null,
+ operationType: OperationType.Delete,
+ linkUri: this.LinkUri,
+ resourceType: ResourceType.Database,
+ requestOptions: requestOptions,
+ trace: trace,
+ cancellationToken: cancellationToken);
+ }
+
+ public async Task CreateContainerAsync(
+ ContainerProperties containerProperties,
+ int? throughput,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ if (containerProperties == null)
+ {
+ throw new ArgumentNullException(nameof(containerProperties));
+ }
+
+ this.ValidateContainerProperties(containerProperties);
+
+ ResponseMessage response = await this.ProcessCollectionCreateAsync(
+ streamPayload: this.ClientContext.SerializerCore.ToStream(containerProperties),
+ throughput: throughput,
+ requestOptions: requestOptions,
+ trace: trace,
+ cancellationToken: cancellationToken);
+
+ return this.ClientContext.ResponseFactory.CreateContainerResponse(this.GetContainer(containerProperties.Id), response);
+ }
+
+ public Task CreateContainerAsync(
+ string id,
+ string partitionKeyPath,
+ int? throughput,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ if (string.IsNullOrEmpty(id))
+ {
+ throw new ArgumentNullException(nameof(id));
+ }
+
+ if (string.IsNullOrEmpty(partitionKeyPath))
+ {
+ throw new ArgumentNullException(nameof(partitionKeyPath));
+ }
+
+ ContainerProperties containerProperties = new ContainerProperties(id, partitionKeyPath);
+
+ return this.CreateContainerAsync(
+ containerProperties,
+ throughput,
+ requestOptions,
+ trace,
+ cancellationToken);
+ }
+
+ public Task CreateContainerIfNotExistsAsync(
+ ContainerProperties containerProperties,
+ int? throughput,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ if (containerProperties == null)
+ {
+ throw new ArgumentNullException(nameof(containerProperties));
+ }
+
+ return this.CreateContainerIfNotExistsAsync(
+ containerProperties,
+ ThroughputProperties.CreateManualThroughput(throughput),
+ requestOptions,
+ trace,
+ cancellationToken);
+ }
+
+ public Task CreateContainerIfNotExistsAsync(
+ string id,
+ string partitionKeyPath,
+ int? throughput,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ if (string.IsNullOrEmpty(id))
+ {
+ throw new ArgumentNullException(nameof(id));
+ }
+
+ if (string.IsNullOrEmpty(partitionKeyPath))
+ {
+ throw new ArgumentNullException(nameof(partitionKeyPath));
+ }
+
+ ContainerProperties containerProperties = new ContainerProperties(id, partitionKeyPath);
+ return this.CreateContainerIfNotExistsAsync(containerProperties, throughput, requestOptions, trace, cancellationToken);
+ }
+
+ public override Container GetContainer(string id)
+ {
+ if (string.IsNullOrEmpty(id))
+ {
+ throw new ArgumentNullException(nameof(id));
+ }
+
+ return new ContainerInlineCore(
+ this.ClientContext,
+ this,
+ id);
+ }
+
+ public Task CreateContainerStreamAsync(
+ ContainerProperties containerProperties,
+ int? throughput,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ if (containerProperties == null)
+ {
+ throw new ArgumentNullException(nameof(containerProperties));
+ }
+
+ this.ValidateContainerProperties(containerProperties);
+
+ Stream streamPayload = this.ClientContext.SerializerCore.ToStream(containerProperties);
+ return this.ProcessCollectionCreateAsync(
+ streamPayload,
+ throughput,
+ requestOptions,
+ trace,
+ cancellationToken);
+ }
+
+ public async Task CreateUserAsync(
+ string id,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ if (string.IsNullOrEmpty(id))
+ {
+ throw new ArgumentNullException(nameof(id));
+ }
+
+ UserProperties userProperties = new UserProperties(id);
+
+ ResponseMessage response = await this.CreateUserStreamAsync(
+ userProperties: userProperties,
+ requestOptions: requestOptions,
+ trace: trace,
+ cancellationToken: cancellationToken);
+
+ return this.ClientContext.ResponseFactory.CreateUserResponse(this.GetUser(userProperties.Id), response);
+ }
+
+ public override User GetUser(string id)
+ {
+ if (string.IsNullOrEmpty(id))
+ {
+ throw new ArgumentNullException(nameof(id));
+ }
+
+ return new UserInlineCore(
+ this.ClientContext,
+ this,
+ id);
+ }
+
+ public Task CreateUserStreamAsync(
+ UserProperties userProperties,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ if (userProperties == null)
+ {
+ throw new ArgumentNullException(nameof(userProperties));
+ }
+
+ this.ClientContext.ValidateResource(userProperties.Id);
+
+ Stream streamPayload = this.ClientContext.SerializerCore.ToStream(userProperties);
+ return this.ProcessUserCreateAsync(
+ streamPayload: streamPayload,
+ requestOptions: requestOptions,
+ trace: trace,
+ cancellationToken: cancellationToken);
+ }
+
+ public async Task UpsertUserAsync(
+ string id,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ if (string.IsNullOrEmpty(id))
+ {
+ throw new ArgumentNullException(nameof(id));
+ }
+
+ this.ClientContext.ValidateResource(id);
+
+ ResponseMessage response = await this.ProcessUserUpsertAsync(
+ streamPayload: this.ClientContext.SerializerCore.ToStream(new UserProperties(id)),
+ requestOptions: requestOptions,
+ trace: trace,
+ cancellationToken: cancellationToken);
+
+ return this.ClientContext.ResponseFactory.CreateUserResponse(this.GetUser(id), response);
+ }
+
+ public override FeedIterator GetContainerQueryStreamIterator(
+ string queryText = null,
+ string continuationToken = null,
+ QueryRequestOptions requestOptions = null)
+ {
+ QueryDefinition queryDefinition = null;
+ if (queryText != null)
+ {
+ queryDefinition = new QueryDefinition(queryText);
+ }
+
+ return this.GetContainerQueryStreamIterator(
+ queryDefinition,
+ continuationToken,
+ requestOptions);
+ }
+
+ public override FeedIterator GetContainerQueryIterator(
+ string queryText = null,
+ string continuationToken = null,
+ QueryRequestOptions requestOptions = null)
+ {
+ QueryDefinition queryDefinition = null;
+ if (queryText != null)
+ {
+ queryDefinition = new QueryDefinition(queryText);
+ }
+
+ return this.GetContainerQueryIterator(
+ queryDefinition,
+ continuationToken,
+ requestOptions);
+ }
+
+ public override FeedIterator GetContainerQueryStreamIterator(
+ QueryDefinition queryDefinition,
+ string continuationToken = null,
+ QueryRequestOptions requestOptions = null)
+ {
+ return new FeedIteratorCore(
+ clientContext: this.ClientContext,
+ resourceLink: this.LinkUri,
+ resourceType: ResourceType.Collection,
+ queryDefinition: queryDefinition,
+ continuationToken: continuationToken,
+ options: requestOptions);
+ }
+
+ public override FeedIterator GetContainerQueryIterator(
+ QueryDefinition queryDefinition,
+ string continuationToken = null,
+ QueryRequestOptions requestOptions = null)
+ {
+ if (!(this.GetContainerQueryStreamIterator(
+ queryDefinition,
+ continuationToken,
+ requestOptions) is FeedIteratorInternal containerStreamIterator))
+ {
+ // This class should inherit from DatabaseInteral to avoid the downcasting hacks.
+ throw new InvalidOperationException($"Expected FeedIteratorInternal.");
+ }
+
+ return new FeedIteratorCore(
+ containerStreamIterator,
+ (response) => this.ClientContext.ResponseFactory.CreateQueryFeedResponse(
+ responseMessage: response,
+ resourceType: ResourceType.Collection));
+ }
+
+ public override FeedIterator GetUserQueryIterator(
+ QueryDefinition queryDefinition,
+ string continuationToken = null,
+ QueryRequestOptions requestOptions = null)
+ {
+ if (!(this.GetUserQueryStreamIterator(
+ queryDefinition,
+ continuationToken,
+ requestOptions) is FeedIteratorInternal userStreamIterator))
+ {
+ // This class should inherit from DatabaseInteral to avoid the downcasting hacks.
+ throw new InvalidOperationException($"Expected FeedIteratorInternal.");
+ }
+
+ return new FeedIteratorCore(
+ userStreamIterator,
+ (response) => this.ClientContext.ResponseFactory.CreateQueryFeedResponse(
+ responseMessage: response,
+ resourceType: ResourceType.User));
+ }
+
+ public override FeedIterator GetUserQueryStreamIterator(
+ QueryDefinition queryDefinition,
+ string continuationToken = null,
+ QueryRequestOptions requestOptions = null)
+ {
+ return new FeedIteratorCore(
+ clientContext: this.ClientContext,
+ resourceLink: this.LinkUri,
+ resourceType: ResourceType.User,
+ queryDefinition: queryDefinition,
+ continuationToken: continuationToken,
+ options: requestOptions);
+ }
+
+ public override FeedIterator GetUserQueryIterator(
+ string queryText = null,
+ string continuationToken = null,
+ QueryRequestOptions requestOptions = null)
+ {
+ QueryDefinition queryDefinition = null;
+ if (queryText != null)
+ {
+ queryDefinition = new QueryDefinition(queryText);
+ }
+
+ return this.GetUserQueryIterator(
+ queryDefinition,
+ continuationToken,
+ requestOptions);
+ }
+
+ public override FeedIterator GetUserQueryStreamIterator(
+ string queryText = null,
+ string continuationToken = null,
+ QueryRequestOptions requestOptions = null)
+ {
+ QueryDefinition queryDefinition = null;
+ if (queryText != null)
+ {
+ queryDefinition = new QueryDefinition(queryText);
+ }
+
+ return this.GetUserQueryStreamIterator(
+ queryDefinition,
+ continuationToken,
+ requestOptions);
+ }
+
+ public override ContainerBuilder DefineContainer(
+ string name,
+ string partitionKeyPath)
+ {
+ return new ContainerBuilder(this, name, partitionKeyPath);
+ }
+
+ public override ClientEncryptionKey GetClientEncryptionKey(string id)
+ {
+ if (string.IsNullOrEmpty(id))
+ {
+ throw new ArgumentNullException(nameof(id));
+ }
+
+ return new ClientEncryptionKeyInlineCore(
+ this.ClientContext,
+ this,
+ id);
+ }
+
+ public override FeedIterator GetClientEncryptionKeyQueryIterator(
+ QueryDefinition queryDefinition,
+ string continuationToken = null,
+ QueryRequestOptions requestOptions = null)
+ {
+ if (!(this.GetClientEncryptionKeyQueryStreamIterator(
+ queryDefinition: queryDefinition,
+ continuationToken: continuationToken,
+ requestOptions: requestOptions) is FeedIteratorInternal cekStreamIterator))
+ {
+ throw new InvalidOperationException($"Expected FeedIteratorInternal.");
+ }
+
+ return new FeedIteratorCore(
+ cekStreamIterator,
+ (responseMessage) =>
+ {
+ FeedResponse results = this.ClientContext.ResponseFactory.CreateQueryFeedResponse(responseMessage, ResourceType.ClientEncryptionKey);
+ return results;
+ });
+ }
+
+ private FeedIterator GetClientEncryptionKeyQueryStreamIterator(
+ QueryDefinition queryDefinition,
+ string continuationToken = null,
+ QueryRequestOptions requestOptions = null)
+ {
+ return new FeedIteratorCore(
+ clientContext: this.ClientContext,
+ resourceLink: this.LinkUri,
+ resourceType: ResourceType.ClientEncryptionKey,
+ queryDefinition: queryDefinition,
+ continuationToken: continuationToken,
+ options: requestOptions);
+ }
+
+ public async Task CreateClientEncryptionKeyAsync(
+ ITrace trace,
+ ClientEncryptionKeyProperties clientEncryptionKeyProperties,
+ RequestOptions requestOptions = null,
+ CancellationToken cancellationToken = default)
+ {
+ Stream streamPayload = this.ClientContext.SerializerCore.ToStream(clientEncryptionKeyProperties);
+ ResponseMessage responseMessage = await this.CreateClientEncryptionKeyStreamAsync(
+ trace: trace,
+ streamPayload: streamPayload,
+ requestOptions: requestOptions,
+ cancellationToken: cancellationToken);
+
+ ClientEncryptionKeyResponse cekResponse = this.ClientContext.ResponseFactory.CreateClientEncryptionKeyResponse(
+ this.GetClientEncryptionKey(clientEncryptionKeyProperties.Id),
+ responseMessage);
+
+ Debug.Assert(cekResponse.Resource != null);
+
+ return cekResponse;
+ }
+
+ private void ValidateContainerProperties(ContainerProperties containerProperties)
+ {
+ containerProperties.ValidateRequiredProperties();
+ this.ClientContext.ValidateResource(containerProperties.Id);
+ }
+
+ private Task ProcessCollectionCreateAsync(
+ Stream streamPayload,
+ ThroughputProperties throughputProperties,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ return this.ClientContext.ProcessResourceOperationStreamAsync(
+ resourceUri: this.LinkUri,
+ resourceType: ResourceType.Collection,
+ operationType: OperationType.Create,
+ cosmosContainerCore: null,
+ feedRange: null,
+ streamPayload: streamPayload,
+ requestOptions: requestOptions,
+ requestEnricher: (httpRequestMessage) => httpRequestMessage.AddThroughputPropertiesHeader(throughputProperties),
+ trace: trace,
+ cancellationToken: cancellationToken);
+ }
+
+ private Task ProcessCollectionCreateAsync(
+ Stream streamPayload,
+ int? throughput,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ return this.ClientContext.ProcessResourceOperationStreamAsync(
+ resourceUri: this.LinkUri,
+ resourceType: ResourceType.Collection,
+ operationType: OperationType.Create,
+ cosmosContainerCore: null,
+ feedRange: null,
+ streamPayload: streamPayload,
+ requestOptions: requestOptions,
+ requestEnricher: (httpRequestMessage) => httpRequestMessage.AddThroughputHeader(throughput),
+ trace: trace,
+ cancellationToken: cancellationToken);
+ }
+
+ private Task ProcessUserCreateAsync(
+ Stream streamPayload,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ return this.ClientContext.ProcessResourceOperationStreamAsync(
+ resourceUri: this.LinkUri,
+ resourceType: ResourceType.User,
+ operationType: OperationType.Create,
+ cosmosContainerCore: null,
+ feedRange: null,
+ streamPayload: streamPayload,
+ requestOptions: requestOptions,
+ requestEnricher: null,
+ trace: trace,
+ cancellationToken: cancellationToken);
+ }
+
+ private Task ProcessUserUpsertAsync(
+ Stream streamPayload,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ return this.ClientContext.ProcessResourceOperationStreamAsync(
+ resourceUri: this.LinkUri,
+ resourceType: ResourceType.User,
+ operationType: OperationType.Upsert,
+ cosmosContainerCore: null,
+ feedRange: null,
+ streamPayload: streamPayload,
+ requestOptions: requestOptions,
+ requestEnricher: null,
+ trace: trace,
+ cancellationToken: cancellationToken);
+ }
+
+ internal override async Task GetRIDAsync(CancellationToken cancellationToken)
+ {
+ DatabaseResponse databaseResponse = await this.ReadAsync(cancellationToken: cancellationToken);
+ return databaseResponse?.Resource?.ResourceId;
+ }
+
+ private Task CreateClientEncryptionKeyStreamAsync(
+ ITrace trace,
+ Stream streamPayload,
+ RequestOptions requestOptions = null,
+ CancellationToken cancellationToken = default)
+ {
+ if (streamPayload == null)
+ {
+ throw new ArgumentNullException(nameof(streamPayload));
+ }
+
+ return this.ClientContext.ProcessResourceOperationStreamAsync(
+ resourceUri: this.LinkUri,
+ resourceType: ResourceType.ClientEncryptionKey,
+ operationType: OperationType.Create,
+ cosmosContainerCore: null,
+ feedRange: null,
+ streamPayload: streamPayload,
+ requestOptions: requestOptions,
+ requestEnricher: null,
+ trace: trace,
+ cancellationToken: cancellationToken);
+ }
+
+ private Task ProcessResourceOperationStreamAsync(
+ Stream streamPayload,
+ OperationType operationType,
+ string linkUri,
+ ResourceType resourceType,
+ RequestOptions requestOptions,
+ ITrace trace,
+ CancellationToken cancellationToken)
+ {
+ return this.ClientContext.ProcessResourceOperationStreamAsync(
+ resourceUri: linkUri,
+ resourceType: resourceType,
+ operationType: operationType,
+ cosmosContainerCore: null,
+ feedRange: null,
+ streamPayload: streamPayload,
+ requestOptions: requestOptions,
+ requestEnricher: null,
+ trace: trace,
+ cancellationToken: cancellationToken);
+ }
+ }
+}