Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize uniqueness filter in Channel.select_impl #12814

Conversation

straight-shoota
Copy link
Member

@straight-shoota straight-shoota commented Dec 1, 2022

In Channel.select_impl uses a list of locks that need to be locked before running each action and unlocked afterwards. The locks are retrieved from the SelectActions (each action doubles as a lock), but some actions share the same lock (indicated by lock_object_id, which is usually Channel). Locking the same lock twice would result in a deadlock, so duplicates needs to be avoided.
This was originally implemented using uniq!(&.lock_object_id).

But this method is actually relatively costly. It's O(N²) for cross-comparing all items. For larger sizes (> 11 items) it would even allocate a Hash.

The locks also need to be iterated in a constant order to avoid deadlock between concurrent select actions. Thus they are sorted using sort_by!(&.lock_object_id) and actions with identical lock_object_ids are consequently ordered in a row. This allows to filter repeated ids on iteration with a cursor for tracking the previous id.
This filtering is negligible overhead. Iteration happens exactly twice, once for lock and once for unlock. For unlock there is probably not even a need to avoid duplicates because it doesn't block. But it's cheap enough to do regardless.
This should be a net performance improvement over uniq!.

(It would certainly also be more efficient to materialize uniqueness with a similar iterative approach after sorting. But since the collection needs to be iterated only twice, it should be much more efficient to filter on the fly and ensure uniqueness while iterating.)

As an additional change, sort_by! is replaced by unstable_sort_by!. There's no need for stable sort characteristics.

Ref: #12756

@straight-shoota straight-shoota changed the title Skip uniq! in Channel.select_impl Optimize uniqueness filter in Channel.select_impl Dec 1, 2022
@asterite
Copy link
Member

asterite commented Dec 3, 2022

Looks great! Any benchmark? Just to get a sense of what's the improvement on each PR.

@straight-shoota
Copy link
Member Author

@asterite I first thought benchmarks for this wouldn't be easy, but that's not really true.

 non_blocking_select 237.34k (  4.21µs) (±10.05%)  12.3kB/op   4.45× slower
non_uniq_select_impl   1.06M (947.24ns) (± 6.55%)  2.03kB/op        fastest

The ratio is relatively stable for different sizes and ranges between 3x and 5x faster for the new implementation.

CHANNELS = Array(Channel(Nil)).new(100) { Channel(Nil).new }

ARY = CHANNELS.map &.receive_select_action

Benchmark.ips do |x|
  x.report "non_blocking_select" do
    Channel.non_blocking_select(ARY)
  end
  x.report "non_uniq_select_impl" do
    Channel.non_uniq_select_impl(ARY, true)
  end
end

class Channel
  def self.non_uniq_select_impl(ops : Indexable(Channel::SelectAction), non_blocking)
    # Sort the operations by the channel they contain
    # This is to avoid deadlocks between concurrent `select` calls
    ops_locks = ops
      .to_a
      .unstable_sort_by!(&.lock_object_id)

    each_skip_duplicates(ops_locks, &.lock)

    ops.each_with_index do |op, index|
      state = op.execute

      case state
      in .delivered?
        each_skip_duplicates(ops_locks, &.unlock)
        return index, op.result
      in .closed?
        each_skip_duplicates(ops_locks, &.unlock)
        return index, op.default_result
      in .none?
        # do nothing
      end
    end

    if non_blocking
      each_skip_duplicates(ops_locks, &.unlock)
      return ops.size, NotReady.new
    end

    # Because `channel#close` may clean up a long list, `select_context.try_trigger` may
    # be called after the select return. In order to prevent invalid address access,
    # the state is allocated in the heap.
    shared_state = SelectContextSharedState.new(SelectState::Active)
    contexts = ops.map &.create_context_and_wait(shared_state)

    each_skip_duplicates(ops_locks, &.unlock)
    Crystal::Scheduler.reschedule

    contexts.each_with_index do |context, index|
      op = ops[index]
      op.lock
      op.unwait(context)
      op.unlock
    end

    contexts.each_with_index do |context, index|
      if context.activated?
        return index, ops[index].wait_result(context)
      end
    end

    raise "BUG: Fiber was awaken from select but no action was activated"
  end

  private def self.each_skip_duplicates(ops_locks)
    # Avoid deadlocks from trying to lock the same lock twice.
    # `ops_lock` is sorted by `lock_object_id`, so identical onces will be in
    # a row and we skip repeats while iterating.
    last_lock_id = nil
    ops_locks.each do |op|
      if op.lock_object_id != last_lock_id
        last_lock_id = op.lock_object_id
        yield op
      end
    end
  end
end

@straight-shoota straight-shoota added this to the 1.7.0 milestone Dec 7, 2022
@straight-shoota straight-shoota self-assigned this Dec 8, 2022
@straight-shoota straight-shoota merged commit a37e97d into crystal-lang:master Dec 8, 2022
@straight-shoota straight-shoota deleted the refactor/channel-select-uniq branch December 8, 2022 22:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants