Skip to content

Commit

Permalink
Merge branch 'master' into users/ealsur/errormessage
Browse files Browse the repository at this point in the history
  • Loading branch information
kirankumarkolli authored Sep 4, 2019
2 parents c082a06 + 83bd642 commit ec16bd7
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,20 @@ private Task DispatchChangesAsync(ResponseMessage response, CancellationToken ca
Collection<T> asFeedResponse;
try
{
asFeedResponse = cosmosJsonSerializer.FromStream<CosmosFeedResponseUtil<T>>(response.Content).Data;
asFeedResponse = this.cosmosJsonSerializer.FromStream<CosmosFeedResponseUtil<T>>(response.Content).Data;
}
catch (Exception serializationException)
{
// Error using custom serializer to parse stream
throw new ObserverException(serializationException);
}

// When StartFromBeginning is used, the first request returns OK but no content
if (asFeedResponse.Count == 0)
{
return Task.CompletedTask;
}

List<T> asReadOnlyList = new List<T>(asFeedResponse.Count);
asReadOnlyList.AddRange(asFeedResponse);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,5 +195,48 @@ await scripts.ExecuteStoredProcedureAsync<object>(
Assert.IsTrue(isStartOk, "Timed out waiting for docs to process");
Assert.AreEqual("doc0.doc1.doc2.doc3.doc4.", accumulator);
}

[TestMethod]
public async Task TestWithStartTime_Beginning()
{
int partitionKey = 0;

foreach (int id in Enumerable.Range(0, 5))
{
await this.Container.CreateItemAsync<dynamic>(new { id = $"doc{id}", pk = partitionKey });
}

ManualResetEvent allDocsProcessed = new ManualResetEvent(false);

int processedDocCount = 0;
string accumulator = string.Empty;
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilder("test", (IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
{
Assert.IsTrue(docs.Count > 0);
processedDocCount += docs.Count;
foreach (dynamic doc in docs)
{
accumulator += doc.id.ToString() + ".";
}

if (processedDocCount == 5)
{
allDocsProcessed.Set();
}

return Task.CompletedTask;
})
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.WithInstanceName("random")
.WithLeaseContainer(this.LeaseContainer).Build();

await processor.StartAsync();
// Letting processor initialize and pickup changes
bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime);
await processor.StopAsync();
Assert.IsTrue(isStartOk, "Timed out waiting for docs to process");
Assert.AreEqual("doc0.doc1.doc2.doc3.doc4.", accumulator);
}
}
}
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#726](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/726) Query iterator HasMoreResults now returns false if an exception is hit
- [#705](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/705) User agent suffix gets truncated
- [#753](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/753) Reason was not being propagated for Conflict exceptions
- [#756](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/756) Change Feed Processor with WithStartTime would execute the delegate the first time with no items.

## [3.1.1](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.1.1) - 2019-08-12

Expand Down

0 comments on commit ec16bd7

Please sign in to comment.