From df35a44ff9352411857a8869235e1ce7dc066482 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Wn=C4=99trzak?= Date: Fri, 4 Jun 2021 16:06:36 +0200 Subject: [PATCH 1/5] Add deleting jobs from UI. (#265) * Add deleting jobs from UI. refs #256 * Improve deleting jobs * Move deleting jobs to own controller --- engine/app/controllers/good_job/jobs_controller.rb | 9 +++++++++ engine/app/views/layouts/good_job/base.html.erb | 13 +++++++++++++ engine/app/views/shared/_jobs_table.erb | 6 ++++++ engine/app/views/shared/icons/_check.html.erb | 4 ++++ engine/app/views/shared/icons/_exclamation.html.erb | 4 ++++ engine/app/views/shared/icons/_trash.html.erb | 5 +++++ engine/config/routes.rb | 3 ++- spec/system/dashboard_spec.rb | 13 +++++++++++++ 8 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 engine/app/controllers/good_job/jobs_controller.rb create mode 100644 engine/app/views/shared/icons/_check.html.erb create mode 100644 engine/app/views/shared/icons/_exclamation.html.erb create mode 100644 engine/app/views/shared/icons/_trash.html.erb diff --git a/engine/app/controllers/good_job/jobs_controller.rb b/engine/app/controllers/good_job/jobs_controller.rb new file mode 100644 index 000000000..d4b4e52e0 --- /dev/null +++ b/engine/app/controllers/good_job/jobs_controller.rb @@ -0,0 +1,9 @@ +module GoodJob + class JobsController < GoodJob::BaseController + def destroy + deleted_count = GoodJob::Job.where(id: params[:id]).delete_all + message = deleted_count.positive? ? { notice: "Job deleted" } : { alert: "Job not deleted" } + redirect_to root_path, **message + end + end +end diff --git a/engine/app/views/layouts/good_job/base.html.erb b/engine/app/views/layouts/good_job/base.html.erb index 3e6a95e52..793017b67 100644 --- a/engine/app/views/layouts/good_job/base.html.erb +++ b/engine/app/views/layouts/good_job/base.html.erb @@ -51,6 +51,19 @@ + <% if notice %> + + <% elsif alert %> + + <% end %> <%= yield %> diff --git a/engine/app/views/shared/_jobs_table.erb b/engine/app/views/shared/_jobs_table.erb index 09f849714..4c6ddd805 100644 --- a/engine/app/views/shared/_jobs_table.erb +++ b/engine/app/views/shared/_jobs_table.erb @@ -9,6 +9,7 @@ Scheduled At Error ActiveJob Params + Actions <% jobs.each do |job| %> @@ -26,6 +27,11 @@ %> <%= tag.pre JSON.pretty_generate(job.serialized_params), id: dom_id(job, "params"), class: "collapse" %> + + <%= button_to job_path(job.id), method: :delete, class: "btn btn-sm btn-outline-danger", title: "Delete job" do %> + <%= render "shared/icons/trash" %> + <% end %> + <% end %> diff --git a/engine/app/views/shared/icons/_check.html.erb b/engine/app/views/shared/icons/_check.html.erb new file mode 100644 index 000000000..3ae85a3dd --- /dev/null +++ b/engine/app/views/shared/icons/_check.html.erb @@ -0,0 +1,4 @@ + + + + diff --git a/engine/app/views/shared/icons/_exclamation.html.erb b/engine/app/views/shared/icons/_exclamation.html.erb new file mode 100644 index 000000000..675e4f0b1 --- /dev/null +++ b/engine/app/views/shared/icons/_exclamation.html.erb @@ -0,0 +1,4 @@ + + + + diff --git a/engine/app/views/shared/icons/_trash.html.erb b/engine/app/views/shared/icons/_trash.html.erb new file mode 100644 index 000000000..08bec9361 --- /dev/null +++ b/engine/app/views/shared/icons/_trash.html.erb @@ -0,0 +1,5 @@ + + + + + diff --git a/engine/config/routes.rb b/engine/config/routes.rb index 44d16ce4a..7e47c42d6 100644 --- a/engine/config/routes.rb +++ b/engine/config/routes.rb @@ -1,6 +1,7 @@ GoodJob::Engine.routes.draw do root to: 'dashboards#index' - resources :active_jobs, only: :show + resources :active_jobs, only: %i[show] + resources :jobs, only: %i[destroy] scope controller: :assets do get :bootstrap_css diff --git a/spec/system/dashboard_spec.rb b/spec/system/dashboard_spec.rb index d6a05ff70..9b791e9e9 100644 --- a/spec/system/dashboard_spec.rb +++ b/spec/system/dashboard_spec.rb @@ -5,4 +5,17 @@ visit '/good_job' expect(page).to have_content 'GoodJob 👍' end + + it 'deletes job' do + ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external) + + ExampleJob.perform_later + + visit '/good_job' + expect(page).to have_content 'ExampleJob' + + click_button('Delete job') + expect(page).to have_content 'Job deleted' + expect(page).not_to have_content 'ExampleJob' + end end From e83d27448abf54195fa09eb03a872eefb39b9a03 Mon Sep 17 00:00:00 2001 From: Ben Sheldon Date: Fri, 4 Jun 2021 07:09:47 -0700 Subject: [PATCH 2/5] Release good_job v1.9.6 --- CHANGELOG.md | 14 ++++++++++++++ Gemfile.lock | 2 +- lib/good_job/version.rb | 2 +- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff909b1ae..b0b4b2488 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ # Changelog +## [v1.9.6](https://github.com/bensheldon/good_job/tree/v1.9.6) (2021-06-04) + +[Full Changelog](https://github.com/bensheldon/good_job/compare/v1.9.5...v1.9.6) + +**Closed issues:** + +- Pause jobs during migration / maintenance? [\#257](https://github.com/bensheldon/good_job/issues/257) +- How to properly report errors to error tracker service [\#159](https://github.com/bensheldon/good_job/issues/159) + +**Merged pull requests:** + +- Add deleting jobs from UI. [\#265](https://github.com/bensheldon/good_job/pull/265) ([morgoth](https://github.com/morgoth)) +- Collapse Dashboard params by default [\#263](https://github.com/bensheldon/good_job/pull/263) ([morgoth](https://github.com/morgoth)) + ## [v1.9.5](https://github.com/bensheldon/good_job/tree/v1.9.5) (2021-05-24) [Full Changelog](https://github.com/bensheldon/good_job/compare/v1.9.4...v1.9.5) diff --git a/Gemfile.lock b/Gemfile.lock index b14bf3a87..aa08fc44e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -11,7 +11,7 @@ GIT PATH remote: . specs: - good_job (1.9.5) + good_job (1.9.6) activejob (>= 5.2.0) activerecord (>= 5.2.0) concurrent-ruby (>= 1.0.2) diff --git a/lib/good_job/version.rb b/lib/good_job/version.rb index 50963c739..39a77298f 100644 --- a/lib/good_job/version.rb +++ b/lib/good_job/version.rb @@ -1,4 +1,4 @@ module GoodJob # GoodJob gem version. - VERSION = '1.9.5'.freeze + VERSION = '1.9.6'.freeze end From e750696061bf039bf5d2e021382d34f6eaad85e7 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Wed, 23 Jun 2021 15:11:29 -0700 Subject: [PATCH 3/5] Fix Scheduler integration spec to ensure jobs are run in the Scheduler under test (#276) --- spec/integration/scheduler_spec.rb | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/spec/integration/scheduler_spec.rb b/spec/integration/scheduler_spec.rb index 59f55b2d2..1e54ec253 100644 --- a/spec/integration/scheduler_spec.rb +++ b/spec/integration/scheduler_spec.rb @@ -1,6 +1,8 @@ require 'rails_helper' RSpec.describe 'Schedule Integration' do + let(:adapter) { GoodJob::Adapter.new(execution_mode: :external) } + before do ActiveJob::Base.queue_adapter = adapter @@ -38,15 +40,15 @@ def perform(*args, **kwargs) end), transfer_nested_constants: true end - let(:adapter) { GoodJob::Adapter.new } - context 'when there are a large number of jobs' do let(:number_of_jobs) { 500 } let(:max_threads) { 5 } let!(:good_jobs) do - number_of_jobs.times do |i| - ExampleJob.perform_later(i) + GoodJob::Job.transaction do + number_of_jobs.times do |i| + ExampleJob.perform_later(i) + end end end @@ -77,8 +79,10 @@ def perform(*args, **kwargs) let(:number_of_jobs) { 50 } let!(:good_jobs) do - number_of_jobs.times do |i| - ExampleJob.perform_later(i) + GoodJob::Job.transaction do + number_of_jobs.times do |i| + ExampleJob.perform_later(i) + end end end @@ -117,6 +121,7 @@ def perform(*args, **kwargs) it 'warms up and schedules them in a cache' do performer = GoodJob::JobPerformer.new('*') scheduler = GoodJob::Scheduler.new(performer, max_threads: 5, max_cache: 5) + scheduler.warm_cache sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 } scheduler.shutdown end From f7df44a07882a266518ff7bfe009384528366484 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Wed, 23 Jun 2021 15:13:22 -0700 Subject: [PATCH 4/5] Add example benchmark for job throughput (#275) --- .rubocop.yml | 1 + Gemfile | 1 + Gemfile.lock | 6 +++ good_job.gemspec | 1 + scripts/benchmark_job_throughput.rb | 51 +++++++++++++++++++ ...duler.rb => benchmark_scheduler_memory.rb} | 0 6 files changed, 60 insertions(+) create mode 100644 scripts/benchmark_job_throughput.rb rename scripts/{benchmark_scheduler.rb => benchmark_scheduler_memory.rb} (100%) diff --git a/.rubocop.yml b/.rubocop.yml index a02e0d143..6427d3f5f 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -20,6 +20,7 @@ AllCops: - pkg/**/* - spec/test_app/**/* - vendor/**/* + - scripts/**/* NewCops: enable Layout/EmptyLineAfterMagicComment: diff --git a/Gemfile b/Gemfile index 5256cebeb..7d7029d6f 100644 --- a/Gemfile +++ b/Gemfile @@ -17,6 +17,7 @@ gem 'pg', platforms: [:mri, :mingw, :x64_mingw] gem 'rails' platforms :ruby do + gem "activerecord-explain-analyze" gem "memory_profiler" gem "pry-byebug" gem "rbtrace" diff --git a/Gemfile.lock b/Gemfile.lock index aa08fc44e..cdbaf1d07 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -68,6 +68,9 @@ GEM activerecord (6.1.3.2) activemodel (= 6.1.3.2) activesupport (= 6.1.3.2) + activerecord-explain-analyze (0.1.0) + activerecord (>= 4) + pg activerecord-jdbc-adapter (61.0-java) activerecord (~> 6.1.0) activerecord-jdbcpostgresql-adapter (61.0-java) @@ -107,6 +110,7 @@ GEM async (~> 1.14) async-pool (0.3.6) async (~> 1.25) + benchmark-ips (2.8.4) better_html (1.0.16) actionview (>= 4.0) activesupport (>= 4.0) @@ -373,8 +377,10 @@ PLATFORMS universal-java-11 DEPENDENCIES + activerecord-explain-analyze activerecord-jdbcpostgresql-adapter appraisal! + benchmark-ips (= 2.8.4) capybara database_cleaner dotenv diff --git a/good_job.gemspec b/good_job.gemspec index 3e14429d3..e9cca79b6 100644 --- a/good_job.gemspec +++ b/good_job.gemspec @@ -54,6 +54,7 @@ Gem::Specification.new do |spec| spec.add_dependency "thor", ">= 0.14.1" spec.add_dependency "zeitwerk", ">= 2.0" + spec.add_development_dependency "benchmark-ips", "2.8.4" # https://github.com/evanphx/benchmark-ips/pull/115 spec.add_development_dependency "capybara" spec.add_development_dependency "database_cleaner" spec.add_development_dependency "dotenv" diff --git a/scripts/benchmark_job_throughput.rb b/scripts/benchmark_job_throughput.rb new file mode 100644 index 000000000..345cfd0d7 --- /dev/null +++ b/scripts/benchmark_job_throughput.rb @@ -0,0 +1,51 @@ +# To run: +# bundle exec ruby scripts/benchmark_example.rb +# + +ENV['GOOD_JOB_EXECUTION_MODE'] = 'external' + +require_relative '../spec/test_app/config/environment' +require_relative '../lib/good_job' +require 'benchmark/ips' +require 'pry' + +booleans = [true, false] +priorities = (1..10).to_a +scheduled_minutes = (-60..60).to_a + +GoodJob::Job.delete_all +puts "Seeding database" +jobs_data = Array.new(10_000) do |i| + puts "Initializing seed record ##{i}" if (i % 1_000).zero? + { + queue_name: 'default', + priority: priorities.sample, + scheduled_at: booleans.sample ? scheduled_minutes.sample.minutes.ago : nil, + created_at: 90.minutes.ago, + updated_at: 90.minutes.ago, + finished_at: booleans.sample ? scheduled_minutes.sample.minutes.ago : nil, + serialized_params: {}, + } +end +puts "Inserting seed records into the database...\n" +GoodJob::Job.insert_all(jobs_data) + +# ActiveRecord::Base.connection.execute('SET enable_seqscan = OFF') +# puts GoodJob::Job.unfinished.priority_ordered.only_scheduled(use_coalesce: true).limit(1).advisory_lock.explain(analyze: true) +# exit! + +Benchmark.ips do |x| + x.report("with priority") do + GoodJob::Job.unfinished.priority_ordered.only_scheduled(use_coalesce: true).limit(1).with_advisory_lock do |good_jobs| + # good_jobs.first&.destroy! + end + end + + x.report("without priority") do + GoodJob::Job.unfinished.only_scheduled(use_coalesce: true).limit(1).with_advisory_lock do |good_jobs| + # good_jobs.first&.destroy! + end + end + + x.compare! +end diff --git a/scripts/benchmark_scheduler.rb b/scripts/benchmark_scheduler_memory.rb similarity index 100% rename from scripts/benchmark_scheduler.rb rename to scripts/benchmark_scheduler_memory.rb From e0e828df94773c929d0d1703f7fa4f9c5eaf5c57 Mon Sep 17 00:00:00 2001 From: Ben Sheldon Date: Wed, 23 Jun 2021 17:09:05 -0700 Subject: [PATCH 5/5] Allow Lockable to be passed custom column, key, and Postgres advisory lock/unlock function --- lib/good_job/lockable.rb | 124 ++++++++++++++++++++--------- spec/lib/good_job/lockable_spec.rb | 44 +++++++++- 2 files changed, 128 insertions(+), 40 deletions(-) diff --git a/lib/good_job/lockable.rb b/lib/good_job/lockable.rb index b81670a5f..561c4e456 100644 --- a/lib/good_job/lockable.rb +++ b/lib/good_job/lockable.rb @@ -22,17 +22,25 @@ module Lockable RecordAlreadyAdvisoryLockedError = Class.new(StandardError) included do + # Default column to be used when creating Advisory Locks + cattr_accessor(:advisory_lockable_column, instance_accessor: false) { primary_key } + + # Default Postgres function to be used for Advisory Locks + cattr_accessor(:advisory_lockable_function) { "pg_try_advisory_lock" } + # Attempt to acquire an advisory lock on the selected records and # return only those records for which a lock could be acquired. - # @!method advisory_lock + # @!method advisory_lock(column: advisory_lockable_column, function: advisory_lockable_function) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [ActiveRecord::Relation] # A relation selecting only the records that were locked. - scope :advisory_lock, (lambda do + scope :advisory_lock, (lambda do |column: advisory_lockable_column, function: advisory_lockable_function| original_query = self cte_table = Arel::Table.new(:rows) - cte_query = original_query.select(primary_key).except(:limit) + cte_query = original_query.select(primary_key, column).except(:limit) cte_type = if supports_cte_materialization_specifiers? 'MATERIALIZED' else @@ -41,9 +49,14 @@ module Lockable composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' '))) + # In addition to an advisory lock, there is also a FOR UPDATE SKIP LOCKED + # because this causes the query to skip jobs that were completed (and deleted) + # by another session in the time since the table snapshot was taken. + # In rare cases under high concurrency levels, leaving this out can result in double executions. query = cte_table.project(cte_table[:id]) .with(composed_cte) - .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{quoted_primary_key}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) + .where(Arel.sql(sanitize_sql_for_conditions(["#{function}(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) + .lock(Arel.sql("FOR UPDATE SKIP LOCKED")) limit = original_query.arel.ast.limit query.limit = limit.value if limit.present? @@ -57,40 +70,44 @@ module Lockable # # For details on +pg_locks+, see # {https://www.postgresql.org/docs/current/view-pg-locks.html}. - # @!method joins_advisory_locks + # @!method joins_advisory_locks(column: advisory_lockable_column) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] # @example Get the records that have a session awaiting a lock: # MyLockableRecord.joins_advisory_locks.where("pg_locks.granted = ?", false) - scope :joins_advisory_locks, (lambda do + scope :joins_advisory_locks, (lambda do |column: advisory_lockable_column| join_sql = <<~SQL.squish LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64) << 32)::bit(32)::int SQL joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }])) end) # Find records that do not have an advisory lock on them. - # @!method advisory_unlocked + # @!method advisory_unlocked(column: advisory_lockable_column) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] - scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) } + scope :advisory_unlocked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where(pg_locks: { locktype: nil }) } # Find records that have an advisory lock on them. - # @!method advisory_locked + # @!method advisory_locked(column: advisory_lockable_column) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] - scope :advisory_locked, -> { joins_advisory_locks.where.not(pg_locks: { locktype: nil }) } + scope :advisory_locked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where.not(pg_locks: { locktype: nil }) } # Find records with advisory locks owned by the current Postgres # session/connection. - # @!method advisory_locked + # @!method advisory_locked(column: advisory_lockable_column) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] - scope :owns_advisory_locked, -> { joins_advisory_locks.where('"pg_locks"."pid" = pg_backend_pid()') } + scope :owns_advisory_locked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where('"pg_locks"."pid" = pg_backend_pid()') } # Whether an advisory lock should be acquired in the same transaction # that created the record. @@ -122,6 +139,8 @@ module Lockable # can (as in {Lockable.advisory_lock}) and only pass those that could be # locked to the block. # + # @param column [String, Symbol] name of advisory lock or unlock function + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @yield [Array] the records that were successfully locked. # @return [Object] the result of the block. # @@ -129,14 +148,17 @@ module Lockable # MyLockableRecord.order(created_at: :asc).limit(2).with_advisory_lock do |record| # do_something_with record # end - def with_advisory_lock + def with_advisory_lock(column: advisory_lockable_column, function: advisory_lockable_function) raise ArgumentError, "Must provide a block" unless block_given? - records = advisory_lock.to_a + records = advisory_lock(column: column, function: function).to_a begin yield(records) ensure - records.each(&:advisory_unlock) + records.each do |record| + key = [table_name, record[advisory_lockable_column]].join + record.advisory_unlock(key: key, function: advisory_unlockable_function(function)) + end end end @@ -145,49 +167,63 @@ def supports_cte_materialization_specifiers? @_supports_cte_materialization_specifiers = connection.postgresql_version >= 120000 end + + # Postgres advisory unlocking function for the class + # @param function [String, Symbol] name of advisory lock or unlock function + # @return [Boolean] + def advisory_unlockable_function(function = advisory_lockable_function) + function.to_s.sub("_lock", "_unlock").sub("_try_", "_") + end end # Acquires an advisory lock on this record if it is not already locked by # another database session. Be careful to ensure you release the lock when # you are done with {#advisory_unlock} (or {#advisory_unlock!} to release # all remaining locks). + # @param key [String, Symbol] Key to Advisory Lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [Boolean] whether the lock was acquired. - def advisory_lock + def advisory_lock(key: lockable_key, function: advisory_lockable_function) query = <<~SQL.squish SELECT 1 AS one - WHERE pg_try_advisory_lock(('x'||substr(md5($1 || $2::text), 1, 16))::bit(64)::bigint) + WHERE #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) SQL - binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)]] + binds = [[nil, key]] self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).any? end # Releases an advisory lock on this record if it is locked by this database # session. Note that advisory locks stack, so you must call # {#advisory_unlock} and {#advisory_lock} the same number of times. + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [Boolean] whether the lock was released. - def advisory_unlock + def advisory_unlock(key: lockable_key, function: self.class.advisory_unlockable_function(advisory_lockable_function)) query = <<~SQL.squish SELECT 1 AS one - WHERE pg_advisory_unlock(('x'||substr(md5($1 || $2::text), 1, 16))::bit(64)::bigint) + WHERE #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) SQL - binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)]] + binds = [[nil, key]] self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Unlock', binds).any? end # Acquires an advisory lock on this record or raises # {RecordAlreadyAdvisoryLockedError} if it is already locked by another # database session. + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @raise [RecordAlreadyAdvisoryLockedError] # @return [Boolean] +true+ - def advisory_lock! - result = advisory_lock + def advisory_lock!(key: lockable_key, function: advisory_lockable_function) + result = advisory_lock(key: key, function: function) result || raise(RecordAlreadyAdvisoryLockedError) end # Acquires an advisory lock on this record and safely releases it after the # passed block is completed. If the record is locked by another database # session, this raises {RecordAlreadyAdvisoryLockedError}. - # + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @yield Nothing # @return [Object] The result of the block. # @@ -196,51 +232,61 @@ def advisory_lock! # record.with_advisory_lock do # do_something_with record # end - def with_advisory_lock + def with_advisory_lock(key: lockable_key, function: advisory_lockable_function) raise ArgumentError, "Must provide a block" unless block_given? - advisory_lock! + advisory_lock!(key: key, function: function) yield ensure - advisory_unlock unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError + advisory_unlock(key: key, function: self.class.advisory_unlockable_function(function)) unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError end # Tests whether this record has an advisory lock on it. + # @param key [String, Symbol] Key to test lock against # @return [Boolean] - def advisory_locked? + def advisory_locked?(key: lockable_key) query = <<~SQL.squish SELECT 1 AS one FROM pg_locks WHERE pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x' || substr(md5($1 || $2::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x' || substr(md5($3 || $4::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int SQL - binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)], [nil, self.class.table_name], [nil, send(self.class.primary_key)]] + binds = [[nil, key], [nil, key]] self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Locked?', binds).any? end # Tests whether this record is locked by the current database session. + # @param key [String, Symbol] Key to test lock against # @return [Boolean] - def owns_advisory_lock? + def owns_advisory_lock?(key: lockable_key) query = <<~SQL.squish SELECT 1 AS one FROM pg_locks WHERE pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x' || substr(md5($1 || $2::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x' || substr(md5($3 || $4::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int AND pg_locks.pid = pg_backend_pid() SQL - binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)], [nil, self.class.table_name], [nil, send(self.class.primary_key)]] + binds = [[nil, key], [nil, key]] self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Owns Advisory Lock?', binds).any? end # Releases all advisory locks on the record that are held by the current # database session. + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [void] - def advisory_unlock! - advisory_unlock while advisory_locked? + def advisory_unlock!(key: lockable_key, function: self.class.advisory_unlockable_function(advisory_lockable_function)) + advisory_unlock(key: key, function: function) while advisory_locked? + end + + # Default Advisory Lock key + # @return [String] + def lockable_key + [self.class.table_name, self[self.class.advisory_lockable_column]].join end private diff --git a/spec/lib/good_job/lockable_spec.rb b/spec/lib/good_job/lockable_spec.rb index 78ac9492d..7d0a67eb5 100644 --- a/spec/lib/good_job/lockable_spec.rb +++ b/spec/lib/good_job/lockable_spec.rb @@ -2,7 +2,7 @@ RSpec.describe GoodJob::Lockable do let(:model_class) { GoodJob::Job } - let(:job) { model_class.create! } + let(:job) { model_class.create(queue_name: "default") } describe '.advisory_lock' do around do |example| @@ -35,6 +35,30 @@ FROM "rows" WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || "rows"."id"::text), 1, 16))::bit(64)::bigint) LIMIT 2 + FOR UPDATE SKIP LOCKED + ) + ORDER BY "good_jobs"."priority" DESC + SQL + end + + it 'can be customized with `lockable_column`' do + allow(model_class).to receive(:advisory_lockable_column).and_return("queue_name") + query = model_class.order(priority: :desc).limit(2).advisory_lock + + expect(normalize_sql(query.to_sql)).to eq normalize_sql(<<~SQL.squish) + SELECT "good_jobs".* + FROM "good_jobs" + WHERE "good_jobs"."id" IN ( + WITH "rows" AS #{'MATERIALIZED' if model_class.supports_cte_materialization_specifiers?} ( + SELECT "good_jobs"."id", "good_jobs"."queue_name" + FROM "good_jobs" + ORDER BY "good_jobs"."priority" DESC + ) + SELECT "rows"."id" + FROM "rows" + WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || "rows"."queue_name"::text), 1, 16))::bit(64)::bigint) + LIMIT 2 + FOR UPDATE SKIP LOCKED ) ORDER BY "good_jobs"."priority" DESC SQL @@ -48,6 +72,16 @@ job.advisory_unlock end + + it 'can lock an alternative column' do + expect(job).not_to be_advisory_locked + result_job = model_class.advisory_lock(column: :queue_name).first + expect(result_job).to eq job + expect(job).to be_advisory_locked(key: "good_jobsdefault") + expect(job).not_to be_advisory_locked # on default key + + job.advisory_unlock(key: "good_jobsdefault") + end end describe '.with_advisory_lock' do @@ -81,6 +115,14 @@ job.advisory_unlock end + + it 'can lock alternative values' do + job.advisory_lock!(key: "alternative") + expect(job.advisory_locked?(key: "alternative")).to be true + expect(job.advisory_locked?).to be false + + job.advisory_unlock(key: "alternative") + end end describe '#advisory_unlock' do