Skip to content

Commit

Permalink
Rename in terms of "use"
Browse files Browse the repository at this point in the history
  • Loading branch information
nickrobinson251 committed Oct 10, 2023
1 parent e714c70 commit 34f8f1b
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 78 deletions.
46 changes: 24 additions & 22 deletions src/pools.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ import Base: acquire, release
Pool{K, T}(max::Int=4096)
A threadsafe object for managing a pool of objects of type `T`, optionally keyed by objects
of type `K`. Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a
of type `K`.
Objects can be requested by calling `acquire(f, pool, [key])`, where `f` is a
function that returns a new object of type `T`.
The `key` argument is optional and can be used to lookup objects that match a certain criteria
(a Dict is used internally, so matching is `isequal`).
(a `Dict` is used internally, so matching is `isequal`).
The `max` argument will limit the number of objects
that can be acquired at any given time. If the limit has been reached, `acquire` will
block until an object is returned to the pool via `release`.
The `max` argument will limit the number of objects that can be in use at any given time.
If the max usage has been reached, `acquire` will block until an object is released
via `release`.
By default, `release(pool, obj)` will return the object to the pool for reuse.
`release(pool)` will return the "permit" to the pool while not returning
any object for reuse.
- `release(pool, obj)` will return the object to the pool for reuse.
- `release(pool)` will decrement the number in use but not return any object for reuse.
- `drain!` can be used to remove objects that have been returned to the pool for reuse;
it does *not* release any objects that are in use.
`drain!` can be used to remove any cached objects for reuse, but it does *not* release
any active acquires.
See also `acquire`, `release`, `Pools.max_usage`, `Pools.in_use`, `Pools.in_pool`, `drain!`.
"""
mutable struct Pool{K, T}
lock::Threads.Condition
Expand Down Expand Up @@ -58,27 +60,27 @@ Base.valtype(::Type{<:Pool{<:Any, T}}) where {T} = T
Base.valtype(p::Pool) = valtype(typeof(p))

"""
Pools.max(pool::Pool) -> Int
Pools.max_usage(pool::Pool) -> Int
Return the maximum number of objects permitted to be in use at the same time.
See `Pools.permits(pool)` for the number of objects currently in use.
See `Pools.in_use(pool)` for the number of objects currently in use.
"""
max(pool::Pool) = Base.@lock pool.lock pool.max
max_usage(pool::Pool) = Base.@lock pool.lock pool.max

"""
Pools.permits(pool::Pool) -> Int
Pools.in_use(pool::Pool) -> Int
Return the number of objects currently in use. Less than or equal to `Pools.max(pool)`.
Return the number of objects currently in use. Less than or equal to `Pools.max_usage(pool)`.
"""
permits(pool::Pool) = Base.@lock pool.lock pool.cur
in_use(pool::Pool) = Base.@lock pool.lock pool.cur

"""
Pools.depth(pool::Pool) -> Int
Pools.in_pool(pool::Pool) -> Int
Return the number of objects in the pool available for reuse.
"""
depth(pool::Pool) = Base.@lock pool.lock mapreduce(length, +, values(pool.keyedvalues); init=0)
depth(pool::Pool{Nothing}) = Base.@lock pool.lock length(pool.values)
in_pool(pool::Pool) = Base.@lock pool.lock mapreduce(length, +, values(pool.keyedvalues); init=0)
in_pool(pool::Pool{Nothing}) = Base.@lock pool.lock length(pool.values)

"""
drain!(pool)
Expand All @@ -101,7 +103,7 @@ end
TRUE(x) = true

@noinline keyerror(key, K) = throw(ArgumentError("invalid key `$key` provided for pool key type $K"))
@noinline releaseerror() = throw(ArgumentError("cannot release permit when pool is empty"))
@noinline releaseerror() = throw(ArgumentError("cannot release when no objects are in use"))

# NOTE: assumes you have the lock!
function releasepermit(pool::Pool)
Expand Down Expand Up @@ -155,10 +157,10 @@ end
release(pool::Pool{K, T}, obj::T)
release(pool::Pool{K, T})
Return an object to a `pool`, optionally keyed by the provided `key`.
Release an object from usage by a `pool`, optionally keyed by the provided `key`.
If `obj` is provided, it will be returned to the pool for reuse.
Otherwise, if `nothing` is returned, or `release(pool)` is called,
just the "permit" will be returned to the pool.
the usage count will be decremented without an object being returned to the pool for reuse.
"""
function Base.release(pool::Pool{K, T}, key, obj::Union{T, Nothing}=nothing) where {K, T}
key isa K || keyerror(key, K)
Expand Down
112 changes: 56 additions & 56 deletions test/pools.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,43 @@ using ConcurrentUtilities.Pools, Test
@test keytype(pool) === Nothing
@test valtype(pool) === Int

@test Pools.max(pool) == 3
@test Pools.permits(pool) == 0
@test Pools.depth(pool) == 0
@test Pools.max_usage(pool) == 3
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 0

# acquire an object from the pool
x1 = acquire(() -> 1, pool)
# no existing objects in the pool, so our function was called to create a new one
@test x1 == 1
@test Pools.max(pool) == 3
@test Pools.permits(pool) == 1
@test Pools.depth(pool) == 0
@test Pools.max_usage(pool) == 3
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 0

# release back to the pool for reuse
release(pool, x1)
@test Pools.permits(pool) == 0
@test Pools.depth(pool) == 1
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 1

# acquire another object from the pool
x1 = acquire(() -> 2, pool)
# this time, the pool had an existing object, so our function was not called
@test x1 == 1
@test Pools.permits(pool) == 1
@test Pools.depth(pool) == 0
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 0

# but now there are no objects to reuse again, so the next acquire will call our function
x2 = acquire(() -> 2, pool)
@test x2 == 2
@test Pools.permits(pool) == 2
@test Pools.depth(pool) == 0
@test Pools.in_use(pool) == 2
@test Pools.in_pool(pool) == 0

x3 = acquire(() -> 3, pool)
@test x3 == 3
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 0
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 0

# the pool is now at `Pools.max`, so the next acquire will block until an object is released
@test Pools.permits(pool) == Pools.max(pool)
# the pool is now at `Pools.max_usage`, so the next acquire will block until an object is released
@test Pools.in_use(pool) == Pools.max_usage(pool)
tsk = @async acquire(() -> 4, pool; forcenew=true)
yield()
@test !istaskdone(tsk)
Expand All @@ -53,8 +53,8 @@ using ConcurrentUtilities.Pools, Test
x1 = fetch(tsk)
# even though we released 1 for reuse, we passed forcenew, so our function was called to create new
@test x1 == 4
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 1
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to try and provide a key to a non-keyed pool
@test_throws ArgumentError acquire(() -> 1, pool, 1)
Expand All @@ -63,54 +63,54 @@ using ConcurrentUtilities.Pools, Test
release(pool, x1)
release(pool, x2)
release(pool, x3)
@test Pools.permits(pool) == 0
@test Pools.depth(pool) == 4
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 4

# acquire an object, but checking isvalid
x1 = acquire(() -> 5, pool; isvalid=x -> x == 1)
@test x1 == 1
@test Pools.permits(pool) == 1
@test Pools.in_use(pool) == 1

# no valid objects, so our function was called to create a new one
x2 = acquire(() -> 6, pool; isvalid=x -> x == 1)
@test x2 == 6
@test Pools.permits(pool) == 2
@test Pools.in_use(pool) == 2

# we have one permit left, we now throw while creating a new object
# and we want to test that the permit isn't permanently lost for the pool
@test_throws ErrorException acquire(() -> error("oops"), pool; forcenew=true)
@test Pools.permits(pool) == 2
@test Pools.in_use(pool) == 2

# we can still acquire a new object
x3 = acquire(() -> 7, pool; forcenew=true)
@test x3 == 7
@test Pools.permits(pool) == 3
@test Pools.in_use(pool) == 3

# release objects back to the pool
drain!(pool)
release(pool, x1)
release(pool, x2)
release(pool, x3)
@test Pools.permits(pool) == 0
@test Pools.depth(pool) == 3
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 3

# try to do an invalid release
@test_throws ArgumentError release(pool, 10)

# test that the invalid release didn't push the object to our pool for reuse
x1 = acquire(() -> 8, pool)
@test x1 == 7
@test Pools.permits(pool) == 1
@test Pools.depth(pool) == 2
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 2
# calling drain! removes all objects for reuse
drain!(pool)
@test Pools.permits(pool) == 1
@test Pools.depth(pool) == 0
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 0

x2 = acquire(() -> 9, pool)
@test x2 == 9
@test Pools.permits(pool) == 2
@test Pools.depth(pool) == 0
@test Pools.in_use(pool) == 2
@test Pools.in_pool(pool) == 0
end

@testset "keyed pool" begin
Expand All @@ -119,44 +119,44 @@ using ConcurrentUtilities.Pools, Test
@test keytype(pool) === String
@test valtype(pool) === Int

@test Pools.max(pool) == 3
@test Pools.permits(pool) == 0
@test Pools.depth(pool) == 0
@test Pools.max_usage(pool) == 3
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 0

# acquire an object from the pool
x1 = acquire(() -> 1, pool, "a")
# no existing objects in the pool, so our function was called to create a new one
@test x1 == 1
@test Pools.permits(pool) == 1
@test Pools.depth(pool) == 0
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 0

# release back to the pool for reuse
release(pool, "a", x1)
@test Pools.permits(pool) == 0
@test Pools.depth(pool) == 1
@test Pools.in_use(pool) == 0
@test Pools.in_pool(pool) == 1

# test for a different key
x2 = acquire(() -> 2, pool, "b")
# there's an existing object, but for a different key, so we don't reuse
@test x2 == 2
@test Pools.permits(pool) == 1
@test Pools.depth(pool) == 1
@test Pools.in_use(pool) == 1
@test Pools.in_pool(pool) == 1

# acquire another object from the pool
x1 = acquire(() -> 2, pool, "a")
# this time, the pool had an existing object, so our function was not called
@test x1 == 1
@test Pools.permits(pool) == 2
@test Pools.depth(pool) == 0
@test Pools.in_use(pool) == 2
@test Pools.in_pool(pool) == 0

x3 = acquire(() -> 3, pool, "a")
@test x3 == 3
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 0
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 0

# the pool is now at capacity, so the next acquire will block until an object is released
# even though we've acquired using different keys, the capacity is shared across the pool
@test Pools.permits(pool) == Pools.max(pool)
@test Pools.in_use(pool) == Pools.max_usage(pool)
tsk = @async acquire(() -> 4, pool, "c"; forcenew=true)
yield()
@test !istaskdone(tsk)
Expand All @@ -166,27 +166,27 @@ using ConcurrentUtilities.Pools, Test
x1 = fetch(tsk)
# even though we released 1 for reuse, we passed forcenew, so our function was called to create new
@test x1 == 4
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 1
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to try and provide an invalid key to a keyed pool
@test_throws ArgumentError acquire(() -> 1, pool, 1)
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 1
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to *not* provide a key to a keyed pool
@test_throws ArgumentError acquire(() -> 1, pool)
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 1
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to *not* provide a key when releasing to a keyed pool
@test_throws ArgumentError release(pool)
@test Pools.permits(pool) == 3
@test Pools.depth(pool) == 1
@test Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1

# error to release an invalid key back to the pool
@test_throws KeyError release(pool, "z", 1)
@test_broken Pools.permits(pool) == 3
@test Pools.depth(pool) == 1
@test_broken Pools.in_use(pool) == 3
@test Pools.in_pool(pool) == 1
end
end

0 comments on commit 34f8f1b

Please sign in to comment.