Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Now compatible with "fast path" unique job insertion that uses a unique index instead of advisory lock and fetch [as introduced in River #451](https://github.com/riverqueue/river/pull/451). [PR #28](https://github.com/riverqueue/riverqueue-ruby/pull/28).

## [0.6.1] - 2024-08-21

### Fixed
Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ group :test do
gem "debug"
gem "rspec-core"
gem "rspec-expectations"
gem "riverqueue-sequel", path: "driver/riverqueue-sequel"
gem "simplecov", require: false
end
11 changes: 11 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ PATH
specs:
riverqueue (0.6.1)

PATH
remote: driver/riverqueue-sequel
specs:
riverqueue-sequel (0.6.1)
pg (> 0, < 1000)
sequel (> 0, < 1000)

GEM
remote: https://rubygems.org/
specs:
Expand Down Expand Up @@ -50,6 +57,7 @@ GEM
parser (3.3.0.5)
ast (~> 2.4.1)
racc
pg (1.5.6)
psych (5.1.2)
stringio
racc (1.7.3)
Expand Down Expand Up @@ -89,6 +97,8 @@ GEM
rubocop-ast (>= 1.30.0, < 2.0)
ruby-progressbar (1.13.0)
securerandom (0.3.1)
sequel (5.79.0)
bigdecimal
simplecov (0.22.0)
docile (~> 1.1)
simplecov-html (~> 0.11)
Expand Down Expand Up @@ -137,6 +147,7 @@ PLATFORMS
DEPENDENCIES
debug
riverqueue!
riverqueue-sequel!
rspec-core
rspec-expectations
simplecov
Expand Down
1 change: 1 addition & 0 deletions Steepfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ D = Steep::Diagnostic
target :lib do
check "lib"

library "digest"
library "json"
library "time"

Expand Down
101 changes: 89 additions & 12 deletions driver/riverqueue-activerecord/lib/driver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ def errors = {}

def advisory_lock(key)
::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})")
nil
end

def advisory_lock_try(key)
::ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"]
end

def job_get_by_id(id)
data_set = RiverJob.where(id: id)
data_set.first ? to_job_row_from_model(data_set.first) : nil
end

def job_get_by_kind_and_unique_properties(get_params)
Expand All @@ -41,18 +51,44 @@ def job_get_by_kind_and_unique_properties(get_params)
data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
data_set = data_set.where(queue: get_params.queue) if get_params.queue
data_set = data_set.where(state: get_params.state) if get_params.state
data_set.take
data_set.first ? to_job_row_from_model(data_set.first) : nil
end

def job_insert(insert_params)
to_job_row(RiverJob.create(insert_params_to_hash(insert_params)))
to_job_row_from_model(RiverJob.create(insert_params_to_hash(insert_params)))
end

def job_insert_unique(insert_params, unique_key)
res = RiverJob.upsert(
insert_params_to_hash(insert_params).merge(unique_key: unique_key),
on_duplicate: Arel.sql("kind = EXCLUDED.kind"),
returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"),

# It'd be nice to specify this as `(kind, unique_key) WHERE unique_key
# IS NOT NULL` like we do elsewhere, but in its pure ingenuity, fucking
# ActiveRecord tries to look up a unique index instead of letting
# Postgres handle that, and of course it doesn't support a `WHERE`
# clause. The workaround is to target the index name instead of columns.
unique_by: "river_job_kind_unique_key_idx"
)

[to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]]
end

def job_insert_many(insert_params_many)
RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) })
insert_params_many.count
end

def job_list
data_set = RiverJob.order(:id)
data_set.all.map { |job| to_job_row_from_model(job) }
end

def rollback_exception
::ActiveRecord::Rollback
end

def transaction(&)
::ActiveRecord::Base.transaction(requires_new: true, &)
end
Expand All @@ -72,21 +108,18 @@ def transaction(&)
}.compact
end

# Type type injected to this method is not a `RiverJob`, but rather a raw
# hash with stringified keys because we're inserting with the Arel framework
# directly rather than generating a record from a model.
private def to_job_row(river_job)
private def to_job_row_from_model(river_job)
# needs to be accessed through values because `errors` is shadowed by both
# ActiveRecord and the patch above
errors = river_job.attributes["errors"]

River::JobRow.new(
id: river_job.id,
args: river_job.args ? JSON.parse(river_job.args) : nil,
args: JSON.parse(river_job.args),
attempt: river_job.attempt,
attempted_at: river_job.attempted_at,
attempted_at: river_job.attempted_at&.getutc,
attempted_by: river_job.attempted_by,
created_at: river_job.created_at,
created_at: river_job.created_at.getutc,
errors: errors&.map { |e|
deserialized_error = JSON.parse(e, symbolize_names: true)

Expand All @@ -97,15 +130,59 @@ def transaction(&)
trace: deserialized_error[:trace]
)
},
finalized_at: river_job.finalized_at,
finalized_at: river_job.finalized_at&.getutc,
kind: river_job.kind,
max_attempts: river_job.max_attempts,
metadata: river_job.metadata,
priority: river_job.priority,
queue: river_job.queue,
scheduled_at: river_job.scheduled_at,
scheduled_at: river_job.scheduled_at.getutc,
state: river_job.state,
tags: river_job.tags
tags: river_job.tags,
unique_key: river_job.unique_key
)
end

# This is really awful, but some of ActiveRecord's methods (e.g. `.create`)
# return a model, and others (e.g. `.upsert`) return raw values, and
# therefore this second version from unmarshaling a job row exists. I
# searched long and hard for a way to have the former type of method return
# raw or the latter type of method return a model, but was unable to find
# anything.
private def to_job_row_from_raw(res)
river_job = {}

res.rows[0].each_with_index do |val, i|
river_job[res.columns[i]] = res.column_types[i].deserialize(val)
end

River::JobRow.new(
id: river_job["id"],
args: JSON.parse(river_job["args"]),
attempt: river_job["attempt"],
attempted_at: river_job["attempted_at"]&.getutc,
attempted_by: river_job["attempted_by"],
created_at: river_job["created_at"].getutc,
errors: river_job["errors"]&.map { |e|
deserialized_error = JSON.parse(e)

River::AttemptError.new(
at: Time.parse(deserialized_error["at"]),
attempt: deserialized_error["attempt"],
error: deserialized_error["error"],
trace: deserialized_error["trace"]
)
},
finalized_at: river_job["finalized_at"]&.getutc,
kind: river_job["kind"],
max_attempts: river_job["max_attempts"],
metadata: river_job["metadata"],
priority: river_job["priority"],
queue: river_job["queue"],
scheduled_at: river_job["scheduled_at"].getutc,
state: river_job["state"],
tags: river_job["tags"],
unique_key: river_job["unique_key"]
)
end
end
Expand Down
Loading