Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce self-deregistering closures for on / onany #48

Merged
merged 12 commits into from
Oct 5, 2020
21 changes: 19 additions & 2 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ using Observables

observable = Observable(0)

h = on(observable) do val
obs_func = on(observable) do val
println("Got an update: ", val)
end

Expand All @@ -22,7 +22,24 @@ observable[]
To remove a handler use `off` with the return value of `on`:

```@repl manual
off(observable, h)
off(obs_func)
```

### Weak Connections

If you use `on` with `weak = true`, the connection will be removed when
the return value of `on` is garbage collected.
This can make it easier to clean up connections that are not used anymore.


```julia
obs_func = on(observable, weak = true) do val
println("Got an update: ", val)
end
# as long as obs_func is reachable the connection will stay

obs_func = nothing
# now garbage collection can at any time clear the connection
```

### Async operations
Expand Down
101 changes: 93 additions & 8 deletions src/Observables.jl
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,80 @@ end

Base.show(io::IO, ::MIME"application/prs.juno.inline", x::Observable) = x


"""
mutable struct ObserverFunction <: Function

Fields:

f::Function
observable::AbstractObservable
weak::Bool

`ObserverFunction` is intended as the return value for `on` because
we can remove the created closure from `obsfunc.observable`'s listener vectors when
ObserverFunction goes out of scope - as long as the `weak` flag is set.
If the `weak` flag is not set, nothing happens
when the ObserverFunction goes out of scope and it can be safely ignored.
It can still be useful because it is easier to call `off(obsfunc)` instead of `off(observable, f)`
to release the connection later.
"""
mutable struct ObserverFunction <: Function
f
observable::AbstractObservable
weak::Bool

function ObserverFunction(f, observable::AbstractObservable, weak)
obsfunc = new(f, observable, weak)

# If the weak flag is set, deregister the function f from the observable
# storing it in its listeners once the ObserverFunction is garbage collected.
# This should free all resources associated with f unless there
# is another reference to it somewhere else.
if obsfunc.weak
finalizer(obsfunc) do obsfunc
off(obsfunc)
end
end

obsfunc
end
end


"""
on(f, observable::AbstractObservable)
on(f, observable::AbstractObservable; weak = false)

Adds function `f` as listener to `observable`. Whenever `observable`'s value
is set via `observable[] = val` `f` is called with `val`.

Returns an `ObserverFunction` that wraps `f` and `observable` and allows to
disconnect easily by calling `off(observerfunction)` instead of `off(f, observable)`.

If `weak = true` is set, the new connection will be removed as soon as the returned `ObserverFunction`
is not referenced anywhere and is garbage collected. This is useful if some parent object
makes connections to outside observables and stores the resulting `ObserverFunction` instances.
Then, once that parent object is garbage collected, the weak
observable connections are removed automatically.
"""
function on(f, observable::AbstractObservable)
function on(f, observable::AbstractObservable; weak = false)
push!(listeners(observable), f)
for g in addhandler_callbacks
g(f, observable)
end
return f
# Return a ObserverFunction so that the caller is responsible
# to keep a reference to it around as long as they want the connection to
# persist. If the ObserverFunction is garbage collected, f will be released from
# observable's listeners as well.
return ObserverFunction(f, observable, weak)
end

"""
off(observable::AbstractObservable, f)

Removes `f` from listeners of `observable`.

Returns `true` if `f` could be removed, otherwise `false`.
"""
function off(observable::AbstractObservable, f)
callbacks = listeners(observable)
Expand All @@ -111,10 +167,29 @@ function off(observable::AbstractObservable, f)
for g in removehandler_callbacks
g(observable, f)
end
return
return true
end
end
throw(KeyError(f))
return false
end


function off(observable::AbstractObservable, obsfunc::ObserverFunction)
f = obsfunc.f
# remove the function inside obsfunc as usual
off(observable, f)
end

"""
off(obsfunc::ObserverFunction)

Remove the listener function `obsfunc.f` from the listeners of `obsfunc.observable`.
Once `obsfunc` goes out of scope, this should allow `obsfunc.f` and all the values
it might have closed over to be garbage collected (unless there
are other references to it).
"""
function off(obsfunc::ObserverFunction)
off(obsfunc.observable, obsfunc)
end

"""
Expand Down Expand Up @@ -220,16 +295,26 @@ end
"""
onany(f, args...)

Calls `f` on updates to any oservable refs in `args`.
Calls `f` on updates to any observable refs in `args`.
`args` may contain any number of `Observable` objects.
`f` will be passed the values contained in the refs as the respective argument.
All other objects in `args` are passed as-is.
"""
function onany(f, args...)
function onany(f, args...; weak = false)
callback = OnUpdate(f, args)

# store all returned ObserverFunctions
obsfuncs = ObserverFunction[]
for observable in args
(observable isa AbstractObservable) && on(callback, observable)
if observable isa AbstractObservable
obsfunc = on(callback, observable, weak = weak)
push!(obsfuncs, obsfunc)
end
end

# same principle as with `on`, this collection needs to be
# stored by the caller or the connections made will be cut
obsfuncs
end

struct MapUpdater{F, T} <: InternalFunction
Expand Down
14 changes: 10 additions & 4 deletions src/observablepair.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@ struct ObservablePair{S, T} <: AbstractObservable{T}
excluded::Vector{Function}
function ObservablePair(first::AbstractObservable{S}, second::AbstractObservable{T}; f = identity, g = identity) where {S, T}
excluded = Function[]
first2second = on(first) do val

# the two observables should trigger each other, but only in one direction
# as otherwise there will be an infinite loop of updates

first2second_observerfunc = on(first) do val
setindex!(second, f(val), notify = !in(excluded))
end
push!(excluded, first2second)
second2first = on(second) do val
push!(excluded, first2second_observerfunc.f) # in notify, the wrapped function is compared

second2first_observerfunc = on(second) do val
setindex!(first, g(val), notify = !in(excluded))
end
push!(excluded, second2first)
push!(excluded, second2first_observerfunc.f)

new{S, T}(first, second, f, g, excluded)
end
end
Expand Down
70 changes: 68 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ using Test
end
r[] = 2

off(r, f)
@test off(r, f)
@test !(f in r.listeners)
@test_throws KeyError off(r, f)
@test off(r, f) == false
r[] = 3 # shouldn't call test
end

Expand Down Expand Up @@ -44,6 +44,72 @@ end
@test r2[] == 4
end

@testset "disconnect observerfuncs" begin

x = Observable(1)
y = Observable(2)
z = Observable(3)

of1 = on(x) do x
println(x)
end

of2_3 = onany(y, z) do y, z
println(y, z)
end

off(of1)
off.(of2_3)
for obs in (x, y, z)
@test isempty(obs.listeners)
end
end

# this struct is just supposed to show that a value in memory was released
mutable struct ToFinalize
val

function ToFinalize(val, finalized_flag::Ref)
tf = new(val)
finalizer(tf) do tf
finalized_flag[] = true
end
end
end

@testset "weak connections" begin
a = Observable(1)

finalized_flag = Ref(false)

function create_a_dangling_listener()
t = ToFinalize(1, finalized_flag)

obsfunc = on(a; weak = true) do a
t.val += 1
end

a[] = 2
@test t.val == 2

# obsfunc falls out of scope here and should deregister the closure
# when it gets garbage collected, which should in turn free t
nothing
end

GC.enable(false)
create_a_dangling_listener()
@test length(Observables.listeners(a)) == 1
@test finalized_flag[] == false

GC.enable(true)
GC.gc()
# somehow this needs a double sweep, maybe first obsfunc, then ToFinalize?
GC.gc()
@test isempty(Observables.listeners(a))
@test finalized_flag[] == true
end
jkrumbiegel marked this conversation as resolved.
Show resolved Hide resolved

@testset "macros" begin
a = Observable(2)
b = Observable(3)
Expand Down