Skip to content

Commit 26a5854

Browse files
committed
More SlimResult, Standardize patterns
1 parent 43bd56b commit 26a5854

File tree

4 files changed

+56
-147
lines changed

4 files changed

+56
-147
lines changed

src/core/Akka.Streams/Implementation/Fusing/Ops.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2528,7 +2528,13 @@ public readonly struct SlimResult<T>
25282528

25292529
public static readonly SlimResult<T> NotYetReady =
25302530
new SlimResult<T>(NotYetThereSentinel.Instance, default);
2531-
2531+
2532+
public static SlimResult<T> FromTask(Task<T> task)
2533+
{
2534+
return task.IsCanceled || task.IsFaulted
2535+
? new SlimResult<T>(task.Exception, default)
2536+
: new SlimResult<T>(default, task.Result);
2537+
}
25322538
public SlimResult(Exception errorOrSentinel, T result)
25332539
{
25342540
if (result == null)

src/core/Akka.Streams/Implementation/PooledValueTaskContinuationHelper.cs

Lines changed: 0 additions & 118 deletions
This file was deleted.

src/core/Akka.Streams/Implementation/Sources.cs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using Akka.Annotations;
1313
using Akka.Pattern;
1414
using Akka.Streams.Dsl;
15+
using Akka.Streams.Implementation.Fusing;
1516
using Akka.Streams.Implementation.Stages;
1617
using Akka.Streams.Stage;
1718
using Akka.Streams.Supervision;
@@ -779,8 +780,8 @@ private sealed class Logic : OutGraphStageLogic
779780
private readonly Lazy<Decider> _decider;
780781
private Option<TSource> _state = Option<TSource>.None;
781782

782-
private readonly PooledValueTaskContinuationHelper<Option<TOut>>
783-
_pooledContinuation;
783+
private ValueTask<Option<TOut>> _currentReadVt;
784+
private readonly Action _valueTaskAwaiterOnCompleteAction;
784785
public Logic(UnfoldResourceSourceValueTaskAsync<TOut, TCreateState, TSource> stage, Attributes inheritedAttributes)
785786
: base(stage.Shape)
786787
{
@@ -790,12 +791,11 @@ public Logic(UnfoldResourceSourceValueTaskAsync<TOut, TCreateState, TSource> sta
790791
var strategy = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>(null);
791792
return strategy != null ? strategy.Decider : Deciders.StoppingDecider;
792793
});
793-
_pooledContinuation =
794-
new PooledValueTaskContinuationHelper<Option<TOut>>(
795-
ReadCallback);
794+
_valueTaskAwaiterOnCompleteAction = SelfReadCallback;
796795
SetHandler(_stage.Out, this);
797796
}
798797

798+
799799
private Action<Try<TSource>> CreatedCallback => GetAsyncCallback<Try<TSource>>(resource =>
800800
{
801801
if (resource.IsSuccess)
@@ -830,12 +830,26 @@ private void ErrorHandler(Exception ex)
830830
throw new ArgumentOutOfRangeException();
831831
}
832832
}
833-
834-
private Action<Try<Option<TOut>>> ReadCallback => GetAsyncCallback<Try<Option<TOut>>>(read =>
833+
834+
835+
private void SelfReadCallback()
835836
{
836-
if (read.IsSuccess)
837+
var swap = _currentReadVt;
838+
_currentReadVt = default;
839+
if (swap.IsCompletedSuccessfully)
837840
{
838-
var data = read.Success.Value;
841+
ReadCallback(new SlimResult<Option<TOut>>(default,swap.Result));
842+
}
843+
else
844+
{
845+
ReadCallback(SlimResult<Option<TOut>>.FromTask(swap.AsTask()));
846+
}
847+
}
848+
private Action<SlimResult<Option<TOut>>> ReadCallback => GetAsyncCallback<SlimResult<Option<TOut>>>(read =>
849+
{
850+
if (read.IsSuccess())
851+
{
852+
var data = read.Result;
839853
if (data.HasValue)
840854
{
841855
var some = data.Value;
@@ -855,7 +869,7 @@ private void ErrorHandler(Exception ex)
855869
}
856870
}
857871
}
858-
else ErrorHandler(read.Failure.Value);
872+
else ErrorHandler(read.Error);
859873
});
860874

861875
private void CloseResource()
@@ -894,7 +908,9 @@ public override void OnPull()
894908
}
895909
else
896910
{
897-
_pooledContinuation.AttachAwaiter(vt);
911+
_currentReadVt = vt;
912+
_currentReadVt.GetAwaiter().OnCompleted(_valueTaskAwaiterOnCompleteAction);
913+
//_pooledContinuation.AttachAwaiter(vt);
898914
}
899915

900916

src/core/Akka.Streams/Implementation/Unfold.cs

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using System.Threading.Tasks;
1111
using System.Threading.Tasks.Sources;
1212
using Akka.Annotations;
13+
using Akka.Streams.Implementation.Fusing;
1314
using Akka.Streams.Stage;
1415
using Akka.Streams.Util;
1516
using Akka.Util;
@@ -103,7 +104,7 @@ private sealed class Logic : OutGraphStageLogic
103104
{
104105
private readonly UnfoldValueTaskAsync<TState, TElement> _stage;
105106
private TState _state;
106-
private Action<Result<Option<(TState, TElement)>>> _asyncHandler;
107+
private Action<SlimResult<Option<(TState, TElement)>>> _asyncHandler;
107108
private ValueTask<Option<(TState, TElement)>> _currentTask;
108109
public Logic(UnfoldValueTaskAsync<TState, TElement> stage) : base(stage.Shape)
109110
{
@@ -116,38 +117,42 @@ public Logic(UnfoldValueTaskAsync<TState, TElement> stage) : base(stage.Shape)
116117
public override void OnPull()
117118
{
118119
var vt = _stage.UnfoldFunc(_state);
119-
var peeker = Unsafe.As<ValueTask<Option<(TState,TElement)>>,ValueTaskCheatingPeeker<Option<(TState,TElement)>>>(ref vt);
120-
if (peeker._obj == null)
121-
{
122-
_asyncHandler(Result.Success<Option<(TState, TElement)>>(peeker._result));
123-
}
124-
else
125-
{
126-
_currentTask = vt;
127-
vt.GetAwaiter().OnCompleted(CompletionAction);
128-
}
120+
if (vt.IsCompletedSuccessfully)
121+
{
122+
_asyncHandler(
123+
new SlimResult<Option<(TState, TElement)>>(default,
124+
vt.Result));
125+
}
126+
else
127+
{
128+
_currentTask = vt;
129+
vt.GetAwaiter().OnCompleted(CompletionAction);
130+
}
129131
}
130132
private void CompletionAction()
131133
{
132134
if (_currentTask.IsCompletedSuccessfully)
133135
{
134-
_asyncHandler.Invoke(Result.Success(_currentTask.Result));
136+
_asyncHandler.Invoke(
137+
new SlimResult<Option<(TState, TElement)>>(default,
138+
_currentTask.Result));
135139
}
136140
else
137141
{
138142
_asyncHandler.Invoke(
139-
Result.FromTask(_currentTask.AsTask()));
143+
SlimResult<Option<(TState, TElement)>>.FromTask(
144+
_currentTask.AsTask()));
140145
}
141146
}
142147
public override void PreStart()
143148
{
144-
var ac = GetAsyncCallback<Result<Option<(TState, TElement)>>>(result =>
149+
var ac = GetAsyncCallback<SlimResult<Option<(TState, TElement)>>>(result =>
145150
{
146-
if (!result.IsSuccess)
147-
Fail(_stage.Out, result.Exception);
151+
if (!result.IsSuccess())
152+
Fail(_stage.Out, result.Error);
148153
else
149154
{
150-
var option = result.Value;
155+
var option = result.Result;
151156
if (!option.HasValue)
152157
Complete(_stage.Out);
153158
else

0 commit comments

Comments
 (0)