Skip to content

Commit 7791258

Browse files
committed
Use fiber.cancel
1 parent 2344c46 commit 7791258

File tree

1 file changed

+4
-10
lines changed

1 file changed

+4
-10
lines changed

queue/abstract.lua

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ function tube.put(self, data, opts)
7373
end
7474

7575
local conds = {}
76-
local releasing_sessions = {}
7776

7877
function tube.take(self, timeout)
7978
timeout = time(timeout or TIMEOUT_INFINITY)
@@ -95,13 +94,7 @@ function tube.take(self, timeout)
9594
conds[fid]:free()
9695
box.space._queue_consumers:delete{ sid, fid }
9796

98-
-- We don't take a task if the session is in a disconnecting state
99-
if releasing_sessions[fid] == nil then
100-
task = self.raw:take()
101-
else
102-
releasing_sessions[fid] = nil
103-
return nil
104-
end
97+
task = self.raw:take()
10598

10699
if task ~= nil then
107100
return self.raw:normalize_task(task)
@@ -363,8 +356,9 @@ function method._on_consumer_disconnect()
363356
box.space._queue_consumers:delete{ waiter[1], waiter[2] }
364357
local cond = conds[waiter[2]]
365358
if cond then
366-
releasing_sessions[waiter[2]] = true
367-
cond:signal(waiter[2])
359+
fiber_w = fiber.find(waiter[2])
360+
fiber_w:cancel()
361+
conds[waiter[2]] = nil
368362
end
369363
end
370364

0 commit comments

Comments
 (0)