From c99741e5badc97c58c40536fe956132bd58a4b95 Mon Sep 17 00:00:00 2001 From: Viktor Sych Date: Fri, 3 May 2024 11:15:54 +0300 Subject: [PATCH] [fix] delayed_sidekiq race condition (#937) --- .github/workflows/ruby.yml | 12 +++++ CHANGELOG.md | 1 + README.md | 9 ++-- gemfiles/base.gemfile | 2 +- lib/chewy/strategy/delayed_sidekiq.rb | 8 +-- .../strategy/delayed_sidekiq/scheduler.rb | 49 +++++++++++++------ lib/chewy/strategy/delayed_sidekiq/worker.rb | 36 +++++++++++--- spec/chewy/strategy/delayed_sidekiq_spec.rb | 13 ++--- 8 files changed, 94 insertions(+), 36 deletions(-) diff --git a/.github/workflows/ruby.yml b/.github/workflows/ruby.yml index 979dadf97..825a8d4a8 100644 --- a/.github/workflows/ruby.yml +++ b/.github/workflows/ruby.yml @@ -23,6 +23,18 @@ jobs: env: BUNDLE_GEMFILE: gemfiles/${{ matrix.gemfile }}.gemfile + services: + redis: + # Docker Hub image + image: redis + ports: + - '6379:6379' + # Set health checks to wait until redis has started + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - uses: actions/checkout@v4 - uses: ruby/setup-ruby@v1 diff --git a/CHANGELOG.md b/CHANGELOG.md index bfe3a60f2..1df3819ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [#933](https://github.com/toptal/chewy/pull/933): Relax allowed `elasticsearch` dependency versions. ([@mjankowski][]) ### Bugs Fixed +* [#937](https://github.com/toptal/chewy/pull/937): Fix for race condition while using the `delayed_sidekiq` strategy. Also, fix for Redis bloating in case of reindexing error ([@skcc321](https://github.com/skcc321)) * [#947](https://github.com/toptal/chewy/pull/947): Fix intermittent time-based failure in delayed sidekiq spec. 
([@mjankowski][]) diff --git a/README.md b/README.md index 4082fd038..23c13993e 100644 --- a/README.md +++ b/README.md @@ -776,9 +776,12 @@ Chewy.settings[:sidekiq] = {queue: :low} #### `:delayed_sidekiq` -It accumulates ids of records to be reindexed during the latency window in redis and then does the reindexing of all accumulated records at once. -The strategy is very useful in case of frequently mutated records. -It supports `update_fields` option, so it will try to select just enough data from the DB +It accumulates IDs of records to be reindexed during the latency window in Redis and then performs the reindexing of all accumulated records at once. +This strategy is very useful in the case of frequently mutated records. +It supports the `update_fields` option, so it will attempt to select just enough data from the database. + +Keep in mind that this strategy does not guarantee reindexing if the Sidekiq worker is terminated or an error occurs during the reindexing phase: the accumulated IDs are removed from Redis before the reindexing starts. +This behavior is intentional; it prevents unbounded growth of the Redis database. There are three options that can be defined in the index: ```ruby diff --git a/gemfiles/base.gemfile b/gemfiles/base.gemfile index 37da307c5..f4e0c5075 100644 --- a/gemfiles/base.gemfile +++ b/gemfiles/base.gemfile @@ -1,8 +1,8 @@ gem 'database_cleaner' gem 'elasticsearch-extensions' gem 'method_source' -gem 'mock_redis' gem 'rake' +gem 'redis', require: false gem 'rspec', '>= 3.7.0' gem 'rspec-collection_matchers' gem 'rspec-its' diff --git a/lib/chewy/strategy/delayed_sidekiq.rb b/lib/chewy/strategy/delayed_sidekiq.rb index 0bdd25c88..23b373b2d 100644 --- a/lib/chewy/strategy/delayed_sidekiq.rb +++ b/lib/chewy/strategy/delayed_sidekiq.rb @@ -9,11 +9,11 @@ class DelayedSidekiq < Sidekiq # leak and potential flaky tests. def self.clear_timechunks! ::Sidekiq.redis do |redis| - timechunk_sets = redis.smembers(Chewy::Strategy::DelayedSidekiq::Scheduler::ALL_SETS_KEY) - break if timechunk_sets.empty? 
+ keys_to_delete = redis.keys("#{Scheduler::KEY_PREFIX}*") - redis.pipelined do |pipeline| - timechunk_sets.each { |set| pipeline.del(set) } + # Delete keys one by one + keys_to_delete.each do |key| + redis.del(key) end end end diff --git a/lib/chewy/strategy/delayed_sidekiq/scheduler.rb b/lib/chewy/strategy/delayed_sidekiq/scheduler.rb index f1010a3ee..d931c338b 100644 --- a/lib/chewy/strategy/delayed_sidekiq/scheduler.rb +++ b/lib/chewy/strategy/delayed_sidekiq/scheduler.rb @@ -12,13 +12,43 @@ class Strategy class DelayedSidekiq require_relative 'worker' + LUA_SCRIPT = <<~LUA + local timechunk_key = KEYS[1] + local timechunks_key = KEYS[2] + local serialize_data = ARGV[1] + local at = ARGV[2] + local ttl = tonumber(ARGV[3]) + + local schedule_job = false + + -- Check if the 'sadd?' method is available + if redis.call('exists', 'sadd?') == 1 then + redis.call('sadd?', timechunk_key, serialize_data) + else + redis.call('sadd', timechunk_key, serialize_data) + end + + -- Set expiration for timechunk_key + redis.call('expire', timechunk_key, ttl) + + -- Check if timechunk_key exists in the sorted set + if not redis.call('zrank', timechunks_key, timechunk_key) then + -- Add timechunk_key to the sorted set + redis.call('zadd', timechunks_key, at, timechunk_key) + -- Set expiration for timechunks_key + redis.call('expire', timechunks_key, ttl) + schedule_job = true + end + + return schedule_job + LUA + class Scheduler DEFAULT_TTL = 60 * 60 * 24 # in seconds DEFAULT_LATENCY = 10 DEFAULT_MARGIN = 2 DEFAULT_QUEUE = 'chewy' KEY_PREFIX = 'chewy:delayed_sidekiq' - ALL_SETS_KEY = "#{KEY_PREFIX}:all_sets".freeze FALLBACK_FIELDS = 'all' FIELDS_IDS_SEPARATOR = ';' IDS_SEPARATOR = ',' @@ -67,21 +97,8 @@ def initialize(type, ids, options = {}) # | chewy:delayed_sidekiq:CitiesIndex:1679347868 def postpone ::Sidekiq.redis do |redis| - # warning: Redis#sadd will always return an Integer in Redis 5.0.0. Use Redis#sadd? instead - if redis.respond_to?(:sadd?) 
- redis.sadd?(ALL_SETS_KEY, timechunks_key) - redis.sadd?(timechunk_key, serialize_data) - else - redis.sadd(ALL_SETS_KEY, timechunks_key) - redis.sadd(timechunk_key, serialize_data) - end - - redis.expire(timechunk_key, ttl) - - unless redis.zrank(timechunks_key, timechunk_key) - redis.zadd(timechunks_key, at, timechunk_key) - redis.expire(timechunks_key, ttl) - + # do the redis stuff in a single command to avoid concurrency issues + if redis.eval(LUA_SCRIPT, keys: [timechunk_key, timechunks_key], argv: [serialize_data, at, ttl]) ::Sidekiq::Client.push( 'queue' => sidekiq_queue, 'at' => at + margin, diff --git a/lib/chewy/strategy/delayed_sidekiq/worker.rb b/lib/chewy/strategy/delayed_sidekiq/worker.rb index 4d17a4cd1..af5fa793d 100644 --- a/lib/chewy/strategy/delayed_sidekiq/worker.rb +++ b/lib/chewy/strategy/delayed_sidekiq/worker.rb @@ -6,13 +6,40 @@ class DelayedSidekiq class Worker include ::Sidekiq::Worker + LUA_SCRIPT = <<~LUA + local type = ARGV[1] + local score = tonumber(ARGV[2]) + local prefix = ARGV[3] + local timechunks_key = prefix .. ":" .. type .. 
":timechunks" + + -- Get timechunk_keys with scores less than or equal to the specified score + local timechunk_keys = redis.call('zrangebyscore', timechunks_key, '-inf', score) + + -- Get all members from the sets associated with the timechunk_keys + local members = {} + for _, timechunk_key in ipairs(timechunk_keys) do + local set_members = redis.call('smembers', timechunk_key) + for _, member in ipairs(set_members) do + table.insert(members, member) + end + end + + -- Remove timechunk_keys and their associated sets + for _, timechunk_key in ipairs(timechunk_keys) do + redis.call('del', timechunk_key) + end + + -- Remove timechunks with scores less than or equal to the specified score + redis.call('zremrangebyscore', timechunks_key, '-inf', score) + + return members + LUA + def perform(type, score, options = {}) options[:refresh] = !Chewy.disable_refresh_async if Chewy.disable_refresh_async ::Sidekiq.redis do |redis| - timechunks_key = "#{Scheduler::KEY_PREFIX}:#{type}:timechunks" - timechunk_keys = redis.zrangebyscore(timechunks_key, -1, score) - members = timechunk_keys.flat_map { |timechunk_key| redis.smembers(timechunk_key) }.compact + members = redis.eval(LUA_SCRIPT, keys: [], argv: [type, score, Scheduler::KEY_PREFIX]) # extract ids and fields & do the reset of records ids, fields = extract_ids_and_fields(members) @@ -22,9 +49,6 @@ def perform(type, score, options = {}) index.strategy_config.delayed_sidekiq.reindex_wrapper.call do options.any? ? 
index.import!(ids, **options) : index.import!(ids) end - - redis.del(timechunk_keys) - redis.zremrangebyscore(timechunks_key, -1, score) end end diff --git a/spec/chewy/strategy/delayed_sidekiq_spec.rb b/spec/chewy/strategy/delayed_sidekiq_spec.rb index ae7a07dc7..78a1219b8 100644 --- a/spec/chewy/strategy/delayed_sidekiq_spec.rb +++ b/spec/chewy/strategy/delayed_sidekiq_spec.rb @@ -2,7 +2,7 @@ if defined?(Sidekiq) require 'sidekiq/testing' - require 'mock_redis' + require 'redis' describe Chewy::Strategy::DelayedSidekiq do around do |example| @@ -10,9 +10,10 @@ end before do - redis = MockRedis.new + redis = Redis.new allow(Sidekiq).to receive(:redis).and_yield(redis) Sidekiq::Worker.clear_all + described_class.clear_timechunks! end before do @@ -35,7 +36,7 @@ it "respects 'refresh: false' options" do allow(Chewy).to receive(:disable_refresh_async).and_return(true) - expect(CitiesIndex).to receive(:import!).with([city.id, other_city.id], refresh: false) + expect(CitiesIndex).to receive(:import!).with(match_array([city.id, other_city.id]), refresh: false) scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id, other_city.id]) scheduler.postpone Chewy::Strategy::DelayedSidekiq::Worker.drain @@ -108,7 +109,7 @@ def expected_at_time context 'two reindex call within the timewindow' do it 'accumulates all ids does the reindex one time' do Timecop.freeze do - expect(CitiesIndex).to receive(:import!).with([other_city.id, city.id]).once + expect(CitiesIndex).to receive(:import!).with(match_array([city.id, other_city.id])).once scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id]) scheduler.postpone scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) @@ -120,7 +121,7 @@ def expected_at_time context 'one call with update_fields another one without update_fields' do it 'does reindex of all fields' do Timecop.freeze do - expect(CitiesIndex).to receive(:import!).with([other_city.id, 
city.id]).once + expect(CitiesIndex).to receive(:import!).with(match_array([city.id, other_city.id])).once scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) scheduler.postpone scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id]) @@ -133,7 +134,7 @@ def expected_at_time context 'both calls with different update fields' do it 'deos reindex with union of fields' do Timecop.freeze do - expect(CitiesIndex).to receive(:import!).with([other_city.id, city.id], update_fields: %w[description name]).once + expect(CitiesIndex).to receive(:import!).with(match_array([city.id, other_city.id]), update_fields: %w[name description]).once scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [city.id], update_fields: ['name']) scheduler.postpone scheduler = Chewy::Strategy::DelayedSidekiq::Scheduler.new(CitiesIndex, [other_city.id], update_fields: ['description'])