Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed missing retry after for stream queries. #1263

Merged
merged 10 commits into from
Mar 17, 2020
39 changes: 19 additions & 20 deletions Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace Microsoft.Azure.Cosmos.Query
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Diagnostics;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext;
Expand Down Expand Up @@ -135,10 +134,9 @@ public override async Task<ResponseMessage> ReadNextAsync(CancellationToken canc
diagnostics.AddDiagnosticsInternal(queryPage);
}

QueryResponse queryResponse;
if (responseCore.IsSuccess)
{
queryResponse = QueryResponse.CreateSuccess(
return QueryResponse.CreateSuccess(
result: responseCore.CosmosElements,
count: responseCore.CosmosElements.Count,
responseLengthBytes: responseCore.ResponseLengthBytes,
Expand All @@ -155,26 +153,27 @@ public override async Task<ResponseMessage> ReadNextAsync(CancellationToken canc
SubStatusCode = responseCore.SubStatusCode ?? Documents.SubStatusCodes.Unknown
});
}
else

if (responseCore.CosmosException != null)
{
queryResponse = QueryResponse.CreateFailure(
statusCode: responseCore.StatusCode,
cosmosException: responseCore.CosmosException,
requestMessage: null,
diagnostics: diagnostics,
responseHeaders: new CosmosQueryResponseMessageHeaders(
responseCore.ContinuationToken,
responseCore.DisallowContinuationTokenMessage,
cosmosQueryContext.ResourceTypeEnum,
cosmosQueryContext.ContainerResourceId)
{
RequestCharge = responseCore.RequestCharge,
ActivityId = responseCore.ActivityId,
SubStatusCode = responseCore.SubStatusCode ?? Documents.SubStatusCodes.Unknown
});
return responseCore.CosmosException.ToCosmosResponseMessage(null);
}

return queryResponse;
return QueryResponse.CreateFailure(
statusCode: responseCore.StatusCode,
cosmosException: responseCore.CosmosException,
requestMessage: null,
diagnostics: diagnostics,
responseHeaders: new CosmosQueryResponseMessageHeaders(
responseCore.ContinuationToken,
responseCore.DisallowContinuationTokenMessage,
cosmosQueryContext.ResourceTypeEnum,
cosmosQueryContext.ContainerResourceId)
{
RequestCharge = responseCore.RequestCharge,
ActivityId = responseCore.ActivityId,
SubStatusCode = responseCore.SubStatusCode ?? Documents.SubStatusCodes.Unknown,
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Cosmos.Scripts;
using Microsoft.Azure.Cosmos.Linq;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Documents.Collections;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Newtonsoft.Json;

[TestClass]
Expand Down Expand Up @@ -147,6 +150,119 @@ public async Task ContainerTest(bool directMode)
}
}

[TestMethod]
[DataRow(false)]
[DataRow(true)]
public async Task QueryRequestRateTest(bool directMode)
{
CosmosClient client = directMode ? DirectCosmosClient : GatewayCosmosClient;
Container container = client.GetContainer(DatabaseId, ContainerId);
List<string> createdIds = new List<string>()
{
"BasicQueryItem" + Guid.NewGuid(),
"BasicQueryItem2"+ Guid.NewGuid(),
"BasicQueryItem3"+ Guid.NewGuid()
};

foreach (string id in createdIds)
{
dynamic item = new
{
id = id,
pk = id,
};

await container.CreateItemAsync<dynamic>(item: item);
}

Documents.IStoreModel storeModel = client.ClientContext.DocumentClient.StoreModel;
Mock<Documents.IStoreModel> mockStore = new Mock<Documents.IStoreModel>();
client.ClientContext.DocumentClient.StoreModel = mockStore.Object;

// Cause 429 after the first call
int callCount = 0;
string activityId = null;
string errorMessage = "Resource Not Found";
mockStore.Setup(x => x.ProcessMessageAsync(It.IsAny<Documents.DocumentServiceRequest>(), It.IsAny<CancellationToken>()))
.Returns<Documents.DocumentServiceRequest, CancellationToken>((dsr, token) =>
{
callCount++;

if (callCount > 1)
{
INameValueCollection headers = new DictionaryNameValueCollection();
headers.Add(Documents.HttpConstants.HttpHeaders.RetryAfterInMilliseconds, "42");
activityId = Guid.NewGuid().ToString();
headers.Add(Documents.HttpConstants.HttpHeaders.ActivityId, activityId);
Documents.DocumentServiceResponse response = new Documents.DocumentServiceResponse(
body: TestCommon.GenerateStreamFromString(@"{""Errors"":[""" + errorMessage + @"""]}"),
headers: headers,
statusCode: (HttpStatusCode)429,
clientSideRequestStatistics: dsr.RequestContext.ClientRequestStatistics);

return Task.FromResult(response);
}

return storeModel.ProcessMessageAsync(dsr, token);
});

List<dynamic> results = new List<dynamic>();
try
{
FeedIterator<dynamic> feedIterator = container.GetItemQueryIterator<dynamic>(
"select * from T where STARTSWITH(T.id, \"BasicQueryItem\")",
requestOptions: new QueryRequestOptions()
{
MaxItemCount = 1,
MaxConcurrency = 1
});

while (feedIterator.HasMoreResults)
{
FeedResponse<dynamic> response = await feedIterator.ReadNextAsync();
Assert.IsTrue(response.Count <= 1);
Assert.IsTrue(response.Resource.Count() <= 1);

results.AddRange(response);
}
Assert.Fail("Should throw 429 exception after the first page.");
}
catch (CosmosException ce)
{
Assert.IsTrue(ce.RetryAfter.HasValue);
Assert.AreEqual(42, ce.RetryAfter.Value.TotalMilliseconds);
Assert.AreEqual(activityId, ce.ActivityId);
Assert.IsNotNull(ce.DiagnosticsContext);
Assert.IsTrue(ce.Message.Contains(errorMessage));
}

callCount = 0;
FeedIterator streamIterator = container.GetItemQueryStreamIterator(
"select * from T where STARTSWITH(T.id, \"BasicQueryItem\")",
requestOptions: new QueryRequestOptions()
{
MaxItemCount = 1,
MaxConcurrency = 1
});

// First request should be a success
using (ResponseMessage response = await streamIterator.ReadNextAsync())
{
response.EnsureSuccessStatusCode();
Assert.IsNotNull(response.Content);
}

// Second page should be a failure
using (ResponseMessage response = await streamIterator.ReadNextAsync())
{
Assert.AreEqual(429, (int)response.StatusCode);
Assert.AreEqual("42", response.Headers.RetryAfterLiteral);
Assert.AreEqual(activityId, response.Headers.ActivityId);
Assert.IsNotNull(response.DiagnosticsContext);
Assert.IsTrue(response.ErrorMessage.Contains(errorMessage));
}
}

[TestMethod]
[DataRow(false)]
[DataRow(true)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ static TestCommon()
TestCommon.masterStalenessIntervalInSeconds = int.Parse(ConfigurationManager.AppSettings["MasterStalenessIntervalInSeconds"], CultureInfo.InvariantCulture);
}

internal static MemoryStream GenerateStreamFromString(string s)
{
MemoryStream stream = new MemoryStream();
StreamWriter writer = new StreamWriter(stream);
writer.Write(s);
writer.Flush();
stream.Position = 0;
return stream;
}

internal static (string endpoint, string authKey) GetAccountInfo()
{
string authKey = ConfigurationManager.AppSettings["MasterKey"];
Expand Down
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#1242](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/1242) Client encryption - Fix bug in read path without encrypted properties
- [#1189](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/1189) Query diagnostics shows correct overall time.
- [#1189](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/1189) Fixed a bug that caused duplicate information in diagnostic context.
- [#1263](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/1263) Fix a bug where retry after internval did not get set on query stream responses

## <a name="3.7.0-preview2"/> [3.7.0-preview2](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.7.0-preview2) - 2020-03-09

Expand Down