From a37e97d02f369a8bec32897f1fa8c668727d0016 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20M=C3=BCller?= Date: Thu, 8 Dec 2022 23:15:46 +0100 Subject: [PATCH] Optimize uniqueness filter in `Channel.select_impl` (#12814) --- src/channel.cr | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/channel.cr b/src/channel.cr index 59fe55e38cf4..5a7a5957c786 100644 --- a/src/channel.cr +++ b/src/channel.cr @@ -425,20 +425,19 @@ class Channel(T) # This is to avoid deadlocks between concurrent `select` calls ops_locks = ops .to_a - .uniq!(&.lock_object_id) - .sort_by!(&.lock_object_id) + .unstable_sort_by!(&.lock_object_id) - ops_locks.each &.lock + each_skip_duplicates(ops_locks, &.lock) ops.each_with_index do |op, index| state = op.execute case state in .delivered? - ops_locks.each &.unlock + each_skip_duplicates(ops_locks, &.unlock) return index, op.result in .closed? - ops_locks.each &.unlock + each_skip_duplicates(ops_locks, &.unlock) return index, op.default_result in .none? # do nothing @@ -446,7 +445,7 @@ class Channel(T) end if non_blocking - ops_locks.each &.unlock + each_skip_duplicates(ops_locks, &.unlock) return ops.size, NotReady.new end @@ -456,7 +455,7 @@ class Channel(T) shared_state = SelectContextSharedState.new(SelectState::Active) contexts = ops.map &.create_context_and_wait(shared_state) - ops_locks.each &.unlock + each_skip_duplicates(ops_locks, &.unlock) Crystal::Scheduler.reschedule contexts.each_with_index do |context, index| @@ -475,6 +474,19 @@ class Channel(T) raise "BUG: Fiber was awaken from select but no action was activated" end + private def self.each_skip_duplicates(ops_locks) + # Avoid deadlocks from trying to lock the same lock twice. + # `ops_lock` is sorted by `lock_object_id`, so identical onces will be in + # a row and we skip repeats while iterating. + last_lock_id = nil + ops_locks.each do |op| + if op.lock_object_id != last_lock_id + last_lock_id = op.lock_object_id + yield op + end + end + end + # :nodoc: def send_select_action(value : T) SendAction.new(self, value)