Skip to content

Commit

Permalink
Added DataLoader support to new execution engine. (ChilliCream#2233)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib authored Aug 18, 2020
1 parent 0914215 commit 21da0c6
Show file tree
Hide file tree
Showing 91 changed files with 1,653 additions and 2,381 deletions.
50 changes: 25 additions & 25 deletions .devops/azure-pipelines.release-hotchocolate.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
--- # ------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# <auto-generated>
#
Expand All @@ -12,60 +13,59 @@
# nuke --generate-configuration AzurePipelines_test-pr-hotchocolate --host AzurePipelines
#
# </auto-generated>
# ------------------------------------------------------------------------------
trigger: none

stages:
- stage: ubuntu_latest
displayName: 'ubuntu-latest'
dependsOn: [ ]
displayName: "ubuntu-latest"
dependsOn: []
pool:
vmImage: 'ubuntu-latest'
vmImage: "ubuntu-latest"
jobs:
- job: Test
displayName: 'Test'
dependsOn: [ ]
displayName: "Test"
dependsOn: []
strategy:
parallel: 8
steps:
steps:
- task: CmdLine@2
inputs:
script: './build.cmd test --skip --test-partition $(System.JobPositionInPhase)'
script: "./build.cmd test --skip --test-partition $(System.JobPositionInPhase)"
- task: PublishBuildArtifacts@1
inputs:
artifactName: test-results
pathtoPublish: 'output/test-results'
pathtoPublish: "output/test-results"
- job: Pack
displayName: 'Pack'
dependsOn: [ ]
steps:
displayName: "Pack"
dependsOn: []
steps:
- task: CmdLine@2
inputs:
script: './build.cmd pack --skip'
script: "./build.cmd pack --skip"
- task: PublishBuildArtifacts@1
inputs:
artifactName: packages
pathtoPublish: 'output/packages'
pathtoPublish: "output/packages"
- job: Publish
displayName: 'Publish'
dependsOn: [ Test, Pack ]
displayName: "Publish"
dependsOn: [Test, Pack]
steps:
- task: DownloadBuildArtifacts@0
inputs:
artifactName: 'packages'
downloadPath: '$(Build.Repository.LocalPath)/output'
artifactName: "packages"
downloadPath: "$(Build.Repository.LocalPath)/output"
- task: CmdLine@2
inputs:
script: './build.cmd publish --skip'
script: "./build.cmd publish --skip"
- task: GitHubRelease@0
displayName: 'GitHub release'
displayName: "GitHub release"
inputs:
gitHubConnection: 'ChilliCream GitHub'
gitHubConnection: "ChilliCream GitHub"
action: edit
tag: '$(GitHubVersion)'
title: '$(GitHubVersion)'
assets: '$(Build.Repository.LocalPath)/output/packages/*.*'
tag: "$(GitHubVersion)"
title: "$(GitHubVersion)"
assets: "$(Build.Repository.LocalPath)/output/packages/*.*"
assetUploadMode: replace
isPreRelease: true
releaseNotesSource: input
releaseNotes: 'For more details click [here](https://github.com/ChilliCream/hotchocolate/blob/master/CHANGELOG.md) to get to our CHANGELOG.'
releaseNotes: "For more details click [here](https://github.com/ChilliCream/hotchocolate/blob/master/CHANGELOG.md) to get to our CHANGELOG."
4 changes: 4 additions & 0 deletions build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ else {
[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12
(New-Object System.Net.WebClient).DownloadFile($DotNetInstallUrl, $DotNetInstallFile)

ExecSafe { & $DotNetInstallFile -InstallDir $DotNetDirectory -Version "2.1.809" -NoPath }
ExecSafe { & $DotNetInstallFile -InstallDir $DotNetDirectory -Version "3.0.103" -NoPath }
ExecSafe { & $DotNetInstallFile -InstallDir $DotNetDirectory -Version "3.1.401" -NoPath }

# Install by channel or version
if (!(Test-Path variable:DotNetVersion)) {
ExecSafe { & $DotNetInstallFile -InstallDir $DotNetDirectory -Channel $DotNetChannel -NoPath }
Expand Down
4 changes: 4 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ else
curl -Lsfo "$DOTNET_INSTALL_FILE" "$DOTNET_INSTALL_URL"
chmod +x "$DOTNET_INSTALL_FILE"

"$DOTNET_INSTALL_FILE" --install-dir "$DOTNET_DIRECTORY" --version "2.1.809" --no-path
"$DOTNET_INSTALL_FILE" --install-dir "$DOTNET_DIRECTORY" --version "3.0.103" --no-path
"$DOTNET_INSTALL_FILE" --install-dir "$DOTNET_DIRECTORY" --version "3.1.401" --no-path

# Install by channel or version
if [[ -z ${DOTNET_VERSION+x} ]]; then
"$DOTNET_INSTALL_FILE" --install-dir "$DOTNET_DIRECTORY" --channel "$DOTNET_CHANNEL" --no-path
Expand Down
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"sdk": {
"version": "3.1.300"
"version": "5.0.100-preview.7.20366.6"
}
}
64 changes: 43 additions & 21 deletions src/GreenDonut/src/Core/Batch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,78 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Diagnostics.CodeAnalysis;

namespace GreenDonut
{
public class Batch<TKey, TValue>
where TKey : notnull
{
private readonly object _sync = new object();
private bool _hasDispatched = false;
private Dictionary<TKey, TaskCompletionSource<TValue>> _items =
new Dictionary<TKey, TaskCompletionSource<TValue>>();

public bool HasDispatched { get; private set; }

public IReadOnlyList<TKey> Keys => _items.Keys.ToArray();

public int Size => _items.Count;

public TaskCompletionSource<TValue> CreateOrGet(TKey key)
public bool TryGetOrCreate(
TKey key,
[NotNullWhen(true)] out TaskCompletionSource<TValue>? promise)
{
ThrowIfDispatched();

if (_items.ContainsKey(key))
if (!_hasDispatched)
{
return _items[key];
}
lock (_sync)
{
if (!_hasDispatched)
{
if (_items.ContainsKey(key))
{
promise = _items[key];
}
else
{
promise = new TaskCompletionSource<TValue>(
TaskCreationOptions.RunContinuationsAsynchronously);
_items.Add(key, promise);
}

var promise = new TaskCompletionSource<TValue>(
TaskCreationOptions.RunContinuationsAsynchronously);

_items.Add(key, promise);
return true;
}
}
}

return promise;
promise = null;
return false;
}

public TaskCompletionSource<TValue> Get(TKey key)
{
return _items[key];
}

public void StartDispatching()
public ValueTask StartDispatchingAsync(Func<ValueTask> dispatch)
{
ThrowIfDispatched();
bool execute = false;

HasDispatched = true;
}
if (!_hasDispatched)
{
lock (_sync)
{
if (!_hasDispatched)
{
execute = _hasDispatched = true;
}
}
}

private void ThrowIfDispatched()
{
if (HasDispatched)
if (execute)
{
throw new InvalidOperationException("This batch has already been dispatched.");
return dispatch();
}

return default;
}
}
}
73 changes: 29 additions & 44 deletions src/GreenDonut/src/Core/DataLoaderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void Clear()
/// A list of results which are in the exact same order as the provided
/// keys.
/// </returns>
protected abstract Task<IReadOnlyList<Result<TValue>>> FetchAsync(
protected abstract ValueTask<IReadOnlyList<Result<TValue>>> FetchAsync(
IReadOnlyList<TKey> keys,
CancellationToken cancellationToken);

Expand All @@ -189,7 +189,6 @@ public Task<TValue> LoadAsync(TKey key, CancellationToken cancellationToken)
lock (_sync)
{
object cacheKey = _cacheKeyResolver(key);
Batch<TKey, TValue> batch = GetCurrentBatch();

if (_options.Caching && _cache.TryGetValue(cacheKey, out object? cachedValue))
{
Expand All @@ -200,7 +199,7 @@ public Task<TValue> LoadAsync(TKey key, CancellationToken cancellationToken)
return cachedTask;
}

TaskCompletionSource<TValue> promise = batch.CreateOrGet(key);
TaskCompletionSource<TValue> promise = GetOrCreatePromise(key);

if (_options.Caching)
{
Expand Down Expand Up @@ -312,62 +311,48 @@ private void BatchOperationSucceeded(
}
}

private Task DispatchBatchAsync(
private ValueTask DispatchBatchAsync(
Batch<TKey, TValue> batch,
CancellationToken cancellationToken)
{
if (!batch.HasDispatched)
return batch.StartDispatchingAsync(async () =>
{
lock (_sync)
{
if (!batch.HasDispatched)
{
batch.StartDispatching();
Activity? activity = DiagnosticEvents.StartBatching(batch.Keys);
IReadOnlyList<Result<TValue>> results = new Result<TValue>[0];

return DispatchBatchInternalAsync(batch, batch.Keys, cancellationToken);
}
try
{
results = await FetchAsync(batch.Keys, cancellationToken).ConfigureAwait(false);
BatchOperationSucceeded(batch, batch.Keys, results);
}
catch (Exception ex)
{
BatchOperationFailed(batch, batch.Keys, ex);
}
}

return Task.CompletedTask;
DiagnosticEvents.StopBatching(activity, batch.Keys,
results.Select(result => result.Value).ToArray());
});
}

private async Task DispatchBatchInternalAsync(
Batch<TKey, TValue> batch,
IReadOnlyList<TKey> keys,
CancellationToken cancellationToken)
private TaskCompletionSource<TValue> GetOrCreatePromise(TKey key)
{
Activity? activity = DiagnosticEvents.StartBatching(keys);
IReadOnlyList<Result<TValue>> results = new Result<TValue>[0];

try
{
results = await FetchAsync(keys, cancellationToken).ConfigureAwait(false);
BatchOperationSucceeded(batch, keys, results);
}
catch (Exception ex)
if (_currentBatch is {} &&
_currentBatch.Size < _maxBatchSize &&
_currentBatch.TryGetOrCreate(key, out TaskCompletionSource<TValue>? promise) &&
promise is {})
{
BatchOperationFailed(batch, keys, ex);
return promise;
}

DiagnosticEvents.StopBatching(activity, keys,
results.Select(result => result.Value).ToArray());
}

private Batch<TKey, TValue> GetCurrentBatch()
{
if (_currentBatch == null ||
_currentBatch.HasDispatched ||
_currentBatch.Size == _maxBatchSize)
{
var newBatch = new Batch<TKey, TValue>();
var newBatch = new Batch<TKey, TValue>();

_batchScheduler.Schedule(() =>
DispatchBatchAsync(newBatch, _disposeTokenSource.Token));
_currentBatch = newBatch;
}
newBatch.TryGetOrCreate(key, out TaskCompletionSource<TValue>? newPromise);
_batchScheduler.Schedule(() =>
DispatchBatchAsync(newBatch, _disposeTokenSource.Token));
_currentBatch = newBatch;

return _currentBatch;
return newPromise!;
}

private async Task<IReadOnlyList<TValue>> LoadInternalAsync(
Expand Down
2 changes: 1 addition & 1 deletion src/GreenDonut/src/Core/FetchDataDelegate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace GreenDonut
/// A list of results which are in the exact same order as the provided
/// keys.
/// </returns>
public delegate Task<IReadOnlyList<Result<TValue>>> FetchDataDelegate<TKey, TValue>(
public delegate ValueTask<IReadOnlyList<Result<TValue>>> FetchDataDelegate<TKey, TValue>(
IReadOnlyList<TKey> keys,
CancellationToken cancellationToken)
where TKey : notnull;
Expand Down
1 change: 1 addition & 0 deletions src/GreenDonut/src/Core/GreenDonut.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

<ItemGroup>
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.7.0" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.0" />
</ItemGroup>

</Project>
3 changes: 2 additions & 1 deletion src/GreenDonut/src/Core/IBatchScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using System;
using System.Threading.Tasks;

namespace GreenDonut
{
public interface IBatchScheduler
{
void Schedule(Action dispatch);
void Schedule(Func<ValueTask> dispatch);
}
}
2 changes: 1 addition & 1 deletion src/GreenDonut/test/Core.Tests/DataLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public DataLoader(
_fetch = fetch ?? throw new ArgumentNullException(nameof(fetch));
}

protected override Task<IReadOnlyList<Result<TValue>>> FetchAsync(
protected override ValueTask<IReadOnlyList<Result<TValue>>> FetchAsync(
IReadOnlyList<TKey> keys,
CancellationToken cancellationToken)
{
Expand Down
Loading

0 comments on commit 21da0c6

Please sign in to comment.