Implement total driver system + detach River from pgx/v5
#212
772ee03
to
67b1c05
Compare
@bgentry Damn, I know this changeset is absolutely brutal, sorry, but it's up if you want to start mulling it over.
67b1c05
to
745dce1
Compare
Phew 😅 I still have quite a few of the larger bits to review in here, but I've cleared off a lot of the higher level parts and all of the tiny stuff. I'll need one more pass to get through this I think.
For now, here are some initial things I noticed / wanted to discuss.
@@ -117,7 +133,178 @@ type ExecutorTx interface {
    Rollback(ctx context.Context) error
}

// Listner listens for notifications. In Postgres, this is a database connection
- // Listner listens for notifications. In Postgres, this is a database connection
+ // Listener listens for notifications. In Postgres, this is a database connection
Thx, fixed.
@@ -332,7 +331,7 @@ var (
     // ErrNotFound is returned when a query by ID does not match any existing
     // rows. For example, attempting to cancel a job that doesn't exist will
     // return this error.
-    ErrNotFound = errors.New("not found")
+    ErrNotFound = rivertype.ErrNotFound
Got thrown off at first trying to figure out how you were still returning the top level `ErrNotFound`, but now I see it. I think this makes sense: if it's the same actual error value across packages then it will always be considered equal and `errors.Is()` checks will succeed, right?
Only downside is the extra layer of indirection figuring out what the aliased value actually is, but I don't think that matters too much in the case of an error like this.
Yeah, `errors.Is` still works because the top-level variable is assigned the very same error value (with `=`), so both names refer to one sentinel.
I would've slightly preferred to not have it at the top-level `river` and only be in `rivertype`, but added this alias for backwards compatibility (although this is a change that maybe we could get away with). It needs to be in `rivertype` so lower level packages can reference it.
if err != nil {
    return nil, err
}
return c.jobCancel(ctx, c.driver.UnwrapExecutor(tx), jobID)
I love that these are refactored to share all their logic, thank you!
Do you think it makes sense for this to be tucked away into a driver-specific package? Is the idea that the serialization & encoding is driver-specific?
The reason I'm asking is because some recent UI work has made me think about whether we might want to stuff more info in here. For example at
gives us just the time the error occurred, but we don't have any info about how long it took for the job to run on that attempt. We have to be mindful of adding too much in here but there might still be some additional bits worth the overhead.
Nope. The fact that it's in the driver is purely a holdover from the `sqlc.yaml` type override that was already in there. I'm totally good with errors being pushed up a level; it'd remove duplicate code at the driver layer that's not very valuable right now.
 }

 func (s *Scheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, error) {
     res := &schedulerRunOnceResult{}

     for {
         // Wrapped in a function so that defers run as expected.
-        numScheduled, err := func() (int64, error) {
+        numScheduled, err := func() (int, error) {
oh come on, you're not gonna schedule 2.1 billion jobs in one query?? 😢
Haha, I figure anyone big and powerful enough to be scheduling 2.1B jobs will also be rich enough to afford 64-bit arch!
job_list_params.go
Outdated
// Kinds returns an updated filter set that will only return jobs of the given
// kinds.
func (p *JobListParams) Kinds(kinds ...string) *JobListParams {
    result := p.copy()
    result.kinds = make([]string, 0, len(kinds))
    copy(result.kinds, kinds)
    return result
}
A bit hard to tell, but I think this addition might be missing test coverage?
Yeah, added. And actually, it didn't work :)
TBF, I'd copied the implementation from `Queues`, which also didn't have a test, and also didn't work. Corrected that one too, and put a note in the changelog.
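The thread doesn't show the corrected code, but the `Kinds` snippet quoted above has a well-known Go pitfall that would explain "it didn't work": `copy` writes at most `len(dst)` elements, and `make([]string, 0, len(kinds))` has length zero. A small sketch of the failure and one possible fix, using hypothetical helper names `copyBroken` and `copyFixed`:

```go
package main

import "fmt"

// copyBroken mirrors the shape of the reviewed snippet: the destination has
// capacity but zero length, so copy has nowhere to write.
func copyBroken(kinds []string) []string {
	dst := make([]string, 0, len(kinds))
	copy(dst, kinds) // copies min(len(dst), len(kinds)) == 0 elements
	return dst
}

// copyFixed allocates the destination at full length before copying.
func copyFixed(kinds []string) []string {
	dst := make([]string, len(kinds))
	copy(dst, kinds)
	return dst
}

func main() {
	kinds := []string{"email", "report"}

	// The broken variant silently returns an empty filter list.
	fmt.Println(len(copyBroken(kinds)), len(copyFixed(kinds)))
}
```

Because the broken variant neither panics nor returns an error, only a test that asserts on the filtered results would catch it, which matches how the problem surfaced in review.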

require github.com/jackc/pgx/v5 v5.5.0
replace github.com/riverqueue/river/rivertype => ../rivertype
Is this meant to stay in here?
Yeah, I believe this is right. All the pgx stuff disappears, but we still need a reference to `rivertype` for structs like `JobRow`.
# This one is necessary because `args` is nullable (this seems to have
# been an oversight, but one we're determined isn't worth correcting
# for now), and the `database/sql` variant of sqlc will give it a
# crazy type by default, so here we give it something more reasonable.
To be fixed 🔜
Yes! Soon.
Can this not go in a shared util package somewhere?
Ugh, it could be, but we'd have to Go module-ify `river/internal/util` :/
But actually, since it was just this one function that made it into `riverdriverutil`, I just removed this package and duplicated the helper into each driver as an internal `mapSlice`. I think this is a little better since I don't love this new utility package.
If more utilities need to be shared later on, we can revisit this.
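For a sense of how small the duplicated helper is, here is a rough sketch of what a generic `mapSlice` could look like. The signature and the `job` type are assumptions for illustration; the thread does not show the actual implementation in each driver.

```go
package main

import "fmt"

// mapSlice returns a new slice holding f applied to each element of in.
// The signature is an assumption; the PR does not quote the real helper.
func mapSlice[T any, R any](in []T, f func(T) R) []R {
	if in == nil {
		return nil
	}

	out := make([]R, len(in))
	for i, v := range in {
		out[i] = f(v)
	}
	return out
}

// job is a hypothetical stand-in for a row type such as rivertype.JobRow.
type job struct{ ID int64 }

func main() {
	jobs := []*job{{ID: 1}, {ID: 2}}

	// Project each job onto its ID, the same shape of call that the tests
	// later in this thread make with sliceutil.Map.
	ids := mapSlice(jobs, func(j *job) int64 { return j.ID })
	fmt.Println(ids)
}
```

At roughly ten lines, copying it into each driver is cheaper than maintaining and releasing one more Go module.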
745dce1
to
19156b4
Compare
Well, gotta be the easiest > 10k line PR I've ever reviewed 🤣 Really great work on this. It might help to push any subsequent changes into additional commits to make them even easier to re-review. Most of this commentary is pretty minor, should be good to go after this 👏
-- name: JobGetByKindMany :many
SELECT *
FROM river_job
WHERE kind = any(@kind::text[])
ORDER BY id;

-- name: JobGetByKindAndUniqueProperties :one
Missed the alphabetization on these two
Thx. Fixed.
var driver TDriver
jobRow, err := driver.UnwrapExecutor(tx).JobSetStateIfRunning(ctx, riverdriver.JobSetStateCompleted(job.ID, time.Now()))
This is a small behavioral change due to the differences in the underlying queries, but I don't think it's a big issue. You really shouldn't be using this to set a job as complete if it's not actually running.
Yeah true, but I think you're right in that it won't matter. If being called in a `Work` function then the job should be running unless something weird is going on. Feels good to get rid of a big SQL blob.

func jobRowFromInternal(internal *dbsqlc.RiverJob) *rivertype.JobRow {
    return &rivertype.JobRow{
        ID: internal.ID,
May as well fully sort this struct declaration now, same for `migrationFromInternal` below
I think it is. I'm sort of using the convention of `ID` at the top, and then alphabetical from there on out.
t.Run("JobGetByIDMany", func(t *testing.T) {
    t.Parallel()
})

t.Run("JobGetByKindAndUniqueProperties", func(t *testing.T) {
    t.Parallel()
})

t.Run("JobGetByKindMany", func(t *testing.T) {
    t.Parallel()
})

t.Run("JobGetStuck", func(t *testing.T) {
    t.Parallel()
})

t.Run("JobInsert", func(t *testing.T) {
    t.Parallel()
})
Are these still meant to be empty? same comment for the other empty ones in here
Yep, caught me. These were for functions where there were no equivalents from `adapter_test.go`, and I'd refactored so many tests already that I'd left these as an exercise for the future.
That said, had some time this evening, so I filled them all in. The test bootstrap here should actually be pretty good for testing a theoretical future driver now, even without having to put it in a real client. I also found a couple things that could be considered bugs, like some timestamps not being returned back as `UTC()` when they probably should've been.
You might want to go through eventually and double-check that you're happy with all the conditions being checked. A few could probably use a couple more cases.
532a05c
to
16fd58f
Compare
A small one to upgrade:

* golangci-lint GitHub Action from `v4` to `v5`
* golangci-lint from 1.55 to 1.56

A small change, but I upgraded locally at some point, and it's causing a build issue in #212. The `tparallel` lint previously had a bug where it'd want you to put a `t.Parallel()` on test cases that use `t.Setenv()`, even though use of `t.Parallel()` with `t.Setenv()` is disallowed by Go's test framework, so you'd have to mark those test cases with `nolint`. That bug's been fixed now, and now the linter detects that the `nolint` is no longer necessary, and automatically strips it out with my upgraded golangci-lint version, causing a failure when I push to CI.

The `v5` GitHub Action is also somewhat nice because it stops using a deprecated NodeJS version, so produces fewer warnings in the log.
16fd58f
to
f3637b9
Compare
e059336
to
bcff31c
Compare
@bgentry Thanks for the great feedback here. After one more major push tonight, I think I was able to address all feedback, and have responded to all comments above. (And damn, just responding to the review of this PR felt like a big project; ty for writing all that up.) 95% of my responses are agreeing + fixing, but you should probably take another quick scan through anyway. The lint CI is failing because of the problem described in #223, but the build should be good aside from that. The suite's picked up another few dozen tests since the last time you looked even, so I'm feeling pretty good about it. Also added changelog notes for everything.
5f8d648
to
5fe26b9
Compare
0dcbca0
to
5749b1c
Compare
5749b1c
to
55183b5
Compare
}

t.Run("FiltersByKind", func(t *testing.T) { //nolint:dupl
heh, this linter can get pretty annoying in tests.
Yeah ... we might want to just disable it. So far I've never seen a situation where it's done anything useful.
    require.Equal(t, []int64{job3.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("FiltersByQueue", func(t *testing.T) { //nolint:dupl
Thanks for adding this and fixing the issue 🙏

exec, _ := setupExecutor(ctx, t, driver, beginTx)

job, err := exec.JobGetByID(ctx, 99999)
realized from some of our other tests that we can just use 0 here and have no possibility of a flaky test ever
- job, err := exec.JobGetByID(ctx, 99999)
+ job, err := exec.JobGetByID(ctx, 0)
Sweet, fixed. There were a total of four instances of 99999 lookups between the `riverdrivertest` stuff I added and `client_test`. Updated all of them.
Previously, we implemented a basic driver interface in River such that no exported River APIs depended on `pgx/v5` directly, with the idea being that although only one database driver was actually supported, it'd give us the leeway we needed to fully implement a multi-driver system later. Under the covers, the drivers were a hack, and `GetDBPool` just returned the wrapped pgx database pool back to the client and other packages that wanted to access it.

This change takes the initial driver push and carries it all the way. The driver API becomes fully fleshed out with all database operations River needs to operate, and the top-level River module drops internal use of `pgx/v5` completely (it's still used in tests, but by no non-test packages).

If `pgx/v6` were to be released tomorrow, this would allow us to add support for it quite easily. It also allows us to theoretically fully implement the `databasesql` driver, or to implement drivers for other databases like MySQL and SQLite (it'd be work to get that done, but the driver API is generic enough to make it possible).

Notably, the `dbadapter` package goes away because the River driver API becomes a very similar concept to it, with interface functions for major operations. A difference is that given we'd like to implement multiple drivers, the driver code should stay as lean as possible so that we don't have to reimplement it over and over again, so any higher level logic that could be extracted from `StandardAdapter` was, including:

* About half of the list logic was already in `dblist`, but we move the other half too so that all list logic is now packaged together.
* Uniqueness logic moves to a new `dbunique` package.

`dbadapter` tests move into a new module `riverdrivertest` that provides some helpers that easily allow us to run the full barrage against `riverpgxv5`, but also any new drivers that we might add in the future. (`riverdatabasesql` is also tested, but uses a different helper that only checks its support for migrations.)

Possibly the trickiest incision was moving listen/notify logic over to the driver, which picks up a new `GetListener()` function that returns this `Listener` interface:

    // Listener listens for notifications. In Postgres, this is a database connection
    // where `LISTEN` has been run.
    //
    // API is not stable. DO NOT IMPLEMENT.
    type Listener interface {
        Close(ctx context.Context) error
        Connect(ctx context.Context) error
        Listen(ctx context.Context, topic string) error
        Ping(ctx context.Context) error
        Unlisten(ctx context.Context, topic string) error
        WaitForNotification(ctx context.Context) (*Notification, error)
    }

This is backed by a connection pool and `LISTEN` for `riverpgxv5`, but the idea is that for databases that don't support listen/notify, it's generic enough that it could be reimplemented as something else under the hood, like say an unlogged table of some sort.

`river/riverdriver` becomes its own Go module. Previously, we'd gotten away without referencing it from driver implementations, but the implementations now reference it for parameter types and some interfaces and return values. Like most other modules, modules that need to reference `riverdriver` use `replace` in their `go.mod`s, so I don't expect the extra module to add any extra headache to our releases.

Discerning readers may notice that driver implementations reference some types in `riverdriver` and other types in `rivertype`. I'm defining which is which as: `riverdriver` should contain anything unstable that we're allowed to change and which a user shouldn't be accessing anyway (e.g. say function parameter structs). `rivertype` is for specific constructs that external River users will need to reference like `JobRow`.

Lastly, I'll say that this isn't final. I've refactored a lot of tests to ensure that this patch doesn't have any major outstanding TODOs and doesn't leave the code any worse than when it started, but there are a lot of follow up improvements that I can potentially make, and expect to.
55183b5
to
501c484
Compare
TY man!
As of #212, `rivertype` became a separate Go module from the mainline `river`. Here, add it to the release instructions so that it gets properly tagged along with all other modules. Also add it to the `update-submodule-versions` program. This isn't strictly necessary because `rivertype` doesn't have any submodule versions to update, but it feels like it should be there for completeness just in case, and also because it makes the list in the program a complete list of all Go modules in the project.