Skip to content

IAsyncEnumerable on DelayedQuery #170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 102 additions & 7 deletions Orm/Xtensive.Orm.Manual/Prefetch/PrefetchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
// Created: 2009.12.24

using System;
using System.Collections.Generic;
using NUnit.Framework;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Xtensive.Core;
using Xtensive.Orm.Internals.Prefetch;
using Xtensive.Orm.Internals;
using Xtensive.Orm.Providers;
using Xtensive.Orm.Services;
using Xtensive.Orm.Tests;

Expand Down Expand Up @@ -44,7 +47,7 @@ public class Person : Entity

public Person(Session session)
: base(session)
{}
{ }
}

#endregion
Expand All @@ -63,10 +66,10 @@ protected override Configuration.DomainConfiguration BuildConfiguration()
[TearDown]
public void ClearContent()
{
using(var session = Domain.OpenSession())
using(var tx = session.OpenTransaction()) {
using (var session = Domain.OpenSession())
using (var tx = session.OpenTransaction()) {
var people = session.Query.All<Person>().ToList();
foreach(var person in people) {
foreach (var person in people) {
person.Manager = null;
}
session.SaveChanges();
Expand Down Expand Up @@ -247,7 +250,7 @@ public async Task MultipleBatchesAsyncTest()
var count = 1000;

await using (var session = await Domain.OpenSessionAsync())
using (var transactionScope = session.OpenTransaction()){
using (var transactionScope = session.OpenTransaction()) {
var people = new Person[count];
for (var i = 0; i < count; i++) {
people[i] = new Person(session) { Name = i.ToString(), Photo = new[] { (byte) (i % 256) } };
Expand Down Expand Up @@ -376,6 +379,98 @@ public async Task DelayedQueryAsyncTest()
}
}

private class QueryCounterSessionHandlerMoq : ChainingSessionHandler
{
private volatile int syncCounter;
private volatile int asyncCounter;

public int GetSyncCounter() => syncCounter;

public int GetAsyncCounter() => asyncCounter;

public QueryCounterSessionHandlerMoq(SessionHandler chainedHandler) : base(chainedHandler)
{
}

public override void ExecuteQueryTasks(IEnumerable<QueryTask> queryTasks, bool allowPartialExecution)
{
_ = Interlocked.Increment(ref syncCounter);
base.ExecuteQueryTasks(queryTasks, allowPartialExecution);
}

public override Task ExecuteQueryTasksAsync(IEnumerable<QueryTask> queryTasks, bool allowPartialExecution, CancellationToken token)
{
_ = Interlocked.Increment(ref asyncCounter);
return base.ExecuteQueryTasksAsync(queryTasks, allowPartialExecution, token);
}
}

[Test]
public async Task DelayedQueryAsyncShouldMaterializeAsyncTest()
{
await using (var session = await Domain.OpenSessionAsync()) {
using (var transactionScope = session.OpenTransaction()) {
var employee = new Person(session) { Name = "Employee", Photo = new byte[] { 8, 0 } };
var manager = new Person(session) { Name = "Manager", Photo = new byte[] { 8, 0 } };
_ = manager.Employees.Add(employee);
transactionScope.Complete();
}
}

await using (var session = await Domain.OpenSessionAsync()) // no session activation!
{
var sessionAccessor = new DirectSessionAccessor(session);
var prop = typeof(Session).GetProperty("Handler", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.GetProperty);
var handler = prop.GetValue(session) as SessionHandler;
var moq = new QueryCounterSessionHandlerMoq(handler);
using (sessionAccessor.ChangeSessionHandler(moq))

using (var transactionScope = session.OpenTransaction()) {
var people = session.Query.CreateDelayedQuery(q => q.All<Person>())
.Prefetch(p => p.Photo) // Lazy load field
.Prefetch(p => p.Employees // EntitySet Employees
.Prefetch(e => e.Photo)) // and lazy load field of each of its items
.Prefetch(p => p.Manager); // Referenced entity
foreach (var person in people) {
// some code here...
}
Assert.That(moq.GetSyncCounter(), Is.GreaterThan(0));
Assert.That(moq.GetAsyncCounter(), Is.EqualTo(0));
transactionScope.Complete();
}
}

await using (var session = await Domain.OpenSessionAsync())// no session activation!
{
var sessionAccessor = new DirectSessionAccessor(session);
var prop = typeof(Session).GetProperty("Handler", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.GetProperty);
var handler = prop.GetValue(session) as SessionHandler;
var moq = new QueryCounterSessionHandlerMoq(handler);
using (sessionAccessor.ChangeSessionHandler(moq))

using (var transactionScope = session.OpenTransaction()) {
var people = session.Query.CreateDelayedQuery(q => q.All<Person>())
.Prefetch(p => p.Photo) // Lazy load field
.Prefetch(p => p.Employees.Prefetch(e => e.Photo)) // EntitySet Employees and lazy load field of each of its items with the limit on number of items to be loaded
.Prefetch(p => p.Manager.Photo); // Referenced entity and lazy load field for each of them
await foreach (var person in people.AsAsyncEnumerable()) {
var accessor = DirectStateAccessor.Get(person);
Assert.That(accessor.GetFieldState("Photo"), Is.EqualTo(PersistentFieldState.Loaded));
Assert.That(accessor.GetFieldState("Manager"), Is.EqualTo(PersistentFieldState.Loaded));
if (person.ManagerKey != null) {
Assert.IsNotNull(DirectStateAccessor.Get(session)[person.ManagerKey]);
Assert.That(DirectStateAccessor.Get(person.Manager).GetFieldState("Photo"), Is.EqualTo(PersistentFieldState.Loaded));
}
// some code here...
}
Assert.That(moq.GetSyncCounter(), Is.EqualTo(0));
Assert.That(moq.GetAsyncCounter(), Is.GreaterThan(0));
transactionScope.Complete();
}

}
}

[Test]
public void CachedQueryTest()
{
Expand Down
14 changes: 12 additions & 2 deletions Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,23 @@ namespace Xtensive.Orm
/// </summary>
/// <typeparam name="TElement">The type of the element in a resulting sequence.</typeparam>
[Serializable]
public sealed class DelayedQuery<TElement> : DelayedQuery, IEnumerable<TElement>
public sealed class DelayedQuery<TElement> : DelayedQuery, IEnumerable<TElement>, IAsyncEnumerable<TElement>
{
/// <inheritdoc/>
public IEnumerator<TElement> GetEnumerator() => Materialize<TElement>().GetEnumerator();

/// <inheritdoc/>
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

/// <inheritdoc/>
async IAsyncEnumerator<TElement> IAsyncEnumerable<TElement>.GetAsyncEnumerator(CancellationToken token)
{
var elements = await ExecuteAsync(token).ConfigureAwaitFalse();
foreach (var element in elements) {
yield return element;
}
}

/// <summary>
/// Asynchronously executes delayed query.
/// </summary>
Expand All @@ -43,6 +52,7 @@ public ValueTask<QueryResult<TElement>> ExecuteAsync(CancellationToken token = d

internal DelayedQuery(Session session, TranslatedQuery translatedQuery, ParameterContext parameterContext)
: base(session, translatedQuery, parameterContext)
{}
{ }

}
}