Skip to content

Commit

Permalink
Refactor QueueT and introduce Mailbox and Inbox for concurrency
Browse files Browse the repository at this point in the history
This commit removes `QueueT` and its associated files, replacing it with `Mailbox` and `Inbox` for improved concurrency handling. It also introduces new contravariant functor traits (`Cofunctor` and `Divisible`) and updates relevant methods for better abstraction and composition. Additionally, minor naming adjustments (e.g., `Swap` to `SwapMaybe`) were made for clarity and consistency.
  • Loading branch information
louthy committed Feb 5, 2025
1 parent 9de34be commit e709ccb
Show file tree
Hide file tree
Showing 45 changed files with 1,917 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,27 @@ public static StreamT<M, A> AsStream<M, A>(this IAsyncEnumerable<A> ma)
where M : Monad<M> =>
StreamT.lift<M, A>(ma);

public static async IAsyncEnumerable<B> Map<A, B>(this IAsyncEnumerable<A> ma, Func<A, B> f)
public static async IAsyncEnumerable<B> MapAsync<A, B>(this IAsyncEnumerable<A> ma, Func<A, B> f)
{
await foreach (var a in ma)
{
yield return f(a);
}
}

public static async IAsyncEnumerable<B> Bind<A, B>(this IAsyncEnumerable<A> ma, Func<A, IAsyncEnumerable<B>> f)
public static async IAsyncEnumerable<B> ApplyAsync<A, B>(this IAsyncEnumerable<Func<A, B>> ff, IAsyncEnumerable<A> fa)
{
await foreach (var f in ff)
{
// ReSharper disable once PossibleMultipleEnumeration
await foreach (var a in fa)
{
yield return f(a);
}
}
}

public static async IAsyncEnumerable<B> BindAsync<A, B>(this IAsyncEnumerable<A> ma, Func<A, IAsyncEnumerable<B>> f)
{
await foreach (var a in ma)
{
Expand All @@ -31,7 +43,7 @@ public static async IAsyncEnumerable<B> Bind<A, B>(this IAsyncEnumerable<A> ma,
}
}

public static async IAsyncEnumerable<A> Filter<A>(
public static async IAsyncEnumerable<A> FilterAsync<A>(
this IAsyncEnumerable<A> ma,
Func<A, bool> f)
{
Expand Down
32 changes: 32 additions & 0 deletions LanguageExt.Core/Common/Errors.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,36 @@ public static class Errors
/// IO DSL extension error
/// </summary>
public static readonly Error IODSLExtension = new Exceptional(IODSLExtensionText, IODSLExtensionCode);

/// <summary>
/// Inbox no space available error message
/// </summary>
public const string NoSpaceInInboxText =
"No inbox space available";

/// <summary>
/// Inbox no space available error code
/// </summary>
public const int NoSpaceInInboxCode = -2000000013;

/// <summary>
/// Inbox no space available error
/// </summary>
public static readonly Error NoSpaceInInbox = new Expected(NoSpaceInInboxText, NoSpaceInInboxCode);

/// <summary>
/// Outbox channel has been closed
/// </summary>
public const string OutboxChannelClosedText =
"Outbox channel has been closed";

/// <summary>
/// Outbox channel has been closed code
/// </summary>
public const int OutboxChannelClosedCode = -2000000014;

/// <summary>
/// Outbox channel has been closed error
/// </summary>
public static readonly Error OutboxChannelClosed = new Expected(OutboxChannelClosedText, OutboxChannelClosedCode);
}
6 changes: 3 additions & 3 deletions LanguageExt.Core/Concurrency/Atom/Atom.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public A Swap(Func<A, A> f)
/// * If there is no validator for the Atom then the return value is always the snapshot of
/// the successful `f` function.
/// </returns>
public A Swap(Func<A, Option<A>> f)
public A SwapMaybe(Func<A, Option<A>> f)
{
f = f ?? throw new ArgumentNullException(nameof(f));

Expand Down Expand Up @@ -176,8 +176,8 @@ public IO<A> SwapIO(Func<A, A> f) =>
/// * If there is no validator for the Atom then the return value is always the snapshot of
/// the successful `f` function.
/// </returns>
public IO<A> SwapIO(Func<A, Option<A>> f) =>
IO.lift(_ => Swap(f));
public IO<A> SwapMaybeIO(Func<A, Option<A>> f) =>
IO.lift(_ => SwapMaybe(f));

/// <summary>
/// Value accessor (read and write)
Expand Down
2 changes: 1 addition & 1 deletion LanguageExt.Core/Concurrency/Prelude.Concurrency.cs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ public static A swap<A>(Atom<A> ma, Func<A, A> f) =>
/// the successful `f` function.
/// </returns>
public static A swap<A>(Atom<A> ma, Func<A, Option<A>> f) =>
ma.Swap(f);
ma.SwapMaybe(f);

/// <summary>
/// Atomically updates the value by passing the old value to `f` and updating
Expand Down
2 changes: 1 addition & 1 deletion LanguageExt.Core/Effects/IO/IO.Prelude.Concurreny.cs
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ public static IO<A> swapIO<M, A>(Atom<M, A> ma, Func<M, A, A> f) =>
/// the successful `f` function.
/// </returns>
public static IO<A> swapIO<A>(Atom<A> ma, Func<A, Option<A>> f) =>
ma.SwapIO(f);
ma.SwapMaybeIO(f);

/// <summary>
/// Atomically updates the value by passing the old value to `f` and updating
Expand Down
28 changes: 14 additions & 14 deletions LanguageExt.Core/Effects/StreamT/StreamT.Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public static StreamT<M, A> Merge<M, A>(
/// <returns>Stream of values</returns>
public static StreamT<M, A> Somes<M, A>(this IAsyncEnumerable<OptionT<M, A>> stream)
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.Map(mx => mx.ToStream()))
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.MapAsync(mx => mx.ToStream()))
from x in xs
select x;

Expand All @@ -347,7 +347,7 @@ from x in xs
/// <returns>Stream of values</returns>
public static StreamT<M, A> SomesStream<M, A>(this IAsyncEnumerable<Option<A>> stream)
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.Map(mx => mx.ToStream<M>()))
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.MapAsync(mx => mx.ToStream<M>()))
from x in xs
select x;

Expand Down Expand Up @@ -386,7 +386,7 @@ from x in xs
/// <returns>Stream of values</returns>
public static StreamT<M, A> Rights<M, L, A>(this IAsyncEnumerable<EitherT<L, M, A>> stream)
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.Map(mx => mx.ToStream()))
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.MapAsync(mx => mx.ToStream()))
from x in xs
select x;

Expand All @@ -399,7 +399,7 @@ from x in xs
/// <returns>Stream of values</returns>
public static StreamT<M, A> RightsStream<M, L, A>(this IAsyncEnumerable<Either<L, A>> stream)
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.Map(mx => mx.ToStream<M>()))
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.MapAsync(mx => mx.ToStream<M>()))
from x in xs
select x;

Expand Down Expand Up @@ -438,7 +438,7 @@ from x in xs
/// <returns>Stream of values</returns>
public static StreamT<M, L> Lefts<M, L, A>(this IAsyncEnumerable<EitherT<L, M, A>> stream)
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, L>>.Lift(stream.Map(mx => mx.LeftToStream()))
from xs in StreamT<M, StreamT<M, L>>.Lift(stream.MapAsync(mx => mx.LeftToStream()))
from x in xs
select x;

Expand All @@ -451,7 +451,7 @@ from x in xs
/// <returns>Stream of values</returns>
public static StreamT<M, L> LeftsStream<M, L, A>(this IAsyncEnumerable<Either<L, A>> stream)
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, L>>.Lift(stream.Map(mx => mx.LeftToStream<M>()))
from xs in StreamT<M, StreamT<M, L>>.Lift(stream.MapAsync(mx => mx.LeftToStream<M>()))
from x in xs
select x;

Expand Down Expand Up @@ -490,7 +490,7 @@ from x in xs
/// <returns>Stream of values</returns>
public static StreamT<M, A> Succs<M, A>(this IAsyncEnumerable<FinT<M, A>> stream)
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.Map(mx => mx.ToStream()))
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.MapAsync(mx => mx.ToStream()))
from x in xs
select x;

Expand All @@ -503,7 +503,7 @@ from x in xs
/// <returns>Stream of values</returns>
public static StreamT<M, A> SuccsStream<M, A>(this IAsyncEnumerable<Fin<A>> stream)
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.Map(mx => mx.ToStream<M>()))
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.MapAsync(mx => mx.ToStream<M>()))
from x in xs
select x;

Expand Down Expand Up @@ -542,7 +542,7 @@ from x in xs
/// <returns>Stream of values</returns>
public static StreamT<M, Error> Fails<M, A>(this IAsyncEnumerable<FinT<M, A>> stream)
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, Error>>.Lift(stream.Map(mx => mx.FailToStream()))
from xs in StreamT<M, StreamT<M, Error>>.Lift(stream.MapAsync(mx => mx.FailToStream()))
from x in xs
select x;

Expand All @@ -555,7 +555,7 @@ from x in xs
/// <returns>Stream of values</returns>
public static StreamT<M, Error> FailsStream<M, A>(this IAsyncEnumerable<Fin<A>> stream)
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, Error>>.Lift(stream.Map(mx => mx.FailToStream<M>()))
from xs in StreamT<M, StreamT<M, Error>>.Lift(stream.MapAsync(mx => mx.FailToStream<M>()))
from x in xs
select x;

Expand Down Expand Up @@ -595,7 +595,7 @@ from x in xs
public static StreamT<M, A> Succs<M, L, A>(this IAsyncEnumerable<ValidationT<L, M, A>> stream)
where L : Monoid<L>
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.Map(mx => mx.ToStream()))
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.MapAsync(mx => mx.ToStream()))
from x in xs
select x;

Expand All @@ -609,7 +609,7 @@ from x in xs
public static StreamT<M, A> SuccsStream<M, L, A>(this IAsyncEnumerable<Validation<L, A>> stream)
where L : Monoid<L>
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.Map(mx => mx.ToStream<M>()))
from xs in StreamT<M, StreamT<M, A>>.Lift(stream.MapAsync(mx => mx.ToStream<M>()))
from x in xs
select x;

Expand Down Expand Up @@ -651,7 +651,7 @@ from x in xs
public static StreamT<M, L> Fails<M, L, A>(this IAsyncEnumerable<ValidationT<L, M, A>> stream)
where L : Monoid<L>
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, L>>.Lift(stream.Map(mx => mx.FailToStream()))
from xs in StreamT<M, StreamT<M, L>>.Lift(stream.MapAsync(mx => mx.FailToStream()))
from x in xs
select x;

Expand All @@ -665,7 +665,7 @@ from x in xs
public static StreamT<M, L> FailsStream<M, L, A>(this IAsyncEnumerable<Validation<L, A>> stream)
where L : Monoid<L>
where M : Monad<M> =>
from xs in StreamT<M, StreamT<M, L>>.Lift(stream.Map(mx => mx.FailToStream<M>()))
from xs in StreamT<M, StreamT<M, L>>.Lift(stream.MapAsync(mx => mx.FailToStream<M>()))
from x in xs
select x;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ static async IAsyncEnumerable<A> Go(IteratorAsync<A> ma, IteratorAsync<A> mb)
var a = ma.Clone();
var b = mb.Clone();

// TODO: Replace this with a WaitAny tasks for each side, replacing the
// task as each one yields.

while (!await a.IsEmpty && !await b.IsEmpty)
{
yield return await a.Head;
Expand Down
18 changes: 18 additions & 0 deletions LanguageExt.Core/Traits/Cofunctor/Cofunctor.Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;

namespace LanguageExt.Traits;

public static class CofunctorExtensions
{
/// <summary>
/// The class of contravariant functors.
/// Whereas one can think of a `Functor` as containing or producing values, a contravariant functor is a functor that
/// can be thought of as consuming values.
///
/// Contravariant functors are referred to colloquially as Cofunctor, even though the dual of a `Functor` is just
/// a `Functor`.
/// </summary>
public static K<F, B> Contramap<F, A, B>(this K<F, B> fb, Func<A, B> f)
where F : Cofunctor<F> =>
F.Contramap(fb, f);
}
18 changes: 18 additions & 0 deletions LanguageExt.Core/Traits/Cofunctor/Cofunctor.Module.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;

namespace LanguageExt.Traits;

public static class Cofunctor
{
/// <summary>
/// The class of contravariant functors.
/// Whereas one can think of a `Functor` as containing or producing values, a contravariant functor is a functor that
/// can be thought of as consuming values.
///
/// Contravariant functors are referred to colloquially as Cofunctor, even though the dual of a `Functor` is just
/// a `Functor`.
/// </summary>
public static K<F, B> contraMap<F, A, B>(K<F, B> fb, Func<A, B> f)
where F : Cofunctor<F> =>
F.Contramap(fb, f);
}
19 changes: 19 additions & 0 deletions LanguageExt.Core/Traits/Cofunctor/Cofunctor.Prelude.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using LanguageExt.Traits;

namespace LanguageExt;

public static partial class Prelude
{
/// <summary>
/// The class of contravariant functors.
/// Whereas one can think of a `Functor` as containing or producing values, a contravariant functor is a functor that
/// can be thought of as consuming values.
///
/// Contravariant functors are referred to colloquially as Cofunctor, even though the dual of a `Functor` is just
/// a `Functor`.
/// </summary>
public static K<F, B> contraMap<F, A, B>(K<F, B> fb, Func<A, B> f)
where F : Cofunctor<F> =>
F.Contramap(fb, f);
}
17 changes: 17 additions & 0 deletions LanguageExt.Core/Traits/Cofunctor/Cofunctor.Trait.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;

namespace LanguageExt.Traits;

/// <summary>
/// The class of contravariant functors.
/// Whereas one can think of a `Functor` as containing or producing values, a contravariant functor is a functor that
/// can be thought of as consuming values.
///
/// Contravariant functors are referred to colloquially as Cofunctor, even though the dual of a `Functor` is just
/// a `Functor`.
/// </summary>
/// <typeparam name="F">Self referring type</typeparam>
public interface Cofunctor<F>
{
public static abstract K<F, B> Contramap<A, B>(K<F, B> fb, Func<A, B> f);
}
49 changes: 49 additions & 0 deletions LanguageExt.Core/Traits/Decidable/Decidable.Module.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System;
using static LanguageExt.Prelude;

namespace LanguageExt.Traits;

/// <summary>
/// A `Divisible` contravariant functor is the contravariant analogue of `Applicative`.
///
/// Continuing the intuition that 'Contravariant' functors (`Cofunctor`) consume input, a 'Divisible'
/// contravariant functor also has the ability to be composed "beside" another contravariant
/// functor.
/// </summary>
/// <typeparam name="F">Self referring type</typeparam>
public static class Decidable
{
/// <summary>
/// Acts as identity to 'Choose'.
/// </summary>
public static K<F, A> lose<F, A>(Func<A, Void> f)
where F : Decidable<F> =>
F.Lose(f);

/// <summary>
/// Acts as identity to 'Choose'.
/// </summary>
/// <remarks>
/// lost = lose(identity)
/// </remarks>
public static K<F, Void> lost<F>()
where F : Decidable<F> =>
lose<F, Void>(identity);

/// <summary>
/// Fan out the input
/// </summary>
public static K<F, A> route<F, A, B, C>(Func<A, Either<B, C>> f, K<F, B> fb, K<F, C> fc)
where F : Decidable<F> =>
F.Route(f, fb, fc);

/// <summary>
/// Fan out the input
/// </summary>
/// <remarks>
/// route(fb, fc) = route(id, fb, fc)
/// </remarks>
public static K<F, Either<A, B>> route<F, A, B>(K<F, A> fa, K<F, B> fb)
where F : Decidable<F> =>
route<F, Either<A, B>, A, B>(identity, fa, fb);
}
16 changes: 16 additions & 0 deletions LanguageExt.Core/Traits/Decidable/Decidable.Prelude.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using LanguageExt.Traits;

namespace LanguageExt;

/// <summary>
/// A `Divisible` contravariant functor is the contravariant analogue of `Applicative`.
///
/// Continuing the intuition that 'Contravariant' functors (`Cofunctor`) consume input, a 'Divisible'
/// contravariant functor also has the ability to be composed "beside" another contravariant
/// functor.
/// </summary>
/// <typeparam name="F">Self referring type</typeparam>
public static partial class Prelude
{
}
Loading

0 comments on commit e709ccb

Please sign in to comment.