@@ -256,14 +256,27 @@ function del_clients(pairs::Vector)
256
256
end
257
257
end
258
258
259
- const any_gc_flag = Condition ()
259
+ # The task below is coalescing the `flush_gc_msgs` call
260
+ # across multiple producers, see `send_del_client`,
261
+ # and `send_add_client`.
262
+ # XXX : Is this worth the additional complexity?
263
+ # `flush_gc_msgs` has to iterate over all connected workers.
264
+ const any_gc_flag = Threads. Condition ()
260
265
function start_gc_msgs_task ()
261
- errormonitor (@async while true
262
- wait (any_gc_flag)
263
- flush_gc_msgs ()
264
- end )
266
+ errormonitor (
267
+ Threads. @spawn begin
268
+ while true
269
+ lock (any_gc_flag) do
270
+ # this might miss events
271
+ wait (any_gc_flag)
272
+ end
273
+ flush_gc_msgs () # handles throws internally
274
+ end
275
+ end
276
+ )
265
277
end
266
278
279
+ # Function can be called within a finalizer
267
280
function send_del_client (rr)
268
281
if rr. where == myid ()
269
282
del_client (rr)
@@ -281,11 +294,27 @@ function send_del_client_no_lock(rr)
281
294
end
282
295
end
283
296
297
+ function publish_del_msg! (w:: Worker , msg)
298
+ lock (w. msg_lock) do
299
+ push! (w. del_msgs, msg)
300
+ @atomic w. gcflag = true
301
+ end
302
+ lock (any_gc_flag) do
303
+ notify (any_gc_flag)
304
+ end
305
+ end
306
+
284
307
function process_worker (rr)
285
308
w = worker_from_id (rr. where):: Worker
286
- push! (w. del_msgs, (remoteref_id (rr), myid ()))
287
- w. gcflag = true
288
- notify (any_gc_flag)
309
+ msg = (remoteref_id (rr), myid ())
310
+
311
+ # Needs to aquire a lock on the del_msg queue
312
+ T = Threads. @spawn begin
313
+ publish_del_msg! ($ w, $ msg)
314
+ end
315
+ Base. errormonitor (T)
316
+
317
+ return
289
318
end
290
319
291
320
function add_client (id, client)
@@ -310,9 +339,13 @@ function send_add_client(rr::AbstractRemoteRef, i)
310
339
# to the processor that owns the remote ref. it will add_client
311
340
# itself inside deserialize().
312
341
w = worker_from_id (rr. where)
313
- push! (w. add_msgs, (remoteref_id (rr), i))
314
- w. gcflag = true
315
- notify (any_gc_flag)
342
+ lock (w. msg_lock) do
343
+ push! (w. add_msgs, (remoteref_id (rr), i))
344
+ @atomic w. gcflag = true
345
+ end
346
+ lock (any_gc_flag) do
347
+ notify (any_gc_flag)
348
+ end
316
349
end
317
350
end
318
351
0 commit comments