Skip to content

Commit

Permalink
Query : Adds WithParameterStream to QueryDefinition to pass in serial…
Browse files Browse the repository at this point in the history
…ized values (Azure#2222)

* Support Stream Value in SqlParameter and disable call to customer serializer.

* Updated contracts.

* Updated Contracts.

* Fixes / Fixed Comment.

* removed redundant logic.

* Renamed method, updated summary.

* Update DotNetSDKAPI.json

* Update CosmosItemTests.cs

* Fixes as per review comments. Updated tests.

Co-authored-by: j82w <j82w@users.noreply.github.com>
  • Loading branch information
kr-santosh and j82w authored Mar 25, 2021
1 parent e8812c6 commit ec8bf98
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 11 deletions.
25 changes: 18 additions & 7 deletions Microsoft.Azure.Cosmos/src/CosmosSqlQuerySpecJsonConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,27 @@ public override void WriteJson(JsonWriter writer, object value, JsonSerializer s
serializer.Serialize(writer, sqlParameter.Name);
writer.WritePropertyName("value");

// Use the user serializer for the parameter values so custom conversions are correctly handled
using (Stream str = this.UserSerializer.ToStream(sqlParameter.Value))
// if the SqlParameter has stream value we dont pass it through the custom serializer.
if (sqlParameter.Value is SerializedParameterValue serializedEncryptedData)
{
using (StreamReader streamReader = new StreamReader(str))
using (StreamReader streamReader = new StreamReader(serializedEncryptedData.valueStream))
{
string parameterValue = streamReader.ReadToEnd();
writer.WriteRawValue(parameterValue);
}
}
else
{
// Use the user serializer for the parameter values so custom conversions are correctly handled
using (Stream str = this.UserSerializer.ToStream(sqlParameter.Value))
{
using (StreamReader streamReader = new StreamReader(str))
{
string parameterValue = streamReader.ReadToEnd();
writer.WriteRawValue(parameterValue);
}
}
}

writer.WriteEndObject();
}
Expand All @@ -62,15 +74,14 @@ internal static CosmosSerializer CreateSqlQuerySpecSerializer(
CosmosSerializer cosmosSerializer,
CosmosSerializer propertiesSerializer)
{
// If both serializers are the same no need for the custom converter
if (object.ReferenceEquals(cosmosSerializer, propertiesSerializer))
if (propertiesSerializer is CosmosJsonSerializerWrapper cosmosJsonSerializerWrapper)
{
return propertiesSerializer;
propertiesSerializer = cosmosJsonSerializerWrapper.InternalJsonSerializer;
}

JsonSerializerSettings settings = new JsonSerializerSettings()
{
Converters = new List<JsonConverter>() { new CosmosSqlQuerySpecJsonConverter(cosmosSerializer) }
Converters = new List<JsonConverter>() { new CosmosSqlQuerySpecJsonConverter(cosmosSerializer ?? propertiesSerializer) }
};

return new CosmosJsonSerializerWrapper(new CosmosJsonDotNetSerializer(settings));
Expand Down
37 changes: 37 additions & 0 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using Microsoft.Azure.Cosmos.Query.Core;
using Newtonsoft.Json;

Expand Down Expand Up @@ -108,6 +109,37 @@ public QueryDefinition WithParameter(string name, object value)
return this;
}

/// <summary>
/// Add parameters with Stream Value to the SQL query.
/// </summary>
/// <param name="name">The name of the parameter.</param>
/// <param name="valueStream">The stream value for the parameter.</param>
/// <remarks>
/// UseCase : This is useful in cases like running a Query on Encrypted Values, where the value is generated post serialization and then encrypted
/// and we don't want to change the cipher value due to a call to serializer again.
/// If the same name is added again it will replace the original value.
/// </remarks>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// QueryDefinition query = new QueryDefinition(
/// "select * from t where t.Account = @account")
/// .WithParameterStream("@account", streamValue);
/// ]]>
/// </code>
/// </example>
/// <returns>An instance of <see cref="QueryDefinition"/>.</returns>
public QueryDefinition WithParameterStream(string name, Stream valueStream)
{
// pack it into an internal type for identification.
SerializedParameterValue serializedParameterValue = new SerializedParameterValue
{
valueStream = valueStream
};

return this.WithParameter(name, serializedParameterValue);
}

/// <summary>
/// Returns the names and values of parameters in this <see cref="QueryDefinition"/>.
/// </summary>
Expand Down Expand Up @@ -164,4 +196,9 @@ IEnumerator IEnumerable.GetEnumerator()
}
}
}

internal struct SerializedParameterValue
{
internal Stream valueStream;
}
}
10 changes: 6 additions & 4 deletions Microsoft.Azure.Cosmos/src/Serializer/CosmosSerializerCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ internal CosmosSerializerCore(
if (customSerializer == null)
{
this.customSerializer = null;
this.sqlQuerySpecSerializer = null;
// this would allow us to set the JsonConverter and inturn handle Serialized/Stream Query Parameter Value.
this.sqlQuerySpecSerializer = CosmosSqlQuerySpecJsonConverter.CreateSqlQuerySpecSerializer(
cosmosSerializer: null,
propertiesSerializer: CosmosSerializerCore.propertiesSerializer);
this.patchOperationSerializer = null;
}
else
Expand Down Expand Up @@ -84,16 +87,15 @@ internal Stream ToStreamSqlQuerySpec(SqlQuerySpec input, ResourceType resourceTy

// All the public types that support query use the custom serializer
// Internal types like offers will use the default serializer.
if (this.customSerializer != null &&
(resourceType == ResourceType.Database ||
if (resourceType == ResourceType.Database ||
resourceType == ResourceType.Collection ||
resourceType == ResourceType.Document ||
resourceType == ResourceType.Trigger ||
resourceType == ResourceType.UserDefinedFunction ||
resourceType == ResourceType.StoredProcedure ||
resourceType == ResourceType.Permission ||
resourceType == ResourceType.User ||
resourceType == ResourceType.Conflict))
resourceType == ResourceType.Conflict)
{
serializer = this.sqlQuerySpecSerializer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,193 @@ public async Task ItemCustomSerialzierTest()
}
}

[TestMethod]
public async Task QueryStreamValueTest()
{
DateTime createDateTime = DateTime.UtcNow;

dynamic testItem1 = new
{
id = "testItem1",
cost = (double?)null,
totalCost = 98.2789,
pk = "MyCustomStatus",
taskNum = 4909,
createdDateTime = createDateTime,
statusCode = HttpStatusCode.Accepted,
itemIds = new int[] { 1, 5, 10 },
itemcode = new byte?[5] { 0x16, (byte)'\0', 0x3, null, (byte)'}' },
};

dynamic testItem2 = new
{
id = "testItem2",
cost = (double?)null,
totalCost = 98.2789,
pk = "MyCustomStatus",
taskNum = 4909,
createdDateTime = createDateTime,
statusCode = HttpStatusCode.Accepted,
itemIds = new int[] { 1, 5, 10 },
itemcode = new byte?[5] { 0x16, (byte)'\0', 0x3, null, (byte)'}' },
};

//with Custom Serializer.
JsonSerializerSettings jsonSerializerSettings = new JsonSerializerSettings()
{
Converters = new List<JsonConverter>() { new CosmosSerializerHelper.FormatNumbersAsTextConverter() }
};

int toStreamCount = 0;
int fromStreamCount = 0;
CosmosSerializerHelper cosmosSerializerHelper = new CosmosSerializerHelper(
jsonSerializerSettings,
toStreamCallBack: (itemValue) =>
{
Type itemType = itemValue?.GetType();
if (itemValue == null
|| itemType == typeof(int)
|| itemType == typeof(double)
|| itemType == typeof(string)
|| itemType == typeof(DateTime)
|| itemType == typeof(HttpStatusCode)
|| itemType == typeof(int[])
|| itemType == typeof(byte))
{
toStreamCount++;
}
},
fromStreamCallback: (item) => fromStreamCount++);

CosmosClientOptions options = new CosmosClientOptions()
{
Serializer = cosmosSerializerHelper
};

CosmosClient clientSerializer = TestCommon.CreateCosmosClient(options);
Container containerSerializer = clientSerializer.GetContainer(this.database.Id, this.Container.Id);

List<QueryDefinition> queryDefinitions = new List<QueryDefinition>()
{
new QueryDefinition("select * from t where t.pk = @pk" )
.WithParameterStream("@pk", cosmosSerializerHelper.ToStream<dynamic>(testItem1.pk)),
new QueryDefinition("select * from t where t.cost = @cost" )
.WithParameterStream("@cost", cosmosSerializerHelper.ToStream<dynamic>(testItem1.cost)),
new QueryDefinition("select * from t where t.taskNum = @taskNum" )
.WithParameterStream("@taskNum", cosmosSerializerHelper.ToStream<dynamic>(testItem1.taskNum)),
new QueryDefinition("select * from t where t.totalCost = @totalCost" )
.WithParameterStream("@totalCost", cosmosSerializerHelper.ToStream<dynamic>(testItem1.totalCost)),
new QueryDefinition("select * from t where t.createdDateTime = @createdDateTime" )
.WithParameterStream("@createdDateTime", cosmosSerializerHelper.ToStream<dynamic>(testItem1.createdDateTime)),
new QueryDefinition("select * from t where t.statusCode = @statusCode" )
.WithParameterStream("@statusCode", cosmosSerializerHelper.ToStream<dynamic>(testItem1.statusCode)),
new QueryDefinition("select * from t where t.itemIds = @itemIds" )
.WithParameterStream("@itemIds", cosmosSerializerHelper.ToStream<dynamic>(testItem1.itemIds)),
new QueryDefinition("select * from t where t.itemcode = @itemcode" )
.WithParameterStream("@itemcode", cosmosSerializerHelper.ToStream<dynamic>(testItem1.itemcode)),
new QueryDefinition("select * from t where t.pk = @pk and t.cost = @cost" )
.WithParameterStream("@pk", cosmosSerializerHelper.ToStream<dynamic>(testItem1.pk))
.WithParameterStream("@cost", cosmosSerializerHelper.ToStream<dynamic>(testItem1.cost)),
};

try
{
await containerSerializer.CreateItemAsync<dynamic>(testItem1);
await containerSerializer.CreateItemAsync<dynamic>(testItem2);
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.Conflict)
{
// Ignore conflicts since the object already exists
}

foreach (QueryDefinition queryDefinition in queryDefinitions)
{
toStreamCount = 0;
fromStreamCount = 0;

List<dynamic> allItems = new List<dynamic>();
int pageCount = 0;
using (FeedIterator<dynamic> feedIterator = containerSerializer.GetItemQueryIterator<dynamic>(
queryDefinition: queryDefinition))
{
while (feedIterator.HasMoreResults)
{
// Only need once to verify correct serialization of the query definition
FeedResponse<dynamic> response = await feedIterator.ReadNextAsync(this.cancellationToken);
Assert.AreEqual(response.Count, response.Count());
allItems.AddRange(response);
pageCount++;
}
}

Assert.AreEqual(2, allItems.Count, $"missing query results. Only found: {allItems.Count} items for query:{queryDefinition.ToSqlQuerySpec().QueryText}");

// There should be no call to custom serializer since the parameter values are already serialized.
Assert.AreEqual(0, toStreamCount, $"missing to stream call. Expected: 0 , Actual: {toStreamCount} for query:{queryDefinition.ToSqlQuerySpec().QueryText}");
Assert.AreEqual(pageCount, fromStreamCount);
}

// Standard Cosmos Serializer Used

CosmosClient clientStandardSerializer = TestCommon.CreateCosmosClient(useCustomSeralizer:false);
Container containerStandardSerializer = clientStandardSerializer.GetContainer(this.database.Id, this.Container.Id);

testItem1 = ToDoActivity.CreateRandomToDoActivity();
testItem1.pk = "myPk";
await containerStandardSerializer.CreateItemAsync(testItem1, new Cosmos.PartitionKey(testItem1.pk));

testItem2 = ToDoActivity.CreateRandomToDoActivity();
testItem2.pk = "myPk";
await containerStandardSerializer.CreateItemAsync(testItem2, new Cosmos.PartitionKey(testItem2.pk));
CosmosSerializer cosmosSerializer = containerStandardSerializer.Database.Client.ClientOptions.Serializer;

queryDefinitions = new List<QueryDefinition>()
{
new QueryDefinition("select * from t where t.pk = @pk" )
.WithParameterStream("@pk", cosmosSerializer.ToStream(testItem1.pk)),
new QueryDefinition("select * from t where t.cost = @cost" )
.WithParameterStream("@cost", cosmosSerializer.ToStream(testItem1.cost)),
new QueryDefinition("select * from t where t.taskNum = @taskNum" )
.WithParameterStream("@taskNum", cosmosSerializer.ToStream(testItem1.taskNum)),
new QueryDefinition("select * from t where t.CamelCase = @CamelCase" )
.WithParameterStream("@CamelCase", cosmosSerializer.ToStream(testItem1.CamelCase)),
new QueryDefinition("select * from t where t.valid = @valid" )
.WithParameterStream("@valid", cosmosSerializer.ToStream(testItem1.valid)),
new QueryDefinition("select * from t where t.description = @description" )
.WithParameterStream("@description", cosmosSerializer.ToStream(testItem1.description)),
new QueryDefinition("select * from t where t.pk = @pk and t.cost = @cost" )
.WithParameterStream("@pk", cosmosSerializer.ToStream(testItem1.pk))
.WithParameterStream("@cost", cosmosSerializer.ToStream(testItem1.cost)),
};

foreach (QueryDefinition queryDefinition in queryDefinitions)
{
List<ToDoActivity> allItems = new List<ToDoActivity>();
int pageCount = 0;
using (FeedIterator<ToDoActivity> feedIterator = containerStandardSerializer.GetItemQueryIterator<ToDoActivity>(
queryDefinition: queryDefinition))
{
while (feedIterator.HasMoreResults)
{
// Only need once to verify correct serialization of the query definition
FeedResponse<ToDoActivity> response = await feedIterator.ReadNextAsync(this.cancellationToken);
Assert.AreEqual(response.Count, response.Count());
allItems.AddRange(response);
pageCount++;
}
}

Assert.AreEqual(2, allItems.Count, $"missing query results. Only found: {allItems.Count} items for query:{queryDefinition.ToSqlQuerySpec().QueryText}");
Assert.AreEqual(pageCount, 1);


IReadOnlyList<(string Name, object Value)> parameters1 = queryDefinition.GetQueryParameters();
IReadOnlyList<(string Name, object Value)> parameters2 = queryDefinition.GetQueryParameters();

Assert.AreSame(parameters1, parameters2);
}
}

[TestMethod]
public async Task ItemIterator()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4472,6 +4472,11 @@
"Attributes": [],
"MethodInfo": "Microsoft.Azure.Cosmos.QueryDefinition WithParameter(System.String, System.Object);IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;"
},
"Microsoft.Azure.Cosmos.QueryDefinition WithParameterStream(System.String, System.IO.Stream)": {
"Type": "Method",
"Attributes": [],
"MethodInfo": "Microsoft.Azure.Cosmos.QueryDefinition WithParameterStream(System.String, System.IO.Stream);IsAbstract:False;IsStatic:False;IsVirtual:False;IsGenericMethod:False;IsConstructor:False;IsFinal:False;"
},
"System.Collections.Generic.IReadOnlyList`1[System.ValueTuple`2[System.String,System.Object]] GetQueryParameters()": {
"Type": "Method",
"Attributes": [],
Expand Down

0 comments on commit ec8bf98

Please sign in to comment.