Skip to content

Commit

Permalink
fix over-aggressive fetching
Browse files Browse the repository at this point in the history
In ce141fa / #664, the client was tuned in an attempt to make it more
aggressive about fetching more jobs when it has just fetched a full
batch of jobs. Unfortunately, a conditional was missed and it ended up
fetching aggressively any time a single job was fetched.

This adds the correct conditional so that the
`fetchWhenSlotsAreAvailable` flag only gets set when (a) the previous
fetch was full, or (b) when the previous fetch was skipped due to being
already full of jobs.
  • Loading branch information
bgentry committed Nov 4, 2024
1 parent e0441d9 commit 9d2935e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.14.1] - 2024-11-04

### Fixed

- In [PR #663](https://github.com/riverqueue/river/pull/663) the client was changed to be more aggressive about re-fetching when it had previously fetched a full batch. Unfortunately a clause was missed, which resulted in the client being more aggressive any time even a single job was fetched on the previous attempt. This was corrected with a conditional to ensure it only happens when the last fetch was full. [PR #668](https://github.com/riverqueue/river/pull/668).

## [0.14.0] - 2024-11-03

### Added
Expand Down
10 changes: 6 additions & 4 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,12 @@ func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan pr
} else if len(result.jobs) > 0 {
p.startNewExecutors(workCtx, result.jobs)

// Fetch returned the maximum number of jobs that were requested,
// implying there may be more in the queue. Trigger another fetch when
// slots are available.
p.fetchWhenSlotsAreAvailable = true
if len(result.jobs) == limit {
// Fetch returned the maximum number of jobs that were requested,
// implying there may be more in the queue. Trigger another fetch when
// slots are available.
p.fetchWhenSlotsAreAvailable = true
}
}
return
case result := <-p.jobResultCh:
Expand Down

0 comments on commit 9d2935e

Please sign in to comment.