Skip to content

Commit 5036833

Browse files
authored
IAsyncEnumerable on DelayedQuery (#170)
* Introduce IAsyncEnumerable on DelayedQuery to utilize async Session query while asynchronously prefetching data * ConfigureAwaitFalse and reformat * fixed review issues
1 parent 158b6eb commit 5036833

File tree

2 files changed

+114
-9
lines changed

2 files changed

+114
-9
lines changed

Orm/Xtensive.Orm.Manual/Prefetch/PrefetchTest.cs

Lines changed: 102 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55
// Created: 2009.12.24
66

77
using System;
8+
using System.Collections.Generic;
89
using NUnit.Framework;
910
using System.Linq;
11+
using System.Reflection;
12+
using System.Threading;
1013
using System.Threading.Tasks;
11-
using Xtensive.Core;
12-
using Xtensive.Orm.Internals.Prefetch;
14+
using Xtensive.Orm.Internals;
15+
using Xtensive.Orm.Providers;
1316
using Xtensive.Orm.Services;
1417
using Xtensive.Orm.Tests;
1518

@@ -44,7 +47,7 @@ public class Person : Entity
4447

4548
public Person(Session session)
4649
: base(session)
47-
{}
50+
{ }
4851
}
4952

5053
#endregion
@@ -63,10 +66,10 @@ protected override Configuration.DomainConfiguration BuildConfiguration()
6366
[TearDown]
6467
public void ClearContent()
6568
{
66-
using(var session = Domain.OpenSession())
67-
using(var tx = session.OpenTransaction()) {
69+
using (var session = Domain.OpenSession())
70+
using (var tx = session.OpenTransaction()) {
6871
var people = session.Query.All<Person>().ToList();
69-
foreach(var person in people) {
72+
foreach (var person in people) {
7073
person.Manager = null;
7174
}
7275
session.SaveChanges();
@@ -247,7 +250,7 @@ public async Task MultipleBatchesAsyncTest()
247250
var count = 1000;
248251

249252
await using (var session = await Domain.OpenSessionAsync())
250-
using (var transactionScope = session.OpenTransaction()){
253+
using (var transactionScope = session.OpenTransaction()) {
251254
var people = new Person[count];
252255
for (var i = 0; i < count; i++) {
253256
people[i] = new Person(session) { Name = i.ToString(), Photo = new[] { (byte) (i % 256) } };
@@ -376,6 +379,98 @@ public async Task DelayedQueryAsyncTest()
376379
}
377380
}
378381

382+
private class QueryCounterSessionHandlerMoq : ChainingSessionHandler
383+
{
384+
private volatile int syncCounter;
385+
private volatile int asyncCounter;
386+
387+
public int GetSyncCounter() => syncCounter;
388+
389+
public int GetAsyncCounter() => asyncCounter;
390+
391+
public QueryCounterSessionHandlerMoq(SessionHandler chainedHandler) : base(chainedHandler)
392+
{
393+
}
394+
395+
public override void ExecuteQueryTasks(IEnumerable<QueryTask> queryTasks, bool allowPartialExecution)
396+
{
397+
_ = Interlocked.Increment(ref syncCounter);
398+
base.ExecuteQueryTasks(queryTasks, allowPartialExecution);
399+
}
400+
401+
public override Task ExecuteQueryTasksAsync(IEnumerable<QueryTask> queryTasks, bool allowPartialExecution, CancellationToken token)
402+
{
403+
_ = Interlocked.Increment(ref asyncCounter);
404+
return base.ExecuteQueryTasksAsync(queryTasks, allowPartialExecution, token);
405+
}
406+
}
407+
408+
[Test]
409+
public async Task DelayedQueryAsyncShouldMaterializeAsyncTest()
410+
{
411+
await using (var session = await Domain.OpenSessionAsync()) {
412+
using (var transactionScope = session.OpenTransaction()) {
413+
var employee = new Person(session) { Name = "Employee", Photo = new byte[] { 8, 0 } };
414+
var manager = new Person(session) { Name = "Manager", Photo = new byte[] { 8, 0 } };
415+
_ = manager.Employees.Add(employee);
416+
transactionScope.Complete();
417+
}
418+
}
419+
420+
await using (var session = await Domain.OpenSessionAsync()) // no session activation!
421+
{
422+
var sessionAccessor = new DirectSessionAccessor(session);
423+
var prop = typeof(Session).GetProperty("Handler", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.GetProperty);
424+
var handler = prop.GetValue(session) as SessionHandler;
425+
var moq = new QueryCounterSessionHandlerMoq(handler);
426+
using (sessionAccessor.ChangeSessionHandler(moq))
427+
428+
using (var transactionScope = session.OpenTransaction()) {
429+
var people = session.Query.CreateDelayedQuery(q => q.All<Person>())
430+
.Prefetch(p => p.Photo) // Lazy load field
431+
.Prefetch(p => p.Employees // EntitySet Employees
432+
.Prefetch(e => e.Photo)) // and lazy load field of each of its items
433+
.Prefetch(p => p.Manager); // Referenced entity
434+
foreach (var person in people) {
435+
// some code here...
436+
}
437+
Assert.That(moq.GetSyncCounter(), Is.GreaterThan(0));
438+
Assert.That(moq.GetAsyncCounter(), Is.EqualTo(0));
439+
transactionScope.Complete();
440+
}
441+
}
442+
443+
await using (var session = await Domain.OpenSessionAsync())// no session activation!
444+
{
445+
var sessionAccessor = new DirectSessionAccessor(session);
446+
var prop = typeof(Session).GetProperty("Handler", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.GetProperty);
447+
var handler = prop.GetValue(session) as SessionHandler;
448+
var moq = new QueryCounterSessionHandlerMoq(handler);
449+
using (sessionAccessor.ChangeSessionHandler(moq))
450+
451+
using (var transactionScope = session.OpenTransaction()) {
452+
var people = session.Query.CreateDelayedQuery(q => q.All<Person>())
453+
.Prefetch(p => p.Photo) // Lazy load field
454+
.Prefetch(p => p.Employees.Prefetch(e => e.Photo)) // EntitySet Employees and lazy load field of each of its items with the limit on number of items to be loaded
455+
.Prefetch(p => p.Manager.Photo); // Referenced entity and lazy load field for each of them
456+
await foreach (var person in people.AsAsyncEnumerable()) {
457+
var accessor = DirectStateAccessor.Get(person);
458+
Assert.That(accessor.GetFieldState("Photo"), Is.EqualTo(PersistentFieldState.Loaded));
459+
Assert.That(accessor.GetFieldState("Manager"), Is.EqualTo(PersistentFieldState.Loaded));
460+
if (person.ManagerKey != null) {
461+
Assert.IsNotNull(DirectStateAccessor.Get(session)[person.ManagerKey]);
462+
Assert.That(DirectStateAccessor.Get(person.Manager).GetFieldState("Photo"), Is.EqualTo(PersistentFieldState.Loaded));
463+
}
464+
// some code here...
465+
}
466+
Assert.That(moq.GetSyncCounter(), Is.EqualTo(0));
467+
Assert.That(moq.GetAsyncCounter(), Is.GreaterThan(0));
468+
transactionScope.Complete();
469+
}
470+
471+
}
472+
}
473+
379474
[Test]
380475
public void CachedQueryTest()
381476
{

Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,23 @@ namespace Xtensive.Orm
2020
/// </summary>
2121
/// <typeparam name="TElement">The type of the element in a resulting sequence.</typeparam>
2222
[Serializable]
23-
public sealed class DelayedQuery<TElement> : DelayedQuery, IEnumerable<TElement>
23+
public sealed class DelayedQuery<TElement> : DelayedQuery, IEnumerable<TElement>, IAsyncEnumerable<TElement>
2424
{
2525
/// <inheritdoc/>
2626
public IEnumerator<TElement> GetEnumerator() => Materialize<TElement>().GetEnumerator();
2727

2828
/// <inheritdoc/>
2929
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
3030

31+
/// <inheritdoc/>
32+
async IAsyncEnumerator<TElement> IAsyncEnumerable<TElement>.GetAsyncEnumerator(CancellationToken token)
33+
{
34+
var elements = await ExecuteAsync(token).ConfigureAwaitFalse();
35+
foreach (var element in elements) {
36+
yield return element;
37+
}
38+
}
39+
3140
/// <summary>
3241
/// Asynchronously executes delayed query.
3342
/// </summary>
@@ -43,6 +52,7 @@ public ValueTask<QueryResult<TElement>> ExecuteAsync(CancellationToken token = d
4352

4453
internal DelayedQuery(Session session, TranslatedQuery translatedQuery, ParameterContext parameterContext)
4554
: base(session, translatedQuery, parameterContext)
46-
{}
55+
{ }
56+
4757
}
4858
}

0 commit comments

Comments
 (0)