From 9b80dd4f251f2c1194e6d013672d301102b61121 Mon Sep 17 00:00:00 2001 From: Antti Hukkanen Date: Wed, 28 Feb 2024 23:57:38 +0200 Subject: [PATCH] Make collection `process!` method class method to improve locking In order to further prevent two processes processing the same collection at the same time, do this through a class method. --- app/models/decidim/stats/collection.rb | 50 +++++++++------ spec/models/decidim/stats/collection_spec.rb | 66 +++++++++++--------- 2 files changed, 67 insertions(+), 49 deletions(-) diff --git a/app/models/decidim/stats/collection.rb b/app/models/decidim/stats/collection.rb index 01f47db..b7bbd2a 100644 --- a/app/models/decidim/stats/collection.rb +++ b/app/models/decidim/stats/collection.rb @@ -11,6 +11,31 @@ class Collection < Stats::ApplicationRecord belongs_to :measurable, foreign_key: :decidim_measurable_id, foreign_type: :decidim_measurable_type, polymorphic: true has_many :sets, foreign_key: :decidim_stats_collection_id, class_name: "Decidim::Stats::Set", inverse_of: :collection, dependent: :destroy + # A block method that locks the collection during the data aggregation and + # unlocks it after the given block has been processed. + # + # When adding data to a collection, it should always happen within the + # block given to this method. Otherwise it is possible that multiple + # processes are adding data to the same collection at the same time. + # + # @raise [ActiveRecord::RecordInvalid] If the collection cannot be locked + # or unlocked due to it being invalid. + # @return [Boolean] A boolean indicating if collection was unlocked after + # processing the block. + def self.process!(**conditions) + collection = find_or_create_by(**conditions) + return unless collection.available? + + collection.lock! + collection.transaction do + yield collection + collection.unlock! + end + rescue ActiveRecord::RecordNotUnique + # This can happen if two processes try to create the same collection + # exactly at the same time. + end + # Finalizes the collection meaning after finalized, it will no longer # receive any new values. # @@ -37,30 +62,17 @@ def available? # # @return [Boolean] A boolean indicating if the collection is locked. def locked? + self.locked_at = self.class.where(id: id).pick(:locked_at) locked_at.present? end - # A block method that locks the process during the data aggregation and - # unlocks it after the given block has been processed. - # - # When adding data to a collection, it should always happen within the - # block given to this method. Otherwise it is possible that multiple - # processes are adding data to the same collection at the same time. - # - # @raise [ActiveRecord::RecordInvalid] If the collection cannot be locked - # or unlocked due to it being invalid. - # @return [Boolean] A boolean indicating if collection was unlocked after - # processing the block. - def process! - return unless available? - - lock! - yield - unlock! - end - # Locks the collection. # + # Note that this works differently from + # `ActiveRecord::Locking::Pessimistic` because we want the record to be + # available but report that it is locked, so that it is not blocking the + # thread. + # # @raise [ActiveRecord::RecordInvalid] If the collection cannot be locked # due to it being invalid. # @return [Boolean] A boolean indicating if locking was successful. diff --git a/spec/models/decidim/stats/collection_spec.rb b/spec/models/decidim/stats/collection_spec.rb index dd1228f..ad35648 100644 --- a/spec/models/decidim/stats/collection_spec.rb +++ b/spec/models/decidim/stats/collection_spec.rb @@ -5,6 +5,42 @@ describe Decidim::Stats::Collection do subject { create(:stats_collection) } + describe ".process!" do + let(:organization) { create(:organization) } + let(:participatory_space) { create(:participatory_process, organization: organization) } + let(:measurable) { create(:component, manifest_name: "dummy", participatory_space: participatory_space) } + let(:conditions) { { organization: organization, metadata: {}, key: "test" } } + + it "yields by default" do + expect { |b| measurable.stats.process!(**conditions, &b) }.to yield_control + end + + it "sets the collection locked during the yield" do + b = ->(collection) { expect(collection.locked_at).not_to be_nil } + + measurable.stats.process!(**conditions, &b) + + collection = measurable.stats.find_by(**conditions) + expect(collection.locked_at).to be_nil + end + + context "when the collection is locked" do + let!(:collection) { measurable.stats.create!(locked_at: Time.current, **conditions) } + + it "does not yield" do + expect { |b| measurable.stats.process!(**conditions, &b) }.not_to yield_control + end + end + + context "when the collection is finalized" do + let!(:collection) { measurable.stats.create!(finalized: true, **conditions) } + + it "does not yield" do + expect { |b| measurable.stats.process!(**conditions, &b) }.not_to yield_control + end + end + end + describe "#finalize!" do it "sets the finalized at field" do expect { subject.finalize! }.to change(subject, :finalized?).from(false).to(true) @@ -47,36 +83,6 @@ end end - describe "#process!" do - it "yields by default" do - expect { |b| subject.process!(&b) }.to yield_control - end - - it "sets the collection locked during the yield" do - b = -> { expect(subject.locked_at).not_to be_nil } - - expect(subject.locked_at).to be_nil - subject.process!(&b) - expect(subject.locked_at).to be_nil - end - - context "when the collection is locked" do - before { subject.update!(locked_at: Time.current) } - - it "does not yield" do - expect { |b| subject.process!(&b) }.not_to yield_control - end - end - - context "when the collection is finalized" do - before { subject.update!(finalized: true) } - - it "does not yield" do - expect { |b| subject.process!(&b) }.not_to yield_control - end - end - end - describe "#lock!" do it "sets the locked_at to the collection" do expect(subject.locked_at).to be_nil