Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Channel Instead of BufferBlock #5123

Merged
merged 14 commits into from
Jul 8, 2020
11 changes: 6 additions & 5 deletions src/Microsoft.ML.Sweeper/AsyncSweeper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
jwood803 marked this conversation as resolved.
Show resolved Hide resolved
using Microsoft.ML;
Expand Down Expand Up @@ -168,7 +169,7 @@ public sealed class Options
private readonly object _lock;
private readonly CancellationTokenSource _cts;

private readonly BufferBlock<ParameterSetWithId> _paramQueue;
private readonly Channel<ParameterSetWithId> _paramQueue;
jwood803 marked this conversation as resolved.
Show resolved Hide resolved
private readonly int _relaxation;
private readonly ISweeper _baseSweeper;
private readonly IHost _host;
Expand Down Expand Up @@ -208,7 +209,7 @@ public DeterministicSweeperAsync(IHostEnvironment env, Options options)
_lock = new object();
_results = new List<IRunResult>();
_nullRuns = new HashSet<int>();
_paramQueue = new BufferBlock<ParameterSetWithId>();
_paramQueue = Channel.CreateUnbounded<ParameterSetWithId>();
jwood803 marked this conversation as resolved.
Show resolved Hide resolved

PrepareNextBatch(null);
}
Expand All @@ -220,12 +221,12 @@ private void PrepareNextBatch(IEnumerable<IRunResult> results)
if (Utils.Size(paramSets) == 0)
{
// Mark the queue as completed.
_paramQueue.Complete();
_paramQueue.Writer.Complete();
return;
}
// Assign an id to each ParameterSet and enque it.
foreach (var paramSet in paramSets)
_paramQueue.Post(new ParameterSetWithId(_numGenerated++, paramSet));
_paramQueue.Writer.TryWrite(new ParameterSetWithId(_numGenerated++, paramSet));
EnsureResultsSize();
}

Expand Down Expand Up @@ -278,7 +279,7 @@ public async Task<ParameterSetWithId> ProposeAsync()
return null;
try
{
return await _paramQueue.ReceiveAsync(_cts.Token);
return await _paramQueue.Reader.ReadAsync(_cts.Token);
}
catch (InvalidOperationException)
{
Expand Down
4 changes: 4 additions & 0 deletions src/Microsoft.ML.Sweeper/Microsoft.ML.Sweeper.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Microsoft.ML.Core\Microsoft.ML.Core.csproj" />
<ProjectReference Include="..\Microsoft.ML.CpuMath\Microsoft.ML.CpuMath.csproj" />
Expand Down