Skip to content

Add implementation of TaskSeq.except/exceptOfSeq #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ The following is the progress report:
| | `distinctBy` | `dictinctBy` | `distinctByAsync` | |
| ✅ [#2][] | `empty` | `empty` | | |
| ✅ [#23][] | `exactlyOne` | `exactlyOne` | | |
| | `except` | `except` | | |
| ✅ [#83][] | `except` | `except` | | |
| ✅ [#83][] | | `exceptOfSeq` | | |
| ✅ [#70][] | `exists` | `exists` | `existsAsync` | |
| | `exists2` | `exists2` | | |
| ✅ [#23][] | `filter` | `filter` | `filterAsync` | |
Expand Down Expand Up @@ -611,4 +612,5 @@ module TaskSeq =
[#76]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/76
[#81]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/81
[#82]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/82
[#83]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/83

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<Compile Include="TaskSeq.Delay.Tests.fs" />
<Compile Include="TaskSeq.Empty.Tests.fs" />
<Compile Include="TaskSeq.ExactlyOne.Tests.fs" />
<Compile Include="TaskSeq.Except.Tests.fs" />
<Compile Include="TaskSeq.Exists.Tests.fs" />
<Compile Include="TaskSeq.Filter.Tests.fs" />
<Compile Include="TaskSeq.FindIndex.Tests.fs" />
Expand Down
141 changes: 141 additions & 0 deletions src/FSharp.Control.TaskSeq.Test/TaskSeq.Except.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
module TaskSeq.Tests.Except

open System
open Xunit
open FsUnit.Xunit
open FsToolkit.ErrorHandling

open FSharp.Control

//
// TaskSeq.except
// TaskSeq.exceptOfSeq
//


module EmptySeq =
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-except`` variant =
Gen.getEmptyVariant variant
|> TaskSeq.except (Gen.getEmptyVariant variant)
|> verifyEmpty

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-exceptOfSeq`` variant =
Gen.getEmptyVariant variant
|> TaskSeq.exceptOfSeq Seq.empty
|> verifyEmpty

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-except v2`` variant =
Gen.getEmptyVariant variant
|> TaskSeq.except TaskSeq.empty
|> verifyEmpty

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-except v3`` variant =
TaskSeq.empty
|> TaskSeq.except (Gen.getEmptyVariant variant)
|> verifyEmpty

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-except no side effect in exclude seq if source seq is empty`` variant =
let mutable i = 0

let exclude = taskSeq {
i <- i + 1
yield 12
}

TaskSeq.empty
|> TaskSeq.except exclude
|> verifyEmpty
|> Task.map (fun () -> i |> should equal 0) // exclude seq is only enumerated after first item in source

module Immutable =
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-except removes duplicates`` variant =
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
|> TaskSeq.except (Gen.getSeqImmutable variant)
|> TaskSeq.toArrayAsync
|> Task.map (should equal [| 12; 13; 99 |])

[<Fact>]
let ``TaskSeq-except removes duplicates with empty itemsToExcept`` () =
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
|> TaskSeq.except TaskSeq.empty
|> TaskSeq.toArrayAsync
|> Task.map (should equal [| 1; 2; 3; 4; 12; 13; 99 |])

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-except removes everything`` variant =
Gen.getSeqImmutable variant
|> TaskSeq.except (Gen.getSeqImmutable variant)
|> verifyEmpty

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-except removes everything with duplicates`` variant =
taskSeq {
yield! Gen.getSeqImmutable variant
yield! Gen.getSeqImmutable variant
yield! Gen.getSeqImmutable variant
yield! Gen.getSeqImmutable variant
}
|> TaskSeq.except (Gen.getSeqImmutable variant)
|> verifyEmpty

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-exceptOfSeq removes duplicates`` variant =
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
|> TaskSeq.exceptOfSeq [ 1..10 ]
|> TaskSeq.toArrayAsync
|> Task.map (should equal [| 12; 13; 99 |])

[<Fact>]
let ``TaskSeq-exceptOfSeq removes duplicates with empty itemsToExcept`` () =
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
|> TaskSeq.exceptOfSeq Seq.empty
|> TaskSeq.toArrayAsync
|> Task.map (should equal [| 1; 2; 3; 4; 12; 13; 99 |])

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-exceptOfSeq removes everything`` variant =
Gen.getSeqImmutable variant
|> TaskSeq.exceptOfSeq [ 1..10 ]
|> verifyEmpty

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-exceptOfSeq removes everything with duplicates`` variant =
taskSeq {
yield! Gen.getSeqImmutable variant
yield! Gen.getSeqImmutable variant
yield! Gen.getSeqImmutable variant
yield! Gen.getSeqImmutable variant
}
|> TaskSeq.exceptOfSeq [ 1..10 ]
|> verifyEmpty

module SideEffects =
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-except removes duplicates`` variant =
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
|> TaskSeq.except (Gen.getSeqWithSideEffect variant)
|> TaskSeq.toArrayAsync
|> Task.map (should equal [| 12; 13; 99 |])

[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-except removes everything`` variant =
Gen.getSeqWithSideEffect variant
|> TaskSeq.except (Gen.getSeqWithSideEffect variant)
|> verifyEmpty

[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-except removes everything with duplicates`` variant =
taskSeq {
yield! Gen.getSeqWithSideEffect variant
yield! Gen.getSeqWithSideEffect variant
yield! Gen.getSeqWithSideEffect variant
yield! Gen.getSeqWithSideEffect variant
}
|> TaskSeq.except (Gen.getSeqWithSideEffect variant)
|> verifyEmpty
25 changes: 2 additions & 23 deletions src/FSharp.Control.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,6 @@ module TaskSeq =
e.DisposeAsync().AsTask().Wait()
}

// FIXME: incomplete and incorrect code!!! TODO: still needed?
let toSeqOfTasks (source: taskSeq<'T>) = seq {
let e = source.GetAsyncEnumerator(CancellationToken())

// TODO: check this!
try
let mutable go = false

while go do
yield task {
let! step = e.MoveNextAsync()
go <- step

if step then
return e.Current
else
return Unchecked.defaultof<_> // FIXME!
}

finally
e.DisposeAsync().AsTask().Wait()
}

let toArrayAsync source =
Internal.toResizeArrayAsync source
|> Task.map (fun a -> a.ToArray())
Expand Down Expand Up @@ -281,6 +258,8 @@ module TaskSeq =
let tryFindAsync predicate source = Internal.tryFind (PredicateAsync predicate) source
let tryFindIndex predicate source = Internal.tryFindIndex (Predicate predicate) source
let tryFindIndexAsync predicate source = Internal.tryFindIndex (PredicateAsync predicate) source
let except itemsToExclude source = Internal.except itemsToExclude source
let exceptOfSeq itemsToExclude source = Internal.exceptOfSeq itemsToExclude source

let exists predicate source =
Internal.tryFind (Predicate predicate) source
Expand Down
38 changes: 38 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,44 @@ module TaskSeq =
/// <exception cref="T:ArgumentNullException">Thrown when the input sequence is null.</exception>
val existsAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<bool>

/// <summary>
/// Returns a new task sequence with the distinct elements of the second task sequence which do not appear in the
/// <paramref name="itemsToExclude" />, using generic hash and equality comparisons to compare values.
/// </summary>
///
/// <remarks>
/// Note that this function returns a task sequence that digests the whole of the first input task sequence as soon as
/// the result sequence first gets awaited or iterated. As a result this function should not be used with
/// large or infinite sequences in the first parameter. The function makes no assumption on the ordering of the first input
/// sequence.
/// </remarks>
///
/// <param name="itemsToExclude">A task sequence whose elements that also occur in the second sequence will cause those elements to be removed from the returned sequence.</param>
/// <param name="source">A sequence whose elements that are not also in first will be returned.</param>
/// <returns>A sequence that contains the set difference of the elements of two sequences.</returns>
///
/// <exception cref="T:ArgumentNullException">Thrown when either of the two input sequences is null.</exception>
val except<'T when 'T: equality> : itemsToExclude: taskSeq<'T> -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Returns a new task sequence with the distinct elements of the second task sequence which do not appear in the
/// <paramref name="itemsToExclude" />, using generic hash and equality comparisons to compare values.
/// </summary>
///
/// <remarks>
/// Note that this function returns a task sequence that digests the whole of the first input task sequence as soon as
/// the result sequence first gets awaited or iterated. As a result this function should not be used with
/// large or infinite sequences in the first parameter. The function makes no assumption on the ordering of the first input
/// sequence.
/// </remarks>
///
/// <param name="itemsToExclude">A task sequence whose elements that also occur in the second sequence will cause those elements to be removed from the returned sequence.</param>
/// <param name="source">A sequence whose elements that are not also in first will be returned.</param>
/// <returns>A sequence that contains the set difference of the elements of two sequences.</returns>
///
/// <exception cref="T:ArgumentNullException">Thrown when either of the two input sequences is null.</exception>
val exceptOfSeq<'T when 'T: equality> : itemsToExclude: seq<'T> -> source: taskSeq<'T> -> taskSeq<'T>

/// <summary>
/// Zips two task sequences, returning a taskSeq of the tuples of each sequence, in order. May raise ArgumentException
/// if the sequences are or unequal length.
Expand Down
100 changes: 100 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -517,3 +517,103 @@ module internal TaskSeqInternal =
| true -> yield item
| false -> ()
}
// Consider turning using an F# version of this instead?
// https://github.com/i3arnon/ConcurrentHashSet
type ConcurrentHashSet<'T when 'T: equality>(ct) =
let _rwLock = new ReaderWriterLockSlim()
let hashSet = HashSet<'T>(Array.empty, HashIdentity.Structural)

member _.Add item =
_rwLock.EnterWriteLock()

try
hashSet.Add item
finally
_rwLock.ExitWriteLock()

member _.AddMany items =
_rwLock.EnterWriteLock()

try
for item in items do
hashSet.Add item |> ignore

finally
_rwLock.ExitWriteLock()

member _.AddManyAsync(source: taskSeq<'T>) = task {
use e = source.GetAsyncEnumerator(ct)
let mutable go = true
let! step = e.MoveNextAsync()
go <- step

while go do
// NOTE: r/w lock cannot cross thread boundaries. Should we use SemaphoreSlim instead?
// or alternatively, something like this: https://github.com/StephenCleary/AsyncEx/blob/8a73d0467d40ca41f9f9cf827c7a35702243abb8/src/Nito.AsyncEx.Coordination/AsyncReaderWriterLock.cs#L16
// not sure how they compare.

_rwLock.EnterWriteLock()

try
hashSet.Add e.Current |> ignore
finally
_rwLock.ExitWriteLock()

let! step = e.MoveNextAsync()
go <- step
}

interface IAsyncDisposable with
override _.DisposeAsync() =
if not (isNull _rwLock) then
_rwLock.Dispose()

ValueTask.CompletedTask

let except itemsToExclude (source: taskSeq<_>) = taskSeq {
use e = source.GetAsyncEnumerator(CancellationToken())
let mutable go = true
let! step = e.MoveNextAsync()
go <- step

if step then
// only create hashset by the time we actually start iterating
use hashSet = new ConcurrentHashSet<_>(CancellationToken())
do! hashSet.AddManyAsync itemsToExclude

while go do
let current = e.Current

// if true, it was added, and therefore unique, so we return it
// if false, it existed, and therefore a duplicate, and we skip
if hashSet.Add current then
yield current

let! step = e.MoveNextAsync()
go <- step

}

let exceptOfSeq itemsToExclude (source: taskSeq<_>) = taskSeq {
use e = source.GetAsyncEnumerator(CancellationToken())
let mutable go = true
let! step = e.MoveNextAsync()
go <- step

if step then
// only create hashset by the time we actually start iterating
use hashSet = new ConcurrentHashSet<_>(CancellationToken())
do hashSet.AddMany itemsToExclude

while go do
let current = e.Current

// if true, it was added, and therefore unique, so we return it
// if false, it existed, and therefore a duplicate, and we skip
if hashSet.Add current then
yield current

let! step = e.MoveNextAsync()
go <- step

}