Skip to content

Commit 5e0e9a2

Browse files
committed
Support fast unique insertion path
Updates the Ruby client to be compatible with the fast unique insertion added to the main River in [1] which uses a unique index instead of advisory lock + fetch as long as uniqueness is constrained to the default set of unique job states. We also reorganize the driver tests such that the majority of the tests are put in a single set of shared examples, largely so that ActiveRecord and Sequel aren't so duplicative of each other, and so we can easily add new tests for all drivers in one place. Lastly, I killed the mock driver in use at the top level. Adding anything new required all kinds of engineering around it, and I found lots of test bugs that were the result of imperfect mocking that wasn't fully checking the client end to end. [1] riverqueue/river#451
1 parent 29b8e4f commit 5e0e9a2

File tree

13 files changed

+851
-921
lines changed

13 files changed

+851
-921
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Changed
11+
12+
- Now compatible with "fast path" unique job insertion that uses a unique index instead of advisory lock and fetch [as introduced in River #451](https://github.com/riverqueue/river/pull/451). [PR #XXX](https://github.com/riverqueue/riverqueue-ruby/pull/XXX).
13+
1014
## [0.6.1] - 2024-08-21
1115

1216
### Fixed

Gemfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ group :test do
1111
gem "debug"
1212
gem "rspec-core"
1313
gem "rspec-expectations"
14+
gem "riverqueue-sequel", path: "driver/riverqueue-sequel"
1415
gem "simplecov", require: false
1516
end

Gemfile.lock

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,13 @@ PATH
33
specs:
44
riverqueue (0.6.1)
55

6+
PATH
7+
remote: driver/riverqueue-sequel
8+
specs:
9+
riverqueue-sequel (0.6.1)
10+
pg (> 0, < 1000)
11+
sequel (> 0, < 1000)
12+
613
GEM
714
remote: https://rubygems.org/
815
specs:
@@ -50,6 +57,7 @@ GEM
5057
parser (3.3.0.5)
5158
ast (~> 2.4.1)
5259
racc
60+
pg (1.5.6)
5361
psych (5.1.2)
5462
stringio
5563
racc (1.7.3)
@@ -89,6 +97,8 @@ GEM
8997
rubocop-ast (>= 1.30.0, < 2.0)
9098
ruby-progressbar (1.13.0)
9199
securerandom (0.3.1)
100+
sequel (5.79.0)
101+
bigdecimal
92102
simplecov (0.22.0)
93103
docile (~> 1.1)
94104
simplecov-html (~> 0.11)
@@ -137,6 +147,7 @@ PLATFORMS
137147
DEPENDENCIES
138148
debug
139149
riverqueue!
150+
riverqueue-sequel!
140151
rspec-core
141152
rspec-expectations
142153
simplecov

Steepfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ D = Steep::Diagnostic
33
target :lib do
44
check "lib"
55

6+
library "digest"
67
library "json"
78
library "time"
89

driver/riverqueue-activerecord/lib/driver.rb

Lines changed: 89 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,16 @@ def errors = {}
3333

3434
def advisory_lock(key)
3535
::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})")
36+
nil
37+
end
38+
39+
def advisory_lock_try(key)
40+
::ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"]
41+
end
42+
43+
def job_get_by_id(id)
44+
data_set = RiverJob.where(id: id)
45+
data_set.first ? to_job_row_from_model(data_set.first) : nil
3646
end
3747

3848
def job_get_by_kind_and_unique_properties(get_params)
@@ -41,18 +51,44 @@ def job_get_by_kind_and_unique_properties(get_params)
4151
data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
4252
data_set = data_set.where(queue: get_params.queue) if get_params.queue
4353
data_set = data_set.where(state: get_params.state) if get_params.state
44-
data_set.take
54+
data_set.first ? to_job_row_from_model(data_set.first) : nil
4555
end
4656

4757
def job_insert(insert_params)
48-
to_job_row(RiverJob.create(insert_params_to_hash(insert_params)))
58+
to_job_row_from_model(RiverJob.create(insert_params_to_hash(insert_params)))
59+
end
60+
61+
def job_insert_unique(insert_params, unique_key)
62+
res = RiverJob.upsert(
63+
insert_params_to_hash(insert_params).merge(unique_key: unique_key),
64+
on_duplicate: Arel.sql("kind = EXCLUDED.kind"),
65+
returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"),
66+
67+
# It'd be nice to specify this as `(kind, unique_key) WHERE unique_key
68+
# IS NOT NULL` like we do elsewhere, but in its pure ingenuity, fucking
69+
# ActiveRecord tries to look up a unique index instead of letting
70+
# Postgres handle that, and of course it doesn't support a `WHERE`
71+
# clause.
72+
unique_by: "river_job_kind_unique_key_idx"
73+
)
74+
75+
[to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]]
4976
end
5077

5178
def job_insert_many(insert_params_many)
5279
RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) })
5380
insert_params_many.count
5481
end
5582

83+
def job_list
84+
data_set = RiverJob.order(:id)
85+
data_set.all.map { |job| to_job_row_from_model(job) }
86+
end
87+
88+
def rollback_exception
89+
::ActiveRecord::Rollback
90+
end
91+
5692
def transaction(&)
5793
::ActiveRecord::Base.transaction(requires_new: true, &)
5894
end
@@ -72,21 +108,18 @@ def transaction(&)
72108
}.compact
73109
end
74110

75-
# Type type injected to this method is not a `RiverJob`, but rather a raw
76-
# hash with stringified keys because we're inserting with the Arel framework
77-
# directly rather than generating a record from a model.
78-
private def to_job_row(river_job)
111+
private def to_job_row_from_model(river_job)
79112
# needs to be accessed through values because `errors` is shadowed by both
80113
# ActiveRecord and the patch above
81114
errors = river_job.attributes["errors"]
82115

83116
River::JobRow.new(
84117
id: river_job.id,
85-
args: river_job.args ? JSON.parse(river_job.args) : nil,
118+
args: JSON.parse(river_job.args),
86119
attempt: river_job.attempt,
87-
attempted_at: river_job.attempted_at,
120+
attempted_at: river_job.attempted_at&.getutc,
88121
attempted_by: river_job.attempted_by,
89-
created_at: river_job.created_at,
122+
created_at: river_job.created_at.getutc,
90123
errors: errors&.map { |e|
91124
deserialized_error = JSON.parse(e, symbolize_names: true)
92125

@@ -97,16 +130,61 @@ def transaction(&)
97130
trace: deserialized_error[:trace]
98131
)
99132
},
100-
finalized_at: river_job.finalized_at,
133+
finalized_at: river_job.finalized_at&.getutc,
101134
kind: river_job.kind,
102135
max_attempts: river_job.max_attempts,
103136
metadata: river_job.metadata,
104137
priority: river_job.priority,
105138
queue: river_job.queue,
106-
scheduled_at: river_job.scheduled_at,
139+
scheduled_at: river_job.scheduled_at.getutc,
107140
state: river_job.state,
108141
tags: river_job.tags
109142
)
110143
end
144+
145+
# This is really awful, but some of ActiveRecord's methods (e.g. `.create`)
146+
# return a model, and others (e.g. `upsert`) return raw values, and
147+
# therefore this second version from unmarshaling a job row exists. I
148+
# searched long and hard for a way to have the former type of method return
149+
# raw or the latter type of method return a model, but was unable to find
150+
# anything amongst ActiveRecord's thousands of lines of incomprehensible
151+
# source code with its layers of abstraction so deep that it's borderline
152+
# impossible to find any code that's actually doing anything useful amongst
153+
# them. What an absolute blight of a framework.
154+
private def to_job_row_from_raw(res)
155+
river_job = {}
156+
157+
res.rows[0].each_with_index do |val, i|
158+
river_job[res.columns[i]] = res.column_types[i].deserialize(val)
159+
end
160+
161+
River::JobRow.new(
162+
id: river_job["id"],
163+
args: JSON.parse(river_job["args"]),
164+
attempt: river_job["attempt"],
165+
attempted_at: river_job["attempted_at"]&.getutc,
166+
attempted_by: river_job["attempted_by"],
167+
created_at: river_job["created_at"].getutc,
168+
errors: river_job["errors"]&.map { |e|
169+
deserialized_error = JSON.parse(e)
170+
171+
River::AttemptError.new(
172+
at: Time.parse(deserialized_error["at"]),
173+
attempt: deserialized_error["attempt"],
174+
error: deserialized_error["error"],
175+
trace: deserialized_error["trace"]
176+
)
177+
},
178+
finalized_at: river_job["finalized_at"]&.getutc,
179+
kind: river_job["kind"],
180+
max_attempts: river_job["max_attempts"],
181+
metadata: river_job["metadata"],
182+
priority: river_job["priority"],
183+
queue: river_job["queue"],
184+
scheduled_at: river_job["scheduled_at"].getutc,
185+
state: river_job["state"],
186+
tags: river_job["tags"]
187+
)
188+
end
111189
end
112190
end

0 commit comments

Comments
 (0)