Skip to content

Commit 8eb1a6a

Browse files
committed
TEMP: Add stream migration support
1 parent d43a262 commit 8eb1a6a

File tree

1 file changed

+17
-0
lines changed

1 file changed

+17
-0
lines changed

src/stream.jl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,20 @@ end
138138
remove_waiters!(stream::Stream, waiter::Integer) =
139139
remove_waiters!(stream::Stream, Int[waiter])
140140

141+
function migrate_stream!(stream::Stream, w::Integer=myid())
142+
# Take lock to prevent any further modifications
143+
# N.B. Serialization automatically unlocks
144+
remotecall_wait(stream.ref.handle.owner, stream.ref.handle) do ref
145+
lock((MemPool.poolget(ref)::StreamStore).lock)
146+
end
147+
148+
# Perform migration of the StreamStore
149+
# MemPool will block access to the new ref until the migration completes
150+
if stream.ref.handle.owner != w
151+
MemPool.migrate!(stream.ref.handle, w)
152+
end
153+
end
154+
141155
struct NullStream end
142156
Base.put!(ns::NullStream, x) = nothing
143157
Base.take!(ns::NullStream) = throw(ConcurrencyViolationError("Cannot `take!` from a `NullStream`"))
@@ -245,6 +259,9 @@ function (sf::StreamingFunction)(args...; kwargs...)
245259
end
246260
end
247261
end
262+
if sf.stream isa Stream
263+
migrate_stream!(sf.stream)
264+
end
248265
try
249266
kwarg_names = map(name->Val{name}(), map(first, (kwargs...,)))
250267
kwarg_values = map(last, (kwargs...,))

0 commit comments

Comments
 (0)