From 55da8d9d8ab3f3e2f46ddd78529fab20c99a507a Mon Sep 17 00:00:00 2001 From: Paul Louth Date: Fri, 14 Feb 2025 19:12:24 +0000 Subject: [PATCH] Enhance IO Monad with new robustness and utilities Added extensive functionality to the IO Monad, including local cancellation, retry, repeat, and fold mechanisms to improve resource management and scheduling options. Updated method implementations and removed obsolete functionality for improved consistency and performance. --- LanguageExt.Core/Common/Errors.cs | 12 +- LanguageExt.Core/Common/Exceptions.cs | 2 +- .../Extensions/Eff.Extensions.cs | 8 +- .../Extensions/Eff.Extensions.cs | 11 +- LanguageExt.Core/Effects/IO/IO.Extensions.cs | 196 ++------- LanguageExt.Core/Effects/IO/IO.Module.cs | 20 +- LanguageExt.Core/Effects/IO/IO.Monad.cs | 390 ++++++++++++++++++ LanguageExt.Core/Effects/IO/IO.Prelude.cs | 72 +--- .../Prelude.Collections.cs | 16 + .../Traits/Monads/MonadIO/MonadIO.Trait.cs | 389 ++++++++++++++++- .../Concurrent/Inbox/Inbox.DSL.cs | 20 +- LanguageExt.Pipes/Concurrent/Inbox/Inbox.cs | 4 +- .../Concurrent/Mailbox/Mailbox.Module.cs | 10 +- .../Concurrent/Outbox/Outbox.DSL.cs | 18 +- LanguageExt.Pipes/Concurrent/Outbox/Outbox.cs | 3 +- LanguageExt.Pipes/Consumer/Consumer.Module.cs | 30 +- LanguageExt.Pipes/Effect/Effect.Module.cs | 13 + LanguageExt.Pipes/Effect/Effect.Monad.cs | 5 + .../EffectT/EffectT.Extensions.cs | 6 + LanguageExt.Pipes/EffectT/EffectT.Monad.cs | 5 + LanguageExt.Pipes/Pipe/Pipe.Module.cs | 51 +++ LanguageExt.Pipes/Pipe/Pipe.cs | 3 +- LanguageExt.Pipes/PipeT/PipeT.Module.cs | 22 + LanguageExt.Pipes/PipeT/PipeT.Monad.cs | 5 + LanguageExt.Pipes/Producer/Producer.Module.cs | 77 ++-- .../ProducerT/ProducerT.Module.cs | 46 ++- LanguageExt.Tests/PipesTests.cs | 19 +- LanguageExt.Tests/Sys/Diag/ActivityTests.cs | 6 +- .../EffectsExamples/Examples/QueueExample.cs | 11 +- Samples/PipesExamples/Program.cs | 56 +-- Samples/TestBed.Web/Program.cs | 3 +- Samples/TestBed/PipesTest.cs | 42 +- Samples/TestBed/Program.cs | 65 +-- 33 files changed, 1152 insertions(+), 484 deletions(-) diff --git a/LanguageExt.Core/Common/Errors.cs b/LanguageExt.Core/Common/Errors.cs index 9c66cfe3e..989f75cf9 100644 --- a/LanguageExt.Core/Common/Errors.cs +++ b/LanguageExt.Core/Common/Errors.cs @@ -120,22 +120,22 @@ public static class Errors public static readonly Error LiftIONotSupported = (LiftIONotSupportedCode, LiftIONotSupportedText); /// - /// Transformer stack has no unliftIO support error text + /// Transformer stack has no `ToIO` support error text /// - public const string UnliftIONotSupportedText = + public const string ToIONotSupportedText = "The IO monad is not in the monad-transformer stack or MonadIO.ToIO has not been implemented in the trait " + "implementation for your monad-type. Therefore it's not possible to leverage `MonadIO` unlifting trait " + "functionality. To resolve this, implement `MonadIO.ToIO` and/or `MonadIO,MapIO`."; /// - /// Transformer stack has no unliftIO support error code + /// Transformer stack has no `ToIO` support error code /// - public const int UnliftIONotSupportedCode = -2000000008; + public const int ToIONotSupportedCode = -2000000008; /// - /// Transformer stack has no unliftIO support error + /// Transformer stack has no `ToIO` support error /// - public static readonly Error UnliftIONotSupported = (UnliftIONotSupportedCode, UnliftIONotSupportedText); + public static readonly Error ToIONotSupported = (ToIONotSupportedCode, ToIONotSupportedText); /// /// End-of-stream error text diff --git a/LanguageExt.Core/Common/Exceptions.cs b/LanguageExt.Core/Common/Exceptions.cs index 17d4c172a..d6eee28ad 100644 --- a/LanguageExt.Core/Common/Exceptions.cs +++ b/LanguageExt.Core/Common/Exceptions.cs @@ -50,7 +50,7 @@ public class Exceptions /// /// Transformer stack has no unliftIO support error /// - public static readonly ExceptionalException UnliftIONotSupported = new (Errors.UnliftIONotSupportedText, Errors.UnliftIONotSupportedCode); + public static readonly ExceptionalException UnliftIONotSupported = new (Errors.ToIONotSupportedText, Errors.ToIONotSupportedCode); /// /// End-of-stream error diff --git a/LanguageExt.Core/Effects/Eff/Eff no runtime/Extensions/Eff.Extensions.cs b/LanguageExt.Core/Effects/Eff/Eff no runtime/Extensions/Eff.Extensions.cs index 6c49d7434..ed41e0006 100644 --- a/LanguageExt.Core/Effects/Eff/Eff no runtime/Extensions/Eff.Extensions.cs +++ b/LanguageExt.Core/Effects/Eff/Eff no runtime/Extensions/Eff.Extensions.cs @@ -163,7 +163,7 @@ public static Eff SelectMany( this (K First, K Second) self, Func<(A First, B Second), K> bind, Func<(A First, B Second), C, D> project) => - self.ZipIO().Bind(ab => bind(ab).Map(c => project(ab, c))).As(); + self.Zip().Bind(ab => bind(ab).Map(c => project(ab, c))).As(); /// /// Monadic bind and project with paired IO monads @@ -172,7 +172,7 @@ public static Eff SelectMany( this K self, Func First, K Second)> bind, Func project) => - self.As().Bind(a => bind(a).ZipIO().Map(cd => project(a, cd))); + self.As().Bind(a => bind(a).Zip().Map(cd => project(a, cd))); /// /// Monadic bind and project with paired IO monads @@ -181,7 +181,7 @@ public static Eff SelectMany( this (K First, K Second, K Third) self, Func<(A First, B Second, C Third), K> bind, Func<(A First, B Second, C Third), D, E> project) => - self.ZipIO().Bind(ab => bind(ab).Map(c => project(ab, c))).As(); + self.Zip().Bind(ab => bind(ab).Map(c => project(ab, c))).As(); /// /// Monadic bind and project with paired IO monads @@ -190,5 +190,5 @@ public static Eff SelectMany( this K self, Func First, K Second, K Third)> bind, Func project) => - self.As().Bind(a => bind(a).ZipIO().Map(cd => project(a, cd))); + self.As().Bind(a => bind(a).Zip().Map(cd => project(a, cd))); } diff --git a/LanguageExt.Core/Effects/Eff/Eff with runtime/Extensions/Eff.Extensions.cs b/LanguageExt.Core/Effects/Eff/Eff with runtime/Extensions/Eff.Extensions.cs index 7e6a78961..f9c8d3c66 100644 --- a/LanguageExt.Core/Effects/Eff/Eff with runtime/Extensions/Eff.Extensions.cs +++ b/LanguageExt.Core/Effects/Eff/Eff with runtime/Extensions/Eff.Extensions.cs @@ -1,11 +1,8 @@ using System; -using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Runtime.CompilerServices; using System.Threading.Tasks; -using LanguageExt.Common; using LanguageExt.Traits; -using static LanguageExt.Prelude; namespace LanguageExt; @@ -185,7 +182,7 @@ public static Eff SelectMany( this (K, A> First, K, B> Second) self, Func<(A First, B Second), K, C>> bind, Func<(A First, B Second), C, D> project) => - self.ZipIO().Bind(ab => bind(ab).Map(c => project(ab, c))).As(); + self.Zip().Bind(ab => bind(ab).Map(c => project(ab, c))).As(); /// /// Monadic bind and project with paired IO monads @@ -194,7 +191,7 @@ public static Eff SelectMany( this K, A> self, Func, B> First, K, C> Second)> bind, Func project) => - self.As().Bind(a => bind(a).ZipIO().Map(cd => project(a, cd))); + self.As().Bind(a => bind(a).Zip().Map(cd => project(a, cd))); /// /// Monadic bind and project with paired IO monads @@ -203,7 +200,7 @@ public static Eff SelectMany( this (K, A> First, K, B> Second, K, C> Third) self, Func<(A First, B Second, C Third), K, D>> bind, Func<(A First, B Second, C Third), D, E> project) => - self.ZipIO().Bind(ab => bind(ab).Map(c => project(ab, c))).As(); + self.Zip().Bind(ab => bind(ab).Map(c => project(ab, c))).As(); /// /// Monadic bind and project with paired IO monads @@ -212,5 +209,5 @@ public static Eff SelectMany( this K, A> self, Func, B> First, K, C> Second, K, D> Third)> bind, Func project) => - self.As().Bind(a => bind(a).ZipIO().Map(cd => project(a, cd))); + self.As().Bind(a => bind(a).Zip().Map(cd => project(a, cd))); } diff --git a/LanguageExt.Core/Effects/IO/IO.Extensions.cs b/LanguageExt.Core/Effects/IO/IO.Extensions.cs index 82d805bb3..7b46173a1 100644 --- a/LanguageExt.Core/Effects/IO/IO.Extensions.cs +++ b/LanguageExt.Core/Effects/IO/IO.Extensions.cs @@ -83,7 +83,7 @@ public static IO SelectMany(this K ma, Func> bind /// Result of the computation public static K LocalIO(this K ma) where M : Monad => - ma.MapIO(io => io.Local()); + M.LocalIO(ma); /// /// Make this IO computation run on the `SynchronizationContext` that was captured at the start @@ -94,7 +94,7 @@ public static K LocalIO(this K ma) [MethodImpl(MethodImplOptions.AggressiveInlining)] public static K PostIO(this K ma) where M : Monad => - ma.MapIO(io => io.Post()); + M.PostIO(ma); /// /// Await a forked operation @@ -103,7 +103,7 @@ public static K PostIO(this K ma) [MethodImpl(MethodImplOptions.AggressiveInlining)] public static K Await(this K> ma) where M : Monad => - ma.MapIO(io => io.Bind(f => f.Await)); + M.Await(ma); /// /// Queue this IO operation to run on the thread-pool. @@ -116,8 +116,7 @@ public static K Await(this K> ma) [MethodImpl(MethodImplOptions.AggressiveInlining)] public static K> ForkIO(this K ma, Option timeout = default) where M : Monad => - ma.MapIO(io => io.Fork(timeout)); - + M.ForkIO(ma, timeout); /// /// Queue this IO operation to run on the thread-pool. @@ -133,7 +132,6 @@ public static K>> ForkIO(this StreamT ma, Option ma.Run() .Map(oht => oht.Map(ht => ht.Item1)) .ForkIO(timeout); - /// /// Timeout operation if it takes too long @@ -142,7 +140,7 @@ public static K>> ForkIO(this StreamT ma, Option [MethodImpl(Opt.Default)] public static K TimeoutIO(this K ma, TimeSpan timeout) where M : Monad, MonadIO => - ma.MapIO(io => io.Timeout(timeout)); + M.TimeoutIO(ma, timeout); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // @@ -156,9 +154,9 @@ public static K TimeoutIO(this K ma, TimeSpan timeout) /// [Pure] [MethodImpl(Opt.Default)] - public static K BracketIO(this K ma) + public static K BracketIO(this K ma) where M : Monad => - ma.MapIO(io => io.Bracket()); + M.BracketIO(ma); /// /// When acquiring, using, and releasing various resources, it can be quite convenient to write a function to manage @@ -173,7 +171,7 @@ public static K BracketIO( Func> Use, Func> Fin) where M : Monad => - acq.MapIO(io => io.Bracket(Use, Fin)); + M.BracketIO(acq, Use, Fin); /// /// When acquiring, using, and releasing various resources, it can be quite convenient to write a function to manage @@ -190,7 +188,7 @@ public static K BracketIO( Func> Catch, Func> Fin) where M : Monad => - acq.MapIO(io => io.Bracket(Use, Catch, Fin)); + M.BracketIO(acq, Use, Catch, Fin); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // @@ -207,7 +205,7 @@ public static K BracketIO( /// The result of the last invocation public static K RepeatIO(this K ma) where M : Monad => - ma.MapIO(io => io.Repeat()); + M.RepeatIO(ma); /// /// Keeps repeating the computation, until the scheduler expires, or an error occurs @@ -222,7 +220,7 @@ public static K RepeatIO( this K ma, Schedule schedule) where M : Monad => - ma.MapIO(io => io.Repeat(schedule)); + M.RepeatIO(ma, schedule); /// /// Keeps repeating the computation until the predicate returns false, or an error occurs @@ -237,7 +235,7 @@ public static K RepeatWhileIO( this K ma, Func predicate) where M : Monad => - ma.MapIO(io => io.RepeatWhile(predicate)); + M.RepeatWhileIO(ma, predicate); /// /// Keeps repeating the computation, until the scheduler expires, or the predicate returns false, or an error occurs @@ -254,7 +252,7 @@ public static K RepeatWhileIO( Schedule schedule, Func predicate) where M : Monad => - ma.MapIO(io => io.RepeatWhile(schedule, predicate)); + M.RepeatWhileIO(ma, schedule, predicate); /// /// Keeps repeating the computation until the predicate returns true, or an error occurs @@ -269,7 +267,7 @@ public static K RepeatUntilIO( this K ma, Func predicate) where M : Monad => - ma.MapIO(io => io.RepeatUntil(predicate)); + M.RepeatUntilIO(ma, predicate); /// /// Keeps repeating the computation, until the scheduler expires, or the predicate returns true, or an error occurs @@ -286,7 +284,7 @@ public static K RepeatUntilIO( Schedule schedule, Func predicate) where M : Monad => - ma.MapIO(io => io.RepeatUntil(schedule, predicate)); + M.RepeatUntilIO(ma, schedule, predicate); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // @@ -306,7 +304,7 @@ public static K RepeatUntilIO( /// public static K RetryIO(this K ma) where M : Monad => - ma.MapIO(io => io.Retry()); + M.RetryIO(ma); /// /// Retry if the IO computation fails @@ -323,7 +321,7 @@ public static K RetryIO( this K ma, Schedule schedule) where M : Monad => - ma.MapIO(io => io.Retry(schedule)); + M.RetryIO(ma, schedule); /// /// Retry if the IO computation fails @@ -341,7 +339,7 @@ public static K RetryWhileIO( this K ma, Func predicate) where M : Monad => - ma.MapIO(io => io.RetryWhile(predicate)); + M.RetryWhileIO(ma, predicate); /// /// Retry if the IO computation fails @@ -360,7 +358,7 @@ public static K RetryWhileIO( Schedule schedule, Func predicate) where M : Monad => - ma.MapIO(io => io.RetryWhile(schedule, predicate)); + M.RetryWhileIO(ma, schedule, predicate); /// /// Retry if the IO computation fails @@ -378,7 +376,7 @@ public static K RetryUntilIO( this K ma, Func predicate) where M : Monad => - ma.MapIO(io => io.RetryUntil(predicate)); + M.RetryUntilIO(ma, predicate); /// /// Retry if the IO computation fails @@ -397,7 +395,7 @@ public static K RetryUntilIO( Schedule schedule, Func predicate) where M : Monad => - ma.MapIO(io => io.RetryUntil(schedule, predicate)); + M.RetryUntilIO(ma, schedule, predicate); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // @@ -410,14 +408,14 @@ public static K FoldIO( S initialState, Func folder) where M : Monad => - ma.MapIO(io => io.Fold(schedule, initialState, folder)); + M.FoldIO(ma, schedule, initialState, folder); public static K FoldIO( this K ma, S initialState, Func folder) where M : Monad => - ma.MapIO(io => io.Fold(initialState, folder)); + M.FoldIO(ma, initialState, folder); public static K FoldWhileIO( this K ma, @@ -426,7 +424,7 @@ public static K FoldWhileIO( Func folder, Func stateIs) where M : Monad => - ma.MapIO(io => io.FoldWhile(schedule, initialState, folder, stateIs)); + M.FoldWhileIO(ma, schedule, initialState, folder, stateIs); public static K FoldWhileIO( this K ma, @@ -434,7 +432,7 @@ public static K FoldWhileIO( Func folder, Func stateIs) where M : Monad => - ma.MapIO(io => io.FoldWhile(initialState, folder, stateIs)); + M.FoldWhileIO(ma, initialState, folder, stateIs); public static K FoldWhileIO( this K ma, @@ -443,7 +441,7 @@ public static K FoldWhileIO( Func folder, Func valueIs) where M : Monad => - ma.MapIO(io => io.FoldWhile(schedule, initialState, folder, valueIs)); + M.FoldWhileIO(ma, schedule, initialState, folder, valueIs); public static K FoldWhileIO( this K ma, @@ -451,7 +449,7 @@ public static K FoldWhileIO( Func folder, Func valueIs) where M : Monad => - ma.MapIO(io => io.FoldWhile(initialState, folder, valueIs)); + M.FoldWhileIO(ma, initialState, folder, valueIs); public static K FoldWhileIO( this K ma, @@ -460,7 +458,7 @@ public static K FoldWhileIO( Func folder, Func<(S State, A Value), bool> predicate) where M : Monad => - ma.MapIO(io => io.FoldWhile(schedule, initialState, folder, predicate)); + M.FoldWhileIO(ma, schedule, initialState, folder, predicate); public static K FoldWhileIO( this K ma, @@ -468,7 +466,7 @@ public static K FoldWhileIO( Func folder, Func<(S State, A Value), bool> predicate) where M : Monad => - ma.MapIO(io => io.FoldWhile(initialState, folder, predicate)); + M.FoldWhileIO(ma, initialState, folder, predicate); public static K FoldUntilIO( this K ma, @@ -477,7 +475,7 @@ public static K FoldUntilIO( Func folder, Func stateIs) where M : Monad => - ma.MapIO(io => io.FoldUntil(schedule, initialState, folder, stateIs)); + M.FoldUntilIO(ma, schedule, initialState, folder, stateIs); public static K FoldUntilIO( this K ma, @@ -485,7 +483,7 @@ public static K FoldUntilIO( Func folder, Func stateIs) where M : Monad => - ma.MapIO(io => io.FoldUntil(initialState, folder, stateIs)); + M.FoldUntilIO(ma, initialState, folder, stateIs); public static K FoldUntilIO( this K ma, @@ -494,7 +492,7 @@ public static K FoldUntilIO( Func folder, Func valueIs) where M : Monad => - ma.MapIO(io => io.FoldUntil(schedule, initialState, folder, valueIs)); + M.FoldUntilIO(ma, schedule, initialState, folder, valueIs); public static K FoldUntilIO( this K ma, @@ -502,7 +500,7 @@ public static K FoldUntilIO( Func folder, Func valueIs) where M : Monad => - ma.MapIO(io => io.FoldUntil(initialState, folder, valueIs)); + M.FoldUntilIO(ma, initialState, folder, valueIs); public static K FoldUntilIO( this K ma, @@ -510,7 +508,7 @@ public static K FoldUntilIO( Func folder, Func<(S State, A Value), bool> predicate) where M : Monad => - ma.MapIO(io => io.FoldUntil(initialState, folder, predicate)); + M.FoldUntilIO(ma, initialState, folder, predicate); public static K FoldUntilIO( this K ma, @@ -519,129 +517,5 @@ public static K FoldUntilIO( Func folder, Func<(S State, A Value), bool> predicate) where M : Monad => - ma.MapIO(io => io.FoldUntil(schedule, initialState, folder, predicate)); - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // - // Zipping - // - - /// - /// Takes two IO monads and zips their result - /// - /// - /// Asynchronous operations will run concurrently - /// - /// Tuple of IO monads to run - /// Monad trait type - /// First IO monad bound value type - /// Second IO monad bound value type - /// IO monad - public static K ZipIO( - this (K First, K Second) tuple) - where M : Monad => - fun((A a, B b) => (a, b)) - .Map(tuple.First) - .Apply(tuple.Second); - - /// - /// Takes two IO monads and zips their result - /// - /// - /// Asynchronous operations will run concurrently - /// - /// Tuple of IO monads to run - /// Monad trait type - /// First IO monad bound value type - /// Second IO monad bound value type - /// Third IO monad bound value type - /// IO monad - public static K ZipIO( - this (K First, - K Second, - K Third) tuple) - where M : Monad => - fun((A a, B b, C c) => (a, b, c)) - .Map(tuple.First) - .Apply(tuple.Second) - .Apply(tuple.Third); - - /// - /// Takes two IO monads and zips their result - /// - /// - /// Asynchronous operations will run concurrently - /// - /// Tuple of IO monads to run - /// Monad trait type - /// First IO monad bound value type - /// Second IO monad bound value type - /// Third IO monad bound value type - /// Fourth IO monad bound value type - /// IO monad - public static K ZipIO( - this (K First, - K Second, - K Third, - K Fourth) tuple) - where M : Monad => - fun((A a, B b, C c, D d) => (a, b, c, d)) - .Map(tuple.First) - .Apply(tuple.Second) - .Apply(tuple.Third) - .Apply(tuple.Fourth); - - /// - /// Takes two IO monads and zips their result - /// - /// - /// Asynchronous operations will run concurrently - /// - /// Monad trait type - /// First IO monad bound value type - /// Second IO monad bound value type - /// IO monad - public static K ZipIO( - this K First, - K Second) - where M : Monad => - (First, Second).ZipIO(); - - /// - /// Takes two IO monads and zips their result - /// - /// - /// Asynchronous operations will run concurrently - /// - /// Monad trait type - /// First IO monad bound value type - /// Second IO monad bound value type - /// Third IO monad bound value type - /// IO monad - public static K ZipIO( - this K First, - K Second, - K Third) - where M : Monad => - (First, Second, Third).ZipIO(); - - /// - /// Takes two IO monads and zips their result - /// - /// - /// Asynchronous operations will run concurrently - /// - /// Monad trait type - /// First IO monad bound value type - /// Second IO monad bound value type - /// Third IO monad bound value type - /// Fourth IO monad bound value type - /// IO monad - public static K ZipIO( - this K First, - K Second, - K Third, - K Fourth) - where M : Monad => - (First, Second, Third, Fourth).ZipIO(); + M.FoldUntilIO(ma, schedule, initialState, folder, predicate); } diff --git a/LanguageExt.Core/Effects/IO/IO.Module.cs b/LanguageExt.Core/Effects/IO/IO.Module.cs index c0a0e522c..7e7db7475 100644 --- a/LanguageExt.Core/Effects/IO/IO.Module.cs +++ b/LanguageExt.Core/Effects/IO/IO.Module.cs @@ -74,7 +74,7 @@ public static IO lift(Action f) => /// Result of the computation public static K local(K ma) where M : Monad => - ma.LocalIO(); + M.LocalIO(ma); /// /// Creates a local cancellation environment @@ -163,18 +163,6 @@ public static IO combine(K ma, K mb) => public static K mapIO(K ma, Func, IO> f) where M : Monad => M.MapIO(ma, f); - - /// - /// Queue this IO operation to run on the thread-pool. - /// - /// Maximum time that the forked IO operation can run for. `None` for no timeout. - /// Returns a `ForkIO` data-structure that contains two IO effects that can be used to either cancel - /// the forked IO operation or to await the result of it. - /// - [Pure] - [MethodImpl(Opt.Default)] - public static IO> fork(K ma, Option timeout = default) => - ma.As().Fork(timeout); /// /// Queue this IO operation to run on the thread-pool. @@ -187,7 +175,7 @@ public static IO> fork(K ma, Option timeout = defa [MethodImpl(Opt.Default)] public static K> fork(K ma, Option timeout = default) where M : Monad => - mapIO(ma, mio => fork(mio , timeout)); + M.ForkIO(ma, timeout); /// /// Yield the thread for the specified duration or until cancelled. @@ -199,7 +187,7 @@ public static K> fork(K ma, Option timeout = public static IO yieldFor(Duration duration) => Math.Abs(duration.Milliseconds) < 0.00000001 ? unitIO - : IO.LiftAsync(env => yieldFor(duration, env.Token)); + : IO.LiftAsync(e => yieldFor(duration, e.Token)); /// /// Yield the thread for the specified duration or until cancelled. @@ -211,7 +199,7 @@ public static IO yieldFor(Duration duration) => public static IO yieldFor(TimeSpan timeSpan) => Math.Abs(timeSpan.TotalMilliseconds) < 0.00000001 ? unitIO - : IO.LiftAsync(env => yieldFor(timeSpan, env.Token)); + : IO.LiftAsync(e => yieldFor(timeSpan, e.Token)); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // diff --git a/LanguageExt.Core/Effects/IO/IO.Monad.cs b/LanguageExt.Core/Effects/IO/IO.Monad.cs index 508f00582..1ef80f6c6 100644 --- a/LanguageExt.Core/Effects/IO/IO.Monad.cs +++ b/LanguageExt.Core/Effects/IO/IO.Monad.cs @@ -59,4 +59,394 @@ static K MonadIO.MapIO(K ma, Func, IO> f) => static K Final.Finally(K fa, K @finally) => new IOFinal(fa, @finally, pure); + + + + + /// + /// Creates a local cancellation environment + /// + /// + /// A local cancellation environment stops other IO computations, that rely on the same + /// environmental cancellation token, from being taken down by a regional cancellation. + /// + /// If a `IO.cancel` is invoked locally then it will still create an exception that + /// propagates upwards and so catching cancellations is still important. + /// + /// Computation to run within the local context + /// Bound value + /// Result of the computation + static K MonadIO.LocalIO(K ma) => + ma.As().Local(); + + /// + /// Make this IO computation run on the `SynchronizationContext` that was captured at the start + /// of the IO chain (i.e. the one embedded within the `EnvIO` environment that is passed through + /// all IO computations) + /// + static K MonadIO.PostIO(K ma) => + ma.As().Post(); + + /// + /// Await a forked operation + /// + static K MonadIO.Await(K> ma) => + ma.As().Bind(f => f.Await); + + /// + /// Queue this IO operation to run on the thread-pool. + /// + /// Maximum time that the forked IO operation can run for. `None` for no timeout. + /// Returns a `ForkIO` data-structure that contains two IO effects that can be used to either cancel + /// the forked IO operation or to await the result of it. + /// + static K> MonadIO.ForkIO(K ma, Option timeout) => + ma.As().Fork(timeout); + + /// + /// Timeout operation if it takes too long + /// + static K MonadIO.TimeoutIO(K ma, TimeSpan timeout) => + ma.As().Timeout(timeout); + + /// + /// The IO monad tracks resources automatically, this creates a local resource environment + /// to run this computation in. Once the computation has completed any resources acquired + /// are automatically released. Imagine this as the ultimate `using` statement. + /// + static K MonadIO.BracketIO(K ma) => + ma.As().Bracket(); + + /// + /// When acquiring, using, and releasing various resources, it can be quite convenient to write a function to manage + /// the acquisition and releasing, taking a function of the acquired value that specifies an action to be performed + /// in between. + /// + /// Resource acquisition + /// Function to use the acquired resource + /// Function to invoke to release the resource + static K MonadIO.BracketIO( + K Acq, + Func> Use, + Func> Fin) => + Acq.As().Bracket(Use, Fin); + + /// + /// When acquiring, using, and releasing various resources, it can be quite convenient to write a function to manage + /// the acquisition and releasing, taking a function of the acquired value that specifies an action to be performed + /// in between. + /// + /// Resource acquisition + /// Function to use the acquired resource + /// Function to run to handle any exceptions + /// Function to invoke to release the resource + static K MonadIO.BracketIO( + K Acq, + Func> Use, + Func> Catch, + Func> Fin) => + Acq.As().Bracket(Use, Catch, Fin); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // + // Repeating the effect + // + + /// + /// Keeps repeating the computation forever, or until an error occurs + /// + /// + /// Any resources acquired within a repeated IO computation will automatically be released. This also means you can't + /// acquire resources and return them from within a repeated computation. + /// + /// The result of the last invocation + static K MonadIO.RepeatIO(K ma) => + ma.As().Repeat(); + + /// + /// Keeps repeating the computation, until the scheduler expires, or an error occurs + /// + /// + /// Any resources acquired within a repeated IO computation will automatically be released. This also means you can't + /// acquire resources and return them from within a repeated computation. + /// + /// Scheduler strategy for repeating + /// The result of the last invocation + static K MonadIO.RepeatIO( + K ma, + Schedule schedule) => + ma.As().Repeat(schedule); + + /// + /// Keeps repeating the computation until the predicate returns false, or an error occurs + /// + /// + /// Any resources acquired within a repeated IO computation will automatically be released. This also means you can't + /// acquire resources and return them from within a repeated computation. + /// + /// Keep repeating while this predicate returns `true` for each computed value + /// The result of the last invocation + static K MonadIO.RepeatWhileIO( + K ma, + Func predicate) => + ma.As().RepeatWhile(predicate); + + /// + /// Keeps repeating the computation, until the scheduler expires, or the predicate returns false, or an error occurs + /// + /// + /// Any resources acquired within a repeated IO computation will automatically be released. This also means you can't + /// acquire resources and return them from within a repeated computation. + /// + /// Scheduler strategy for repeating + /// Keep repeating while this predicate returns `true` for each computed value + /// The result of the last invocation + static K MonadIO.RepeatWhileIO( + K ma, + Schedule schedule, + Func predicate) => + ma.As().RepeatWhile(schedule, predicate); + + /// + /// Keeps repeating the computation until the predicate returns true, or an error occurs + /// + /// + /// Any resources acquired within a repeated IO computation will automatically be released. This also means you can't + /// acquire resources and return them from within a repeated computation. + /// + /// Keep repeating until this predicate returns `true` for each computed value + /// The result of the last invocation + static K MonadIO.RepeatUntilIO( + K ma, + Func predicate) => + ma.As().RepeatUntil(predicate); + + /// + /// Keeps repeating the computation, until the scheduler expires, or the predicate returns true, or an error occurs + /// + /// + /// Any resources acquired within a repeated IO computation will automatically be released. This also means you can't + /// acquire resources and return them from within a repeated computation. + /// + /// Scheduler strategy for repeating + /// Keep repeating until this predicate returns `true` for each computed value + /// The result of the last invocation + static K MonadIO.RepeatUntilIO( + K ma, + Schedule schedule, + Func predicate) => + ma.As().RepeatUntil(schedule, predicate); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // + // Retrying the effect when it fails + // + + /// + /// Retry if the IO computation fails + /// + /// + /// This variant will retry forever + /// + /// + /// Any resources acquired within a retrying IO computation will automatically be released *if* the operation fails. + /// So, successive retries will not grow the acquired resources on each retry iteration. Any successful operation that + /// acquires resources will have them tracked in the usual way. + /// + static K MonadIO.RetryIO(K ma) => + ma.As().Retry(); + + /// + /// Retry if the IO computation fails + /// + /// + /// This variant will retry until the schedule expires + /// + /// + /// Any resources acquired within a retrying IO computation will automatically be released *if* the operation fails. + /// So, successive retries will not grow the acquired resources on each retry iteration. Any successful operation that + /// acquires resources will have them tracked in the usual way. + /// + static K MonadIO.RetryIO( + K ma, + Schedule schedule) => + ma.As().Retry(schedule); + + /// + /// Retry if the IO computation fails + /// + /// + /// This variant will keep retrying whilst the predicate returns `true` for the error generated at each iteration; + /// at which point the last raised error will be thrown. + /// + /// + /// Any resources acquired within a retrying IO computation will automatically be released *if* the operation fails. + /// So, successive retries will not grow the acquired resources on each retry iteration. Any successful operation that + /// acquires resources will have them tracked in the usual way. + /// + static K MonadIO.RetryWhileIO( + K ma, + Func predicate) => + ma.As().RetryWhile(predicate); + + /// + /// Retry if the IO computation fails + /// + /// + /// This variant will keep retrying whilst the predicate returns `true` for the error generated at each iteration; + /// or, until the schedule expires; at which point the last raised error will be thrown. + /// + /// + /// Any resources acquired within a retrying IO computation will automatically be released *if* the operation fails. + /// So, successive retries will not grow the acquired resources on each retry iteration. Any successful operation that + /// acquires resources will have them tracked in the usual way. + /// + static K MonadIO.RetryWhileIO( + K ma, + Schedule schedule, + Func predicate) => + ma.As().RetryWhile(schedule, predicate); + + /// + /// Retry if the IO computation fails + /// + /// + /// This variant will keep retrying until the predicate returns `true` for the error generated at each iteration; + /// at which point the last raised error will be thrown. + /// + /// + /// Any resources acquired within a retrying IO computation will automatically be released *if* the operation fails. + /// So, successive retries will not grow the acquired resources on each retry iteration. Any successful operation that + /// acquires resources will have them tracked in the usual way. + /// + static K MonadIO.RetryUntilIO( + K ma, + Func predicate) => + ma.As().RetryUntil(predicate); + + /// + /// Retry if the IO computation fails + /// + /// + /// This variant will keep retrying until the predicate returns `true` for the error generated at each iteration; + /// or, until the schedule expires; at which point the last raised error will be thrown. + /// + /// + /// Any resources acquired within a retrying IO computation will automatically be released *if* the operation fails. + /// So, successive retries will not grow the acquired resources on each retry iteration. Any successful operation that + /// acquires resources will have them tracked in the usual way. + /// + static K MonadIO.RetryUntilIO( + K ma, + Schedule schedule, + Func predicate) => + ma.As().RetryUntil(schedule, predicate); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // + // Folding + // + + static K MonadIO.FoldIO( + K ma, + Schedule schedule, + S initialState, + Func folder) => + ma.As().Fold(schedule, initialState, folder); + + static K MonadIO.FoldIO( + K ma, + S initialState, + Func folder) => + ma.As().Fold(initialState, folder); + + static K MonadIO.FoldWhileIO( + K ma, + Schedule schedule, + S initialState, + Func folder, + Func stateIs) => + ma.As().FoldWhile(schedule, initialState, folder, stateIs); + + static K MonadIO.FoldWhileIO( + K ma, + S initialState, + Func folder, + Func stateIs) => + ma.As().FoldWhile(initialState, folder, stateIs); + + static K MonadIO.FoldWhileIO( + K ma, + Schedule schedule, + S initialState, + Func folder, + Func valueIs) => + ma.As().FoldWhile(schedule, initialState, folder, valueIs); + + static K MonadIO.FoldWhileIO( + K ma, + S initialState, + Func folder, + Func valueIs) => + ma.As().FoldWhile(initialState, folder, valueIs); + + static K MonadIO.FoldWhileIO( + K ma, + Schedule schedule, + S initialState, + Func folder, + Func<(S State, A Value), bool> predicate) => + ma.As().FoldWhile(schedule, initialState, folder, predicate); + + static K MonadIO.FoldWhileIO( + K ma, + S initialState, + Func folder, + Func<(S State, A Value), bool> predicate) => + ma.As().FoldWhile(initialState, folder, predicate); + + static K MonadIO.FoldUntilIO( + K ma, + Schedule schedule, + S initialState, + Func folder, + Func stateIs) => + ma.As().FoldUntil(schedule, initialState, folder, stateIs); + + static K MonadIO.FoldUntilIO( + K ma, + S initialState, + Func folder, + Func stateIs) => + ma.As().FoldUntil(initialState, folder, stateIs); + + static K MonadIO.FoldUntilIO( + K ma, + Schedule schedule, + S initialState, + Func folder, + Func valueIs) => + ma.As().FoldUntil(schedule, initialState, folder, valueIs); + + static K MonadIO.FoldUntilIO( + K ma, + S initialState, + Func folder, + Func valueIs) => + ma.As().FoldUntil(initialState, folder, valueIs); + + static K MonadIO.FoldUntilIO( + K ma, + S initialState, + Func folder, + Func<(S State, A Value), bool> predicate) => + ma.As().FoldUntil(initialState, folder, predicate); + + static K MonadIO.FoldUntilIO( + K ma, + Schedule schedule, + S initialState, + Func folder, + Func<(S State, A Value), bool> predicate) => + ma.As().FoldUntil(schedule, initialState, folder, predicate); } diff --git a/LanguageExt.Core/Effects/IO/IO.Prelude.cs b/LanguageExt.Core/Effects/IO/IO.Prelude.cs index 19eb59bda..23028acf4 100644 --- a/LanguageExt.Core/Effects/IO/IO.Prelude.cs +++ b/LanguageExt.Core/Effects/IO/IO.Prelude.cs @@ -72,29 +72,7 @@ public static K tailIO(K ma) [MethodImpl(MethodImplOptions.AggressiveInlining)] public static K postIO(K ma) where M : Monad => - ma.PostIO(); - - /// - /// Make this IO computation run on the `SynchronizationContext` that was captured at the start - /// of the IO chain (i.e. the one embedded within the `EnvIO` environment that is passed through - /// all IO computations) - /// - [Pure] - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static K post(K ma) => - ma.As().PostIO(); - - /// - /// Queue this IO operation to run on the thread-pool. - /// - /// Maximum time that the forked IO operation can run for. `None` for no timeout. - /// Returns a `ForkIO` data-structure that contains two IO effects that can be used to either cancel - /// the forked IO operation or to await the result of it. - /// - [Pure] - [MethodImpl(Opt.Default)] - public static IO> fork(K ma, Option timeout = default) => - ma.ForkIO(timeout).As(); + M.PostIO(ma); /// /// Queue this IO operation to run on the thread-pool. @@ -107,7 +85,7 @@ public static IO> fork(K ma, Option timeout = defa [MethodImpl(Opt.Default)] public static K> fork(K ma, Option timeout = default) where M : Monad => - ma.ForkIO(timeout); + M.ForkIO(ma, timeout); /// /// Queue this IO operation to run on the thread-pool. @@ -150,19 +128,7 @@ public static IO>> fork(StreamT ma, Option [MethodImpl(Opt.Default)] public static K awaitIO(K> ma) where M : Monad => - ma.Await(); - - /// - /// Queue this IO operation to run on the thread-pool. - /// - /// Maximum time that the forked IO operation can run for. `None` for no timeout. - /// Returns a `ForkIO` data-structure that contains two IO effects that can be used to either cancel - /// the forked IO operation or to await the result of it. - /// - [Pure] - [MethodImpl(Opt.Default)] - public static IO awaitIO(K> ma) => - ma.Await().As(); + M.Await(ma); /// /// Yield the thread for the specified duration or until cancelled. @@ -193,22 +159,6 @@ public static K> awaitAll(params K[] ms) where M : Monad => awaitAll(ms.ToSeqUnsafe()); - /// - /// Awaits all operations - /// - /// Operations to await - /// Sequence of results - public static IO> awaitAll(params K[] ms) => - awaitAll(ms.ToSeqUnsafe()); - - /// - /// Awaits all operations - /// - /// Operations to await - /// Sequence of results - public static IO> awaitAll(params IO[] ms) => - awaitAll(ms.ToSeqUnsafe()); - /// /// Awaits all forks /// @@ -218,14 +168,6 @@ public static K> awaitAll(params K>[] forks) where M : Monad => awaitAll(forks.ToSeqUnsafe()); - /// - /// Awaits all - /// - /// IO operations to await - /// Sequence of results - public static IO> awaitAll(params IO>[] forks) => - awaitAll(forks.ToSeqUnsafe()); - /// /// Awaits all /// @@ -243,14 +185,6 @@ public static K> awaitAll(Seq> ms) where M : Monad => ms.Traverse(f => f.ToIO()) .Bind(awaitAll); - - /// - /// Awaits all operations - /// - /// Operations to await - /// Sequence of results - public static IO> awaitAll(Seq> ms) => - awaitAll(ms.Map(f => f.As())); /// /// Awaits all operations diff --git a/LanguageExt.Core/Immutable Collections/Prelude.Collections.cs b/LanguageExt.Core/Immutable Collections/Prelude.Collections.cs index 45bcd52d1..6b565e6bf 100644 --- a/LanguageExt.Core/Immutable Collections/Prelude.Collections.cs +++ b/LanguageExt.Core/Immutable Collections/Prelude.Collections.cs @@ -192,6 +192,22 @@ public static Arr Sort(this Arr xs) where OrdA : Ord => public static A[] Sort(this A[] xs) where OrdA : Ord => xs.OrderBy(identity, OrdComparer.Default).ToArray(); + /// + /// Forever sequence of units + /// + [Pure] + public static Iterable Units + { + get + { + return Go().AsIterable(); + IEnumerable Go() + { + while (true) yield return default; + } + } + } + /// /// Lazy sequence of natural numbers up to Int32.MaxValue /// diff --git a/LanguageExt.Core/Traits/Monads/MonadIO/MonadIO.Trait.cs b/LanguageExt.Core/Traits/Monads/MonadIO/MonadIO.Trait.cs index adc90cbef..deab4a6af 100644 --- a/LanguageExt.Core/Traits/Monads/MonadIO/MonadIO.Trait.cs +++ b/LanguageExt.Core/Traits/Monads/MonadIO/MonadIO.Trait.cs @@ -53,7 +53,7 @@ public static virtual K LiftIO(IO ma) => /// Extract the IO monad from within the M monad (usually as part of a monad-transformer stack). /// public static virtual K> ToIO(K ma) => - throw new ExceptionalException(Errors.UnliftIONotSupported); + throw new ExceptionalException(Errors.ToIONotSupported); /// /// Extract the IO monad from within the `M` monad (usually as part of a monad-transformer stack). Then perform @@ -61,4 +61,391 @@ public static virtual K> ToIO(K ma) => /// public static virtual K MapIO(K ma, Func, IO> f) => M.ToIO(ma).Bind(io => M.LiftIO(f(io))); + + /// + /// Creates a local cancellation environment + /// + /// + /// A local cancellation environment stops other IO computations, that rely on the same + /// environmental cancellation token, from being taken down by a regional cancellation. + /// + /// If a `IO.cancel` is invoked locally then it will still create an exception that + /// propagates upwards and so catching cancellations is still important. + /// + /// Computation to run within the local context + /// Bound value + /// Result of the computation + public static virtual K LocalIO(K ma) => + ma.MapIO(io => io.Local()); + + /// + /// Make this IO computation run on the `SynchronizationContext` that was captured at the start + /// of the IO chain (i.e. the one embedded within the `EnvIO` environment that is passed through + /// all IO computations) + /// + public static virtual K PostIO(K ma) => + ma.MapIO(io => io.Post()); + + /// + /// Await a forked operation + /// + public static virtual K Await(K> ma) => + ma.MapIO(io => io.Bind(f => f.Await)); + + /// + /// Queue this IO operation to run on the thread-pool. + /// + /// Maximum time that the forked IO operation can run for. `None` for no timeout. + /// Returns a `ForkIO` data-structure that contains two IO effects that can be used to either cancel + /// the forked IO operation or to await the result of it. + /// + public static virtual K> ForkIO(K ma, Option timeout = default) => + ma.MapIO(io => io.Fork(timeout)); + + /// + /// Timeout operation if it takes too long + /// + public static virtual K TimeoutIO(K ma, TimeSpan timeout) => + ma.MapIO(io => io.Timeout(timeout)); + + /// + /// The IO monad tracks resources automatically, this creates a local resource environment + /// to run this computation in. Once the computation has completed any resources acquired + /// are automatically released. Imagine this as the ultimate `using` statement. + /// + public static virtual K BracketIO(K ma) => + ma.MapIO(io => io.Bracket()); + + /// + /// When acquiring, using, and releasing various resources, it can be quite convenient to write a function to manage + /// the acquisition and releasing, taking a function of the acquired value that specifies an action to be performed + /// in between. + /// + /// Resource acquisition + /// Function to use the acquired resource + /// Function to invoke to release the resource + public static virtual K BracketIO( + K Acq, + Func> Use, + Func> Fin) => + Acq.MapIO(io => io.Bracket(Use, Fin)); + + /// + /// When acquiring, using, and releasing various resources, it can be quite convenient to write a function to manage + /// the acquisition and releasing, taking a function of the acquired value that specifies an action to be performed + /// in between. + /// + /// Resource acquisition + /// Function to use the acquired resource + /// Function to run to handle any exceptions + /// Function to invoke to release the resource + public static virtual K BracketIO( + K Acq, + Func> Use, + Func> Catch, + Func> Fin) => + Acq.MapIO(io => io.Bracket(Use, Catch, Fin)); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // + // Repeating the effect + // + + /// + /// Keeps repeating the computation forever, or until an error occurs + /// + /// + /// Any resources acquired within a repeated IO computation will automatically be released. This also means you can't + /// acquire resources and return them from within a repeated computation. + /// + /// The result of the last invocation + public static virtual K RepeatIO(K ma) => + ma.MapIO(io => io.Repeat()); + + /// + /// Keeps repeating the computation, until the scheduler expires, or an error occurs + /// + /// + /// Any resources acquired within a repeated IO computation will automatically be released. This also means you can't + /// acquire resources and return them from within a repeated computation. + /// + /// Scheduler strategy for repeating + /// The result of the last invocation + public static virtual K RepeatIO( + K ma, + Schedule schedule) => + ma.MapIO(io => io.Repeat(schedule)); + + /// + /// Keeps repeating the computation until the predicate returns false, or an error occurs + /// + /// + /// Any resources acquired within a repeated IO computation will automatically be released. This also means you can't + /// acquire resources and return them from within a repeated computation. + /// + /// Keep repeating while this predicate returns `true` for each computed value + /// The result of the last invocation + public static virtual K RepeatWhileIO( + K ma, + Func predicate) => + ma.MapIO(io => io.RepeatWhile(predicate)); + + /// + /// Keeps repeating the computation, until the scheduler expires, or the predicate returns false, or an error occurs + /// + /// + /// Any resources acquired within a repeated IO computation will automatically be released. This also means you can't + /// acquire resources and return them from within a repeated computation. + /// + /// Scheduler strategy for repeating + /// Keep repeating while this predicate returns `true` for each computed value + /// The result of the last invocation + public static virtual K RepeatWhileIO( + K ma, + Schedule schedule, + Func predicate) => + ma.MapIO(io => io.RepeatWhile(schedule, predicate)); + + /// + /// Keeps repeating the computation until the predicate returns true, or an error occurs + /// + /// + /// Any resources acquired within a repeated IO computation will automatically be released. This also means you can't + /// acquire resources and return them from within a repeated computation. + /// + /// Keep repeating until this predicate returns `true` for each computed value + /// The result of the last invocation + public static virtual K RepeatUntilIO( + K ma, + Func predicate) => + ma.MapIO(io => io.RepeatUntil(predicate)); + + /// + /// Keeps repeating the computation, until the scheduler expires, or the predicate returns true, or an error occurs + /// + /// + /// Any resources acquired within a repeated IO computation will automatically be released. This also means you can't + /// acquire resources and return them from within a repeated computation. + /// + /// Scheduler strategy for repeating + /// Keep repeating until this predicate returns `true` for each computed value + /// The result of the last invocation + public static virtual K RepeatUntilIO( + K ma, + Schedule schedule, + Func predicate) => + ma.MapIO(io => io.RepeatUntil(schedule, predicate)); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // + // Retrying the effect when it fails + // + + /// + /// Retry if the IO computation fails + /// + /// + /// This variant will retry forever + /// + /// + /// Any resources acquired within a retrying IO computation will automatically be released *if* the operation fails. + /// So, successive retries will not grow the acquired resources on each retry iteration. Any successful operation that + /// acquires resources will have them tracked in the usual way. + /// + public static virtual K RetryIO(K ma) => + ma.MapIO(io => io.Retry()); + + /// + /// Retry if the IO computation fails + /// + /// + /// This variant will retry until the schedule expires + /// + /// + /// Any resources acquired within a retrying IO computation will automatically be released *if* the operation fails. + /// So, successive retries will not grow the acquired resources on each retry iteration. Any successful operation that + /// acquires resources will have them tracked in the usual way. + /// + public static virtual K RetryIO( + K ma, + Schedule schedule) => + ma.MapIO(io => io.Retry(schedule)); + + /// + /// Retry if the IO computation fails + /// + /// + /// This variant will keep retrying whilst the predicate returns `true` for the error generated at each iteration; + /// at which point the last raised error will be thrown. + /// + /// + /// Any resources acquired within a retrying IO computation will automatically be released *if* the operation fails. + /// So, successive retries will not grow the acquired resources on each retry iteration. Any successful operation that + /// acquires resources will have them tracked in the usual way. + /// + public static virtual K RetryWhileIO( + K ma, + Func predicate) => + ma.MapIO(io => io.RetryWhile(predicate)); + + /// + /// Retry if the IO computation fails + /// + /// + /// This variant will keep retrying whilst the predicate returns `true` for the error generated at each iteration; + /// or, until the schedule expires; at which point the last raised error will be thrown. + /// + /// + /// Any resources acquired within a retrying IO computation will automatically be released *if* the operation fails. + /// So, successive retries will not grow the acquired resources on each retry iteration. Any successful operation that + /// acquires resources will have them tracked in the usual way. + /// + public static virtual K RetryWhileIO( + K ma, + Schedule schedule, + Func predicate) => + ma.MapIO(io => io.RetryWhile(schedule, predicate)); + + /// + /// Retry if the IO computation fails + /// + /// + /// This variant will keep retrying until the predicate returns `true` for the error generated at each iteration; + /// at which point the last raised error will be thrown. + /// + /// + /// Any resources acquired within a retrying IO computation will automatically be released *if* the operation fails. + /// So, successive retries will not grow the acquired resources on each retry iteration. Any successful operation that + /// acquires resources will have them tracked in the usual way. + /// + public static virtual K RetryUntilIO( + K ma, + Func predicate) => + ma.MapIO(io => io.RetryUntil(predicate)); + + /// + /// Retry if the IO computation fails + /// + /// + /// This variant will keep retrying until the predicate returns `true` for the error generated at each iteration; + /// or, until the schedule expires; at which point the last raised error will be thrown. + /// + /// + /// Any resources acquired within a retrying IO computation will automatically be released *if* the operation fails. + /// So, successive retries will not grow the acquired resources on each retry iteration. Any successful operation that + /// acquires resources will have them tracked in the usual way. + /// + public static virtual K RetryUntilIO( + K ma, + Schedule schedule, + Func predicate) => + ma.MapIO(io => io.RetryUntil(schedule, predicate)); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // + // Folding + // + + public static virtual K FoldIO( + K ma, + Schedule schedule, + S initialState, + Func folder) => + ma.MapIO(io => io.Fold(schedule, initialState, folder)); + + public static virtual K FoldIO( + K ma, + S initialState, + Func folder) => + ma.MapIO(io => io.Fold(initialState, folder)); + + public static virtual K FoldWhileIO( + K ma, + Schedule schedule, + S initialState, + Func folder, + Func stateIs) => + ma.MapIO(io => io.FoldWhile(schedule, initialState, folder, stateIs)); + + public static virtual K FoldWhileIO( + K ma, + S initialState, + Func folder, + Func stateIs) => + ma.MapIO(io => io.FoldWhile(initialState, folder, stateIs)); + + public static virtual K FoldWhileIO( + K ma, + Schedule schedule, + S initialState, + Func folder, + Func valueIs) => + ma.MapIO(io => io.FoldWhile(schedule, initialState, folder, valueIs)); + + public static virtual K FoldWhileIO( + K ma, + S initialState, + Func folder, + Func valueIs) => + ma.MapIO(io => io.FoldWhile(initialState, folder, valueIs)); + + public static virtual K FoldWhileIO( + K ma, + Schedule schedule, + S initialState, + Func folder, + Func<(S State, A Value), bool> predicate) => + ma.MapIO(io => io.FoldWhile(schedule, initialState, folder, predicate)); + + public static virtual K FoldWhileIO( + K ma, + S initialState, + Func folder, + Func<(S State, A Value), bool> predicate) => + ma.MapIO(io => io.FoldWhile(initialState, folder, predicate)); + + public static virtual K FoldUntilIO( + K ma, + Schedule schedule, + S initialState, + Func folder, + Func stateIs) => + ma.MapIO(io => io.FoldUntil(schedule, initialState, folder, stateIs)); + + public static virtual K FoldUntilIO( + K ma, + S initialState, + Func folder, + Func stateIs) => + ma.MapIO(io => io.FoldUntil(initialState, folder, stateIs)); + + public static virtual K FoldUntilIO( + K ma, + Schedule schedule, + S initialState, + Func folder, + Func valueIs) => + ma.MapIO(io => io.FoldUntil(schedule, initialState, folder, valueIs)); + + public static virtual K FoldUntilIO( + K ma, + S initialState, + Func folder, + Func valueIs) => + ma.MapIO(io => io.FoldUntil(initialState, folder, valueIs)); + + public static virtual K FoldUntilIO( + K ma, + S initialState, + Func folder, + Func<(S State, A Value), bool> predicate) => + ma.MapIO(io => io.FoldUntil(initialState, folder, predicate)); + + public static virtual K FoldUntilIO( + K ma, + Schedule schedule, + S initialState, + Func folder, + Func<(S State, A Value), bool> predicate) => + ma.MapIO(io => io.FoldUntil(schedule, initialState, folder, predicate)); } diff --git a/LanguageExt.Pipes/Concurrent/Inbox/Inbox.DSL.cs b/LanguageExt.Pipes/Concurrent/Inbox/Inbox.DSL.cs index 632b3e531..2748c40d0 100644 --- a/LanguageExt.Pipes/Concurrent/Inbox/Inbox.DSL.cs +++ b/LanguageExt.Pipes/Concurrent/Inbox/Inbox.DSL.cs @@ -5,14 +5,26 @@ namespace LanguageExt.Pipes.Concurrent; -record InboxWriter(ChannelWriter Writer) : Inbox +record InboxWriter(ChannelWriter Writer, string Label) : Inbox { public override Inbox ContraMap(Func f) => new InboxContraMap(f, this); public override IO Post(A value) => - from f in IO.liftVAsync(e => Writer.WaitToWriteAsync(e.Token)) - from r in f ? IO.liftVAsync(() => Writer.WriteAsync(value).ToUnit()) + from f in IO.liftVAsync(async e => + { + Console.WriteLine($"Post({value}) to {Label} - Pre: WaitToWriteAsync"); + var r = await Writer.WaitToWriteAsync(e.Token); + Console.WriteLine($"Post({value}) '{r}' to {Label} - Post: WaitToWriteAsync"); + return r; + }) + from r in f ? IO.liftVAsync(async () => + { + Console.WriteLine($"Post({value}) to {Label} - Pre: WriteAsync"); + var r = await Writer.WriteAsync(value).ToUnit(); + Console.WriteLine($"Post({value}) to {Label} - Post: WriteAsync"); + return r; + }) : IO.fail(Errors.NoSpaceInInbox) select r; @@ -90,7 +102,7 @@ public override IO Post(A value) => { [] => unitIO, _ => IO.fail(Error.Many(es)) - }) + }).As() }; public override IO Complete() => diff --git a/LanguageExt.Pipes/Concurrent/Inbox/Inbox.cs b/LanguageExt.Pipes/Concurrent/Inbox/Inbox.cs index de61d7316..19e22d68e 100644 --- a/LanguageExt.Pipes/Concurrent/Inbox/Inbox.cs +++ b/LanguageExt.Pipes/Concurrent/Inbox/Inbox.cs @@ -83,9 +83,7 @@ public Inbox Combine(Func f, Inbox rhs) => /// `ConsumerT` public ConsumerT ToConsumerT() where M : Monad => - ConsumerT.awaiting() - .Bind(Post) - .Bind(_ => ToConsumerT()); + ConsumerT.repeat(ConsumerT.awaiting().Bind(Post)); /// /// Predicate function + /// Effect runtime type /// Stream value to consume and produce /// Pipe public static Pipe filter(Func f) => @@ -60,6 +85,7 @@ public static Pipe filter(Func f) => /// Create a pipe from a mapping function /// /// Convert the `Inbox` to a `Consumer` pipe component diff --git a/LanguageExt.Pipes/Concurrent/Mailbox/Mailbox.Module.cs b/LanguageExt.Pipes/Concurrent/Mailbox/Mailbox.Module.cs index 9d88b2928..937245e4c 100644 --- a/LanguageExt.Pipes/Concurrent/Mailbox/Mailbox.Module.cs +++ b/LanguageExt.Pipes/Concurrent/Mailbox/Mailbox.Module.cs @@ -7,19 +7,21 @@ public static class Mailbox /// /// Create a new unbounded mailbox /// + /// Label for debugging purposes /// Value type /// Constructed mailbox with an `Inbox` and an `Outbox` - public static Mailbox spawn() => - spawn(Buffer.Unbounded); + public static Mailbox spawn(string label = "[unlabeled]") => + spawn(Buffer.Unbounded, label); /// /// Create a new mailbox with the buffer settings provided /// /// Buffer settings + /// Label for debugging purposes /// Value type /// Constructed mailbox with an `Inbox` and an `Outbox` /// Thrown for invalid buffer settings - public static Mailbox spawn(Buffer buffer) + public static Mailbox spawn(Buffer buffer, string label = "[unlabeled]") { Ch.Channel channel; switch (buffer) @@ -68,6 +70,6 @@ public static Mailbox spawn(Buffer buffer) throw new NotSupportedException(); } - return new Mailbox(new InboxWriter(channel.Writer), new OutboxReader(channel.Reader)); + return new Mailbox(new InboxWriter(channel.Writer, label), new OutboxReader(channel.Reader, label)); } } diff --git a/LanguageExt.Pipes/Concurrent/Outbox/Outbox.DSL.cs b/LanguageExt.Pipes/Concurrent/Outbox/Outbox.DSL.cs index a0a2801e2..47cb45ace 100644 --- a/LanguageExt.Pipes/Concurrent/Outbox/Outbox.DSL.cs +++ b/LanguageExt.Pipes/Concurrent/Outbox/Outbox.DSL.cs @@ -44,7 +44,7 @@ internal override ValueTask ReadyToRead(CancellationToken token) => new(false); } -record OutboxReader(ChannelReader Reader) : Outbox +record OutboxReader(ChannelReader Reader, string Label) : Outbox { public override Outbox Map(Func f) => new OutboxMap(this, f); @@ -56,8 +56,20 @@ public override Outbox ApplyBack(Outbox> ff) => new OutboxApply(this, ff); public override IO Read() => - IO.liftVAsync(e => Reader.WaitToReadAsync(e.Token)) - .Bind(f => f ? IO.liftVAsync(e => Reader.ReadAsync(e.Token)) + IO.liftVAsync(async e => + { + Console.WriteLine($"Read from {Label} - Pre: WaitToReadAsync"); + var r = await Reader.WaitToReadAsync(e.Token); + Console.WriteLine($"Read '{r}' from {Label} - Post: WaitToReadAsync"); + return r; + }) + .Bind(f => f ? IO.liftVAsync(async e => + { + Console.WriteLine($"Read from {Label} - Pre: ReadAsync"); + var r = await Reader.ReadAsync(e.Token); + Console.WriteLine($"Read '{r}' from {Label} - Post: ReadAsync"); + return r; + }) : IO.fail(Errors.OutboxChannelClosed)); internal override ValueTask ReadyToRead(CancellationToken token) => diff --git a/LanguageExt.Pipes/Concurrent/Outbox/Outbox.cs b/LanguageExt.Pipes/Concurrent/Outbox/Outbox.cs index d43d72f6b..ebfd574e4 100644 --- a/LanguageExt.Pipes/Concurrent/Outbox/Outbox.cs +++ b/LanguageExt.Pipes/Concurrent/Outbox/Outbox.cs @@ -80,8 +80,7 @@ public Outbox Bind(Func> f) => /// `ProducerT` public ProducerT ToProducerT() where M : Monad => - Read().Bind(ProducerT.yield) - .Bind(_ => ToProducerT()); + PipeT.yieldRepeatIO(Read()); /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// Bound value type /// public static Consumer liftT(ValueTask> f) => @@ -101,8 +101,8 @@ public static Consumer liftT(ValueTask /// /// Create a consumer that simply returns the bound value of the lifted monad without yielding anything /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// Bound value type /// public static Consumer liftM(K, A> ma) => @@ -111,8 +111,8 @@ public static Consumer liftM(K, A> ma) => /// /// Create a consumer that simply returns the bound value of the lifted monad without yielding anything /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// Bound value type /// public static Consumer liftIO(IO ma) => @@ -121,8 +121,8 @@ public static Consumer liftIO(IO ma) => /// /// Continually repeat the provided operation /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// Bound value type /// public static Consumer repeat(Consumer ma) => @@ -131,8 +131,8 @@ public static Consumer repeat(Consumer ma) => /// /// Repeat the provided operation based on the schedule provided /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// Bound value type /// public static Consumer repeat(Schedule schedule, Consumer ma) => @@ -141,8 +141,8 @@ public static Consumer repeat(Schedule schedule, Consumer< /// /// Continually lift & repeat the provided operation /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// Bound value type /// public static Consumer repeatM(K, A> ma) => @@ -151,8 +151,8 @@ public static Consumer repeatM(K, A> ma) => /// /// Repeat the provided operation based on the schedule provided /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// Bound value type /// public static Consumer repeatM(Schedule schedule, K, A> ma) => diff --git a/LanguageExt.Pipes/Effect/Effect.Module.cs b/LanguageExt.Pipes/Effect/Effect.Module.cs index 423237c92..22ae47ded 100644 --- a/LanguageExt.Pipes/Effect/Effect.Module.cs +++ b/LanguageExt.Pipes/Effect/Effect.Module.cs @@ -13,6 +13,7 @@ public static class Effect /// /// Create an effect that simply returns a bound value without yielding anything /// + /// Effect runtime type /// Bound value type /// public static Effect pure(A value) => @@ -21,6 +22,7 @@ public static Effect pure(A value) => /// /// Create an effect that always fails /// + /// Effect runtime type /// Bound value type /// public static Effect error(Error value) => @@ -29,6 +31,7 @@ public static Effect error(Error value) => /// /// Create an effect that yields nothing at all /// + /// Effect runtime type /// Bound value type /// public static Effect empty() => @@ -37,6 +40,7 @@ public static Effect empty() => /// /// Create an effect that lazily returns a bound value without yielding anything /// + /// Effect runtime type /// Bound value type /// public static Effect lift(Func f) => @@ -45,6 +49,7 @@ public static Effect lift(Func f) => /// /// Create an effect that simply returns the bound value of the lifted monad without yielding anything /// + /// Effect runtime type /// Bound value type /// public static Effect liftM(K, A> ma) => @@ -53,6 +58,7 @@ public static Effect liftM(K, A> ma) => /// /// Create an effect that simply returns the bound value of the lifted monad without yielding anything /// + /// Effect runtime type /// Bound value type /// public static Effect liftIO(IO ma) => @@ -61,6 +67,7 @@ public static Effect liftIO(IO ma) => /// /// Create a lazy proxy /// + /// Effect runtime type /// Bound value type /// public static Effect liftT(Func> f) => @@ -69,6 +76,7 @@ public static Effect liftT(Func> f) => /// /// Create an asynchronous lazy proxy /// + /// Effect runtime type /// Bound value type /// public static Effect liftT(Func>> f) => @@ -77,6 +85,7 @@ public static Effect liftT(Func>> f) => /// /// Create an asynchronous proxy /// + /// Effect runtime type /// Bound value type /// public static Effect liftT(ValueTask> f) => @@ -85,6 +94,7 @@ public static Effect liftT(ValueTask> f) => /// /// Continually repeat the provided operation /// + /// Effect runtime type /// Bound value type /// public static Effect repeat(Effect ma) => @@ -93,6 +103,7 @@ public static Effect repeat(Effect ma) => /// /// Repeat the provided operation based on the schedule provided /// + /// Effect runtime type /// Bound value type /// public static Effect repeat(Schedule schedule, Effect ma) => @@ -101,6 +112,7 @@ public static Effect repeat(Schedule schedule, Effect ma) = /// /// Continually lift & repeat the provided operation /// + /// Effect runtime type /// Bound value type /// public static Effect repeatM(K, A> ma) => @@ -109,6 +121,7 @@ public static Effect repeatM(K, A> ma) => /// /// Repeat the provided operation based on the schedule provided /// + /// Effect runtime type /// Bound value type /// public static Effect repeatM(Schedule schedule, K, A> ma) => diff --git a/LanguageExt.Pipes/Effect/Effect.Monad.cs b/LanguageExt.Pipes/Effect/Effect.Monad.cs index dd7c25087..7e5a034fb 100644 --- a/LanguageExt.Pipes/Effect/Effect.Monad.cs +++ b/LanguageExt.Pipes/Effect/Effect.Monad.cs @@ -48,4 +48,9 @@ static K, A> Applicative>.Actions( fas.Select(fa => fa.As().Proxy) .Actions() .ToEffect(); + + static K, ForkIO> MonadIO>.ForkIO( + K, A> ma, + Option timeout) => + Effect.liftM(ma.As().Run().ForkIO(timeout)); } diff --git a/LanguageExt.Pipes/EffectT/EffectT.Extensions.cs b/LanguageExt.Pipes/EffectT/EffectT.Extensions.cs index 10a1b5dc3..db9f3a7ba 100644 --- a/LanguageExt.Pipes/EffectT/EffectT.Extensions.cs +++ b/LanguageExt.Pipes/EffectT/EffectT.Extensions.cs @@ -25,6 +25,12 @@ public static EffectT As(this K, A> ma) public static Effect ToEff(this K>, A> ma) => ma.As(); + /// + /// Convert to the `Eff` version of `Effect` + /// + public static EffectT, A> FromEff(this K, A> ma) => + new(ma.As().Proxy); + /// /// Monad bind /// diff --git a/LanguageExt.Pipes/EffectT/EffectT.Monad.cs b/LanguageExt.Pipes/EffectT/EffectT.Monad.cs index 3e07491f0..a3f070db6 100644 --- a/LanguageExt.Pipes/EffectT/EffectT.Monad.cs +++ b/LanguageExt.Pipes/EffectT/EffectT.Monad.cs @@ -49,4 +49,9 @@ static K, A> Applicative>.Actions( fas.Select(fa => fa.As().Proxy) .Actions() .ToEffect(); + + static K, ForkIO> MonadIO>.ForkIO( + K, A> ma, + Option timeout) => + MonadT.lift, M, ForkIO>(ma.As().Run().ForkIO(timeout)); } diff --git a/LanguageExt.Pipes/Pipe/Pipe.Module.cs b/LanguageExt.Pipes/Pipe/Pipe.Module.cs index ad94ca4f2..443231dbc 100644 --- a/LanguageExt.Pipes/Pipe/Pipe.Module.cs +++ b/LanguageExt.Pipes/Pipe/Pipe.Module.cs @@ -14,6 +14,7 @@ public static class Pipe /// /// Yield a value downstream /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// @@ -23,6 +24,7 @@ public static Pipe yield(OUT value) => /// /// Yield all values downstream /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// @@ -32,15 +34,37 @@ public static Pipe yieldAll(IEnumerable val /// /// Yield all values downstream /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// public static Pipe yieldAll(IAsyncEnumerable values) => PipeT.yieldAll, IN, OUT>(values); + /// + /// Evaluate the `M` monad repeatedly, yielding its bound values downstream + /// + /// Effect runtime type + /// Stream value to consume + /// Stream value to produce + /// + public static Pipe yieldRepeat(K, OUT> ma) => + PipeT.yieldRepeat, IN, OUT>(ma); + + /// + /// Evaluate the `IO` monad repeatedly, yielding its bound values downstream + /// + /// Effect runtime type + /// Stream value to consume + /// Stream value to produce + /// + public static Pipe yieldRepeatIO(IO ma) => + PipeT.yieldRepeatIO, IN, OUT>(ma); + /// /// Await a value from upstream /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Pipe @@ -51,6 +75,7 @@ public static Pipe awaiting() => /// Create a pipe that filters out values that return `false` when applied to a predicate function /// /// Convert `Outbox` to a `Producer` pipe component diff --git a/LanguageExt.Pipes/Consumer/Consumer.Module.cs b/LanguageExt.Pipes/Consumer/Consumer.Module.cs index 4d770323f..289d93ca3 100644 --- a/LanguageExt.Pipes/Consumer/Consumer.Module.cs +++ b/LanguageExt.Pipes/Consumer/Consumer.Module.cs @@ -13,8 +13,8 @@ public static class Consumer /// /// Await a value from upstream /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// public static Consumer awaiting() => PipeT.awaiting, IN, Void>().ToConsumer(); @@ -22,8 +22,8 @@ public static Consumer awaiting() => /// /// Await a value from upstream and then ignore it /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// public static Consumer awaitIgnore() => new PipeTAwait, Unit>(_ => PipeT.pure, Unit>(default)).ToConsumer(); @@ -31,8 +31,8 @@ public static Consumer awaitIgnore() => /// /// Create a consumer that simply returns a bound value without awaiting anything /// + /// Effect runtime type /// Stream value to await - /// Lifted monad type /// Bound value type /// public static Consumer pure(A value) => @@ -41,8 +41,8 @@ public static Consumer pure(A value) => /// /// Create a consumer that always fails /// + /// Effect runtime type /// Stream value to await - /// Lifted monad type /// Bound value type /// public static Consumer error(Error value) => @@ -51,8 +51,8 @@ public static Consumer error(Error value) => /// /// Create a consumer that yields nothing at all /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// Bound value type /// public static Consumer empty() => @@ -61,8 +61,8 @@ public static Consumer empty() => /// /// Create a consumer that simply returns a bound value without yielding anything /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// Bound value type /// public static Consumer lift(Func f) => @@ -71,8 +71,8 @@ public static Consumer lift(Func f) => /// /// Create a lazy consumer /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// Bound value type /// public static Consumer liftT(Func> f) => @@ -81,8 +81,8 @@ public static Consumer liftT(Func> f) /// /// Create an asynchronous lazy consumer /// + /// Effect runtime type /// Stream value to consume - /// Lifted monad type /// Bound value type /// public static Consumer liftT(Func>> f) => @@ -91,8 +91,8 @@ public static Consumer liftT(Func /// Create an asynchronous consumer /// /// Mapping function + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Pipe @@ -70,6 +96,7 @@ public static Pipe map(Func f) => /// Create a pipe from a mapping function /// /// Mapping function + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Pipe @@ -79,6 +106,7 @@ public static Pipe mapM(Func, OUT> /// /// Create a pipe that simply returns a bound value without yielding anything /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -89,6 +117,7 @@ public static Pipe pure(A value) => /// /// Create a pipe that always fails /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -99,6 +128,7 @@ public static Pipe error(Error value) => /// /// Create a pipe that yields nothing at all /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -109,6 +139,7 @@ public static Pipe empty() => /// /// Create a pipe that simply returns a bound value without yielding anything /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -119,6 +150,7 @@ public static Pipe lift(Func f) => /// /// Create a lazy pipe /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -129,6 +161,7 @@ public static Pipe liftT(Func /// Create an asynchronous lazy pipe /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -139,6 +172,7 @@ public static Pipe liftT(Func /// Create an asynchronous pipe /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -149,6 +183,7 @@ public static Pipe liftT(ValueTask /// Create a pipe that simply returns the bound value of the lifted monad without yielding anything /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -159,6 +194,7 @@ public static Pipe liftM(K, A> ma) => /// /// Create a pipe that simply returns the bound value of the lifted monad without yielding anything /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -169,6 +205,7 @@ public static Pipe liftM(ValueTask, A> /// /// Create a pipe that simply returns the bound value of the lifted monad without yielding anything /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -179,6 +216,7 @@ public static Pipe liftIO(IO ma) => /// /// Continually repeat the provided operation /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -189,6 +227,7 @@ public static Pipe repeat(Pipe m /// /// Repeat the provided operation based on the schedule provided /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -199,6 +238,7 @@ public static Pipe repeat(Schedule schedule, Pip /// /// Continually lift & repeat the provided operation /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -209,6 +249,7 @@ public static Pipe repeatM(K, A> ma) => /// /// Repeat the provided operation based on the schedule provided /// + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -224,6 +265,7 @@ public static Pipe repeatM(Schedule schedule, K< /// Fold function /// Initial state /// Pipe to fold + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -243,6 +285,7 @@ public static Pipe fold( /// Until predicate /// Initial state /// Pipe to fold + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -263,6 +306,7 @@ public static Pipe foldUntil( /// Until predicate /// Initial state /// Pipe to fold + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -283,6 +327,7 @@ public static Pipe foldUntil( /// Until predicate /// Initial state /// Pipe to fold + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -303,6 +348,7 @@ public static Pipe foldWhile( /// Until predicate /// Initial state /// Pipe to fold + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// Bound value type @@ -322,6 +368,7 @@ public static Pipe foldWhile( /// Schedule to run each item /// Fold function /// Initial state + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// @@ -338,6 +385,7 @@ public static Pipe fold( /// Fold function /// Until predicate /// Initial state + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// @@ -355,6 +403,7 @@ public static Pipe foldUntil( /// Fold function /// Until predicate /// Initial state + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// @@ -372,6 +421,7 @@ public static Pipe foldUntil( /// Fold function /// Until predicate /// Initial state + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// @@ -389,6 +439,7 @@ public static Pipe foldWhile( /// Fold function /// Until predicate /// Initial state + /// Effect runtime type /// Stream value to consume /// Stream value to produce /// diff --git a/LanguageExt.Pipes/Pipe/Pipe.cs b/LanguageExt.Pipes/Pipe/Pipe.cs index 7b5a5e419..789f4f27f 100644 --- a/LanguageExt.Pipes/Pipe/Pipe.cs +++ b/LanguageExt.Pipes/Pipe/Pipe.cs @@ -1,6 +1,7 @@ using System; using System.Diagnostics.Contracts; using System.Threading.Tasks; +using LanguageExt.Pipes.Concurrent; using LanguageExt.Traits; namespace LanguageExt.Pipes; @@ -36,7 +37,7 @@ public Consumer Compose(Consumer rhs) => [Pure] public Consumer Compose(ConsumerT, A> rhs) => Proxy.Compose(rhs.Proxy); - + [Pure] public static Pipe operator | (Pipe lhs, Pipe rhs) => lhs.Compose(rhs); diff --git a/LanguageExt.Pipes/PipeT/PipeT.Module.cs b/LanguageExt.Pipes/PipeT/PipeT.Module.cs index 3ee934cd1..412855289 100644 --- a/LanguageExt.Pipes/PipeT/PipeT.Module.cs +++ b/LanguageExt.Pipes/PipeT/PipeT.Module.cs @@ -46,6 +46,28 @@ public static PipeT yieldAll(IEnumerable valu public static PipeT yieldAll(IAsyncEnumerable values) where M : Monad => new PipeTYieldAllAsync(values.Select(yield), pure); + + /// + /// Evaluate the `M` monad repeatedly, yielding its bound values downstream + /// + /// Stream value to consume + /// Stream value to produce + /// Lifted monad type + /// + public static PipeT yieldRepeat(K ma) + where M : Monad => + new PipeTYieldAll(Units.Select(_ => ma.Bind(yield)), pure); + + /// + /// Evaluate the `IO` monad repeatedly, yielding its bound values downstream + /// + /// Stream value to consume + /// Stream value to produce + /// Lifted monad type + /// + public static PipeT yieldRepeatIO(IO ma) + where M : Monad => + new PipeTYieldAll(Units.Select(_ => ma.Bind(yield)), pure); /// /// Await a value from upstream diff --git a/LanguageExt.Pipes/PipeT/PipeT.Monad.cs b/LanguageExt.Pipes/PipeT/PipeT.Monad.cs index b4264e878..fa56916e5 100644 --- a/LanguageExt.Pipes/PipeT/PipeT.Monad.cs +++ b/LanguageExt.Pipes/PipeT/PipeT.Monad.cs @@ -50,4 +50,9 @@ static K, B> MonadIO>.MapIO(K, IO> MonadIO>.ToIO(K, A> ma) => ma.MapIO(IO.pure); + + static K, ForkIO> MonadIO>.ForkIO( + K, A> ma, + Option timeout) => + MonadT.lift, M, ForkIO>(ma.As().Run().ForkIO(timeout)); } diff --git a/LanguageExt.Pipes/Producer/Producer.Module.cs b/LanguageExt.Pipes/Producer/Producer.Module.cs index b3398b425..98ed1f998 100644 --- a/LanguageExt.Pipes/Producer/Producer.Module.cs +++ b/LanguageExt.Pipes/Producer/Producer.Module.cs @@ -1,10 +1,7 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; -using System.Runtime.CompilerServices; using System.Threading.Tasks; using LanguageExt.Common; -using LanguageExt.Pipes; using LanguageExt.Pipes.Concurrent; using LanguageExt.Traits; using static LanguageExt.Prelude; @@ -19,8 +16,8 @@ public static class Producer /// /// Yield a value downstream /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// public static Producer yield(OUT value) => PipeT.yield, Unit, OUT>(value); @@ -28,8 +25,8 @@ public static Producer yield(OUT value) => /// /// Yield all values downstream /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// public static Producer yieldAll(IEnumerable values) => PipeT.yieldAll, Unit, OUT>(values); @@ -37,17 +34,37 @@ public static Producer yieldAll(IEnumerable values) /// /// Yield all values downstream /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// public static Producer yieldAll(IAsyncEnumerable values) => PipeT.yieldAll, Unit, OUT>(values); + /// + /// Evaluate the `M` monad repeatedly, yielding its bound values downstream + /// + /// Effect runtime type + /// Stream value to consume + /// Stream value to produce + /// + public static Producer yieldRepeat(K, OUT> ma) => + PipeT.yieldRepeat, Unit, OUT>(ma); + + /// + /// Evaluate the `IO` monad repeatedly, yielding its bound values downstream + /// + /// Effect runtime type + /// Stream value to consume + /// Stream value to produce + /// + public static Producer yieldRepeatIO(IO ma) => + PipeT.yieldRepeatIO, Unit, OUT>(ma); + /// /// Create a producer that simply returns a bound value without yielding anything /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer pure(A value) => @@ -56,8 +73,8 @@ public static Producer pure(A value) => /// /// Create a producer that always fails /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer error(Error value) => @@ -66,8 +83,8 @@ public static Producer error(Error value) => /// /// Create a producer that yields nothing at all /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer empty() => @@ -76,8 +93,8 @@ public static Producer empty() => /// /// Create a producer that lazily returns a bound value without yielding anything /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer lift(Func f) => @@ -86,8 +103,8 @@ public static Producer lift(Func f) => /// /// Create a producer that simply returns the bound value of the lifted monad without yielding anything /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer liftM(K, A> ma) => @@ -96,8 +113,8 @@ public static Producer liftM(K, A> ma) => /// /// Create a producer that simply returns the bound value of the lifted monad without yielding anything /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer liftIO(IO ma) => @@ -107,7 +124,6 @@ public static Producer liftIO(IO ma) => /// Create a lazy proxy /// /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer liftT(Func> f) => @@ -116,8 +132,8 @@ public static Producer liftT(Func> /// /// Create an asynchronous lazy proxy /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer liftT(Func>> f) => @@ -126,8 +142,8 @@ public static Producer liftT(Func /// Create an asynchronous proxy /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer liftT(ValueTask> f) => @@ -136,8 +152,8 @@ public static Producer liftT(ValueTask /// Continually repeat the provided operation /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer repeat(Producer ma) => @@ -146,8 +162,8 @@ public static Producer repeat(Producer ma) = /// /// Repeat the provided operation based on the schedule provided /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer repeat(Schedule schedule, Producer ma) => @@ -157,7 +173,6 @@ public static Producer repeat(Schedule schedule, Produce /// Continually lift & repeat the provided operation /// /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer repeatM(K, A> ma) => @@ -166,8 +181,8 @@ public static Producer repeatM(K, A> ma) => /// /// Repeat the provided operation based on the schedule provided /// + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer repeatM(Schedule schedule, K, A> ma) => @@ -181,8 +196,8 @@ public static Producer repeatM(Schedule schedule, KFold function /// Initial state /// Pipe to fold + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer fold( @@ -200,8 +215,8 @@ public static Producer fold( /// Until predicate /// Initial state /// Pipe to fold + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer foldUntil( @@ -220,8 +235,8 @@ public static Producer foldUntil( /// Until predicate /// Initial state /// Pipe to fold + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer foldUntil( @@ -240,8 +255,8 @@ public static Producer foldUntil( /// Until predicate /// Initial state /// Pipe to fold + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer foldWhile( @@ -260,8 +275,8 @@ public static Producer foldWhile( /// Until predicate /// Initial state /// Pipe to fold + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Bound value type /// public static Producer foldWhile( @@ -271,14 +286,26 @@ public static Producer foldWhile( OUT Init, Producer Item) => PipeT.foldWhile(Time, Fold, Pred, Init, Item.Proxy); + + /// + /// Merge multiple producers + /// + /// Producers to merge + /// Buffer settings + /// Effect runtime type + /// Stream value to produce + /// Merged producer + public static Producer merge( + params Producer[] producers) => + merge(toSeq(producers)); /// /// Merge multiple producers /// /// Producers to merge /// Buffer settings + /// Effect runtime type /// Stream value to produce - /// Lifted monad type /// Merged producer public static Producer merge( Seq> producers, diff --git a/LanguageExt.Pipes/ProducerT/ProducerT.Module.cs b/LanguageExt.Pipes/ProducerT/ProducerT.Module.cs index 6c8678c16..7e1bac8e1 100644 --- a/LanguageExt.Pipes/ProducerT/ProducerT.Module.cs +++ b/LanguageExt.Pipes/ProducerT/ProducerT.Module.cs @@ -46,6 +46,28 @@ public static ProducerT yieldAll(IAsyncEnumerable val where M : Monad => PipeT.yieldAll(values); + /// + /// Evaluate the `M` monad repeatedly, yielding its bound values downstream + /// + /// Stream value to consume + /// Stream value to produce + /// Lifted monad type + /// + public static ProducerT yieldRepeat(K ma) + where M : Monad => + PipeT.yieldRepeat(ma); + + /// + /// Evaluate the `IO` monad repeatedly, yielding its bound values downstream + /// + /// Stream value to consume + /// Stream value to produce + /// Lifted monad type + /// + public static ProducerT yieldRepeatIO(IO ma) + where M : Monad => + PipeT.yieldRepeatIO(ma); + /// /// Create a producer that simply returns a bound value without yielding anything /// @@ -304,6 +326,19 @@ public static ProducerT foldWhile( ProducerT Item) where M : Monad => PipeT.foldWhile(Time, Fold, Pred, Init, Item.Proxy); + + /// + /// Merge multiple producers + /// + /// Producers to merge + /// Buffer settings + /// Stream value to produce + /// Lifted monad type + /// Merged producer + public static ProducerT merge( + params ProducerT[] producers) + where M : Monad => + merge(toSeq(producers)); /// /// Merge multiple producers @@ -320,10 +355,17 @@ public static ProducerT merge( { if (producers.Count == 0) return pure(default); - return from mailbox in Pure(Mailbox.spawn(settings ?? Buffer.Unbounded)) - from forks in producers.Traverse(p => (p | mailbox.ToConsumerT()).ForkIO()).As().Run() + return from mailbox in Pure(Mailbox.spawn(settings ?? Buffer.Unbounded, "merge")) + from forks in forkEffects(producers, mailbox) from _ in mailbox.ToProducerT() from x in forks.Traverse(f => f.Cancel).As() select unit; } + + static K>> forkEffects( + Seq> producers, + Mailbox mailbox) + where M : Monad => + producers.Map(p => (p | mailbox.ToConsumerT()).Run()) + .Traverse(ma => ma.ForkIO()); } diff --git a/LanguageExt.Tests/PipesTests.cs b/LanguageExt.Tests/PipesTests.cs index c9a6da735..845ac9ffb 100644 --- a/LanguageExt.Tests/PipesTests.cs +++ b/LanguageExt.Tests/PipesTests.cs @@ -1,7 +1,7 @@ -using LanguageExt.Pipes; -using LanguageExt.Sys.Test; +using LanguageExt.Sys.Test; using Xunit; -using static LanguageExt.Pipes.Proxy; +using static LanguageExt.Pipes.Producer; +using static LanguageExt.Pipes.Consumer; namespace LanguageExt.Tests; @@ -11,12 +11,11 @@ public class PipesTests public void MergeSynchronousProducersSucceeds() { using var rt = Runtime.New(); - compose(Producer.merge>( - yield(1), - yield(1)), - awaiting().Map(ignore)) - .RunEffect().As() - .Run(rt, EnvIO.New()) - .Ignore(); + + (merge(yield(1), yield(1)) + | awaiting().Map(ignore)) + .Run().As() + .Run(rt, EnvIO.New()) + .Ignore(); } } diff --git a/LanguageExt.Tests/Sys/Diag/ActivityTests.cs b/LanguageExt.Tests/Sys/Diag/ActivityTests.cs index ac6a15551..846846dc8 100644 --- a/LanguageExt.Tests/Sys/Diag/ActivityTests.cs +++ b/LanguageExt.Tests/Sys/Diag/ActivityTests.cs @@ -273,7 +273,7 @@ public static void Case24() "test", ActivityKind.Client, HashMap(("1", "a" as object), ("2", "b")), - A.kind.ZipIO(A.tags)) + A.kind.Zip(A.tags)) .ArrangeAndAct(); kind.IsSome.Should().BeTrue(); kind.Case.Should().Be(ActivityKind.Client); @@ -299,7 +299,7 @@ public static void Case27() var (kind, tags) = A.span("test", ActivityKind.Client, HashMap(("1", "a" as object), ("2", "b")), - A.kind.ZipIO(A.tags)) + A.kind.Zip(A.tags)) .ArrangeAndAct(); Assert.True(kind.IsSome); @@ -323,7 +323,7 @@ from result in A.span( HashMap(("1", "a" as object), ("2", "b")), Seq.Empty, DateTimeOffset.Now, - A.kind.ZipIO(A.tags)) + A.kind.Zip(A.tags)) select result ) .ArrangeAndAct(); diff --git a/Samples/EffectsExamples/Examples/QueueExample.cs b/Samples/EffectsExamples/Examples/QueueExample.cs index 267d3d755..09684860f 100644 --- a/Samples/EffectsExamples/Examples/QueueExample.cs +++ b/Samples/EffectsExamples/Examples/QueueExample.cs @@ -29,8 +29,8 @@ public static class QueueExample public static Eff main() { // Create two queues. Queues are Producers that have an Enqueue function - var queue1 = Mailbox.spawn(); - var queue2 = Mailbox.spawn(); + var queue1 = Mailbox.spawn("mailbox-1"); + var queue2 = Mailbox.spawn("mailbox-2"); // Compose the queues with a pipe that prepends some text to what they produce var queues = Seq(queue1.ToProducer() | prepend("Queue 1: "), @@ -39,8 +39,8 @@ public static Eff main() // Run the queues in a forked task // Repeatedly read from the console and write to one of the two queues depending on // whether the first char is 1 or 2 - var effect = from f in fork(Producer.merge(queues) | writeLine) - from x in repeat(Console.readLines | writeToQueue(queue1, queue2)) + var effect = from f in fork(Producer.merge(queues) | Consumer.repeat(writeLine)) + from x in Console.readLines | writeToQueue(queue1, queue2) | Schedule.Forever from _ in f.Cancel // cancels the forked task select unit; @@ -66,7 +66,8 @@ from u in guard(x.Length > 0, Error.New("exiting")) /// Pipe that prepends the text provided to the awaited value and then yields it /// static Pipe prepend(string x) => - from l in Pipe.awaiting() + from l in Pipe.awaiting() + from TO_REMOVE in Console.writeLine($"PREPENDING: {x}") from _ in Pipe.yield($"{x}{l}") select unit; diff --git a/Samples/PipesExamples/Program.cs b/Samples/PipesExamples/Program.cs index 5add49694..9faf5e9f6 100644 --- a/Samples/PipesExamples/Program.cs +++ b/Samples/PipesExamples/Program.cs @@ -35,7 +35,7 @@ from _2 in Consumer.lift(writeLine($"post-await: {x}")) var p = yieldAll(Range(1, 10000000)); -var o = foldUntil(Time: Schedule.recurs(50), +var o = foldUntil(Time: Schedule.spaced(10) | Schedule.recurs(3), Fold: (s, v) => s + v, Pred: v => v.Value % 10000 == 0, Init: 0, @@ -53,57 +53,3 @@ from _ in writeLine($"{x}") static IO writeLine(object? value) => IO.lift(() => Console.WriteLine(value)); - -public record DbEnv; -public record Db(ReaderT RunDb) : K -{ - public Db Select(Func m) => this.Kind().Select(m).As(); - public Db SelectMany(Func> b, Func p) => this.Kind().SelectMany(b, p).As(); - public Db SelectMany(Func> b, Func p) => this.Kind().SelectMany(b, p).As(); -} - -public static class DbExtensions -{ - public static Db As(this K ma) => - (Db)ma; -} -public class Db : Monad, Fallible, Readable -{ - public static K Bind(K ma, Func> f) => - new Db(ma.As().RunDb.Bind(x => f(x).As().RunDb)); - - public static K Map(Func f, K ma) => - new Db(ma.As().RunDb.Map(f)); - - public static K Pure(A value) => - new Db(ReaderT.pure(value)); - - public static K Apply(K> mf, K ma) => - new Db(mf.As().RunDb.Apply(ma.As().RunDb)); - - public static K Fail(Error error) => - new Db(ReaderT.liftIO(error)); - - public static K Catch(K fa, Func Predicate, Func> Fail) => - from env in Readable.ask() - from res in fa.As() - .RunDb - .runReader(env) - .Catch(Predicate, x => Fail(x).As().RunDb.runReader(env)) - select res; - - public static K Asks(Func f) => - new Db(ReaderT.asks(f)); - - public static K Local(Func f, K ma) => - new Db(ReaderT.local(f, ma.As().RunDb)); - - public static K LiftIO(IO ma) => - new Db(ReaderT.liftIO(ma)); - - public static K MapIO(K ma, Func, IO> f) => - new Db(ma.As().RunDb.MapIO(f).As()); - - public static K> ToIO(K ma) => - ma.MapIO(IO.pure); -} diff --git a/Samples/TestBed.Web/Program.cs b/Samples/TestBed.Web/Program.cs index 031a7557f..13f2209b0 100644 --- a/Samples/TestBed.Web/Program.cs +++ b/Samples/TestBed.Web/Program.cs @@ -1,4 +1,5 @@ -using static LanguageExt.Prelude; +using LanguageExt; +using static LanguageExt.Prelude; var builder = WebApplication.CreateBuilder(args); var app = builder.Build(); diff --git a/Samples/TestBed/PipesTest.cs b/Samples/TestBed/PipesTest.cs index ac2783602..12c8b87a4 100644 --- a/Samples/TestBed/PipesTest.cs +++ b/Samples/TestBed/PipesTest.cs @@ -5,15 +5,17 @@ using LanguageExt.Sys; using LanguageExt.Sys.Live; using static LanguageExt.Prelude; -using static LanguageExt.Pipes.Proxy; +using static LanguageExt.Pipes.Producer; +using static LanguageExt.Pipes.Consumer; +using static LanguageExt.Pipes.Pipe; using static LanguageExt.UnitsOfMeasure; namespace TestBed; public static class PipesTestBed { - static Producer, Unit> numbers(int n, int failOn) => - from _ in yield(n) + static Producer numbers(int n, int failOn) => + from _ in yield(n) from t in Time.sleepFor(1000 * ms) from x in failOn == n ? FailEff(Error.New($"failed on {n}")) @@ -23,42 +25,32 @@ from r in n < 0 : numbers(n - 1, failOn) select r; - public static Effect, Unit> effect => - Producer.merge(numbers(10, 5), numbers(20, 0)) | writeLine(); + public static Effect effect => + merge(numbers(10, 5), numbers(20, 0)) | writeLine(); - static Consumer, Unit> writeLine() => - from x in awaiting() + static Consumer writeLine() => + from x in awaiting() from _ in Console.writeLine($"{x}") from r in writeLine() select r; - public static Effect, Unit> effect1 => + public static Effect effect1 => repeat(producer) | doubleIt | consumer; - static Producer, Unit> producer => + static Producer producer => from _1 in Console.writeLine("before") - from _2 in yieldAll(Range(1, 5)) + from _2 in yieldAll(Range(1, 5)) from _3 in Console.writeLine("after") select unit; - static Pipe, Unit> doubleIt => - from x in awaiting() - from _ in yield(x * 2) + static Pipe doubleIt => + from x in awaiting() + from _ in yield(x * 2) select unit; - static Consumer, Unit> consumer => - from i in awaiting() + static Consumer consumer => + from i in awaiting() from _ in Console.writeLine(i.ToString()) from r in consumer select r; - - public static Effect, Unit> effect2 => - Producer.repeatM(Time.nowUTC) - | Pipe.mapM, Unit>(dt => - from _1 in Console.setColour(ConsoleColor.Green) - from _2 in Console.writeLine(dt.ToLongTimeString()) - from _3 in Console.resetColour - select dt) - | writeLine(); - } diff --git a/Samples/TestBed/Program.cs b/Samples/TestBed/Program.cs index 51855533e..085c93981 100644 --- a/Samples/TestBed/Program.cs +++ b/Samples/TestBed/Program.cs @@ -7,71 +7,11 @@ //////////////////////////////////////////////////////////////////////////////////////////////////////// using System; -using System.Collections.Generic; -using System.Linq; using LanguageExt; -using System.Text; -using LanguageExt.Sys; -using LanguageExt.Pipes; -using LanguageExt.Sys.IO; -using System.Reactive.Linq; -using System.Threading; -using LanguageExt.Sys.Live; using System.Threading.Tasks; -using LanguageExt.Common; -using LanguageExt.Traits; using TestBed; using static LanguageExt.Prelude; -using static LanguageExt.Pipes.Proxy; -public interface IAsyncQueue -{ - Task DequeueAsync(); -} - -public static class Ext -{ - static Producer, Unit> ToProducer(this IAsyncQueue q) - { - return yieldAll(go()); - - async IAsyncEnumerable go() - { - while (true) - { - yield return await q.DequeueAsync(); - } - } - } - - public static Producer, Unit> ToProducer(this IAsyncQueue[] qs) => - Producer.merge(qs.AsIterable().Map(q => q.ToProducer()).ToSeq()); -} - -public static class YourPrelude -{ - public static Eff OptionalAff(this Func> task) => - OptionalEff(task, Errors.None); - - public static Eff OptionalEff(this Func> task, Error fail) => - liftEff(async () => - await task() switch - { - null => fail, - var x => FinSucc(x) - }); - - public static Eff OptionalAff(this Func> task) => - OptionalAff(task, Errors.None); - - public static Eff OptionalAff(this Func> task, Error fail) => - liftEff(async () => - await task() switch - { - null => fail, - var x => FinSucc(x) - }); -} public class Program { @@ -168,6 +108,7 @@ from y in IO.lift(() => Console.WriteLine(t)) var mr = my.Run("Paul").Run().Run(); } + /* public static void PipesTest() { // Create two queues. Queues are Producers that have an Enqueue function @@ -356,6 +297,7 @@ from l in Consumer.awaiting, string>() from a in Console, Runtime>.writeLine(l) from n in writeLine2 select unit; + */ /*static Server incrementer(int question) => @@ -372,6 +314,7 @@ from _2 in Client.lift(Console.writeLine($"Cli select unit;*/ + /* static Pipe, Unit> pipeMap => - Pipe.map((string x) => $"Hello {x}"); + Pipe.map((string x) => $"Hello {x}");*/ }