chore: improve batch router #5677
base: master
Conversation
Pull Request Overview
This PR improves the batch router by enhancing logging, introducing channel-based job buffering, and updating test timeouts for reliability.
- Added goroutine ID logging in workers
- Refactored job processing to use a job buffer with dedicated consumers
- Adjusted test timeout durations in both unit and integration tests
Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.

Summary per file:
| File | Description |
|---|---|
| `utils/workerpool/internal_worker.go` | Added goroutine ID logging and a helper function for retrieving the goroutine ID |
| `router/batchrouter/worker.go` | Updated worker logging and refactored job processing to use buffers |
| `router/batchrouter/job_buffer.go` | Introduced channel-based job buffering and new consumer routines |
| `router/batchrouter/handle_observability.go` | Revised metrics collection with context cancellation in select block |
| `router/batchrouter/handle_lifecycle.go` | Initialized job buffer and updated shutdown logic for timers |
| `router/batchrouter/handle.go` | Added job buffer field and improved job status update with sorting |
| `router/batchrouter/batchrouter_test.go` | Increased test timeouts in batch router tests |
| `integration_test/docker_test/docker_test.go` | Increased test timeouts to accommodate slower integration test scenarios |
Comments suppressed due to low confidence (2)

`utils/workerpool/internal_worker.go:91`: The `goid()` function is duplicated here and in other packages; consider extracting it into a common utility to avoid code duplication.

```go
func goid() int {
```

`router/batchrouter/worker.go:37`: The `goid()` function is duplicated in multiple files; refactor this logic into a shared utility module to improve maintainability.

```go
func goid() int {
```
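If the duplicated helper is extracted into a shared utility, one common (unofficial) implementation parses the header of `runtime.Stack` output. The sketch below shows that widely used technique; it is an assumption about how `goid()` works here, not necessarily the PR's exact implementation, and the Go runtime deliberately hides goroutine IDs, so this should be used for logging only:

```go
package main

import (
	"fmt"
	"runtime"
	"strconv"
	"strings"
)

// goid returns the current goroutine's ID by parsing the first line of the
// stack trace, which has the form "goroutine 123 [running]:". The runtime
// does not expose goroutine IDs on purpose; treat this as a debugging aid.
func goid() int {
	buf := make([]byte, 64)
	n := runtime.Stack(buf, false)
	header := strings.TrimPrefix(string(buf[:n]), "goroutine ")
	idField := header[:strings.IndexByte(header, ' ')]
	id, err := strconv.Atoi(idField)
	if err != nil {
		return -1
	}
	return id
}

func main() {
	fmt.Println(goid() > 0) // the main goroutine always has a positive ID
}
```

Placing this in one shared package (for example a `utils` package) would remove the duplication both comments point at.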
`router/batchrouter/handle.go` (outdated)

```go
// Sort jobs by ID to ensure consistent processing order
slices.SortFunc(batchJobs.Jobs, func(a, b *jobsdb.JobT) int {
	return int(a.JobID - b.JobID)
```
Converting the difference of two int64 JobIDs to an int may lead to overflow or unexpected behavior; consider using an explicit comparison returning -1, 0, or 1.
```diff
-	return int(a.JobID - b.JobID)
+	if a.JobID < b.JobID {
+		return -1
+	} else if a.JobID > b.JobID {
+		return 1
+	} else {
+		return 0
+	}
```
@coderabbitai full review
Walkthrough

The pull request increases timeout durations in several test assertions and refactors various components of the batch router. In the tests, wait durations were increased: from 5 seconds to 2 minutes in the batch router unit tests, and from 1 minute to 2 minutes in the docker integration tests.
Actionable comments posted: 2

♻️ Duplicate comments (1)

`router/batchrouter/handle.go`, lines 627-630: ⚠️ Potential issue: Fix potential integer overflow in sort function.

The current implementation may lead to integer overflow when comparing large int64 JobIDs and converting their difference to an int type. This is especially problematic on 32-bit systems, where int is only 32 bits.

Apply this safer implementation:

```diff
-slices.SortFunc(batchJobs.Jobs, func(a, b *jobsdb.JobT) int {
-	return int(a.JobID - b.JobID)
-})
+slices.SortFunc(batchJobs.Jobs, func(a, b *jobsdb.JobT) int {
+	if a.JobID < b.JobID {
+		return -1
+	} else if a.JobID > b.JobID {
+		return 1
+	} else {
+		return 0
+	}
+})
```
🧹 Nitpick comments (11)

`router/batchrouter/worker.go` (4)

- **47**: Prefer verifying `workerJob` data before routing. Before calling `routeJobsToBuffer(workerJob)`, it might be beneficial to log or validate the structure of `workerJob` (e.g., ensuring the `destWithSources` and `jobs` fields are non-empty), to catch unexpected nil references or empty slices quickly.
- **49**: Add context to the log message. The log message is helpful, but consider including the length of `workerJobs` or other relevant details to facilitate debugging.

```diff
-	brt.logger.Infof("returning true for worker %s", w.partition)
+	brt.logger.Infof("returning true for worker %s; processed %d job groups", w.partition, len(workerJobs))
```

- **53-62**: Clarify or rename local variables for better readability. In this block, variables such as `statusList`, `drainList`, and `jobIDConnectionDetailsMap` are meaningful but might benefit from descriptive comments to clarify their role in the job buffering flow.
- **138-154**: Handle possible concurrency or transaction failures gracefully. If `UpdateJobStatus` fails after retries, a panic is valid for irrecoverable scenarios. However, confirm that higher-level logic can recover properly from such a panic, or consider using a less drastic approach for production resilience.

`router/batchrouter/handle_lifecycle.go` (4)
- **174-176**: Review asynchronous manager usage. Starting the async destination manager here is good. If there are any synchronous destinations that also require special handling, confirm that they won't break by skipping `startAsyncDestinationManager()`. Add a log to confirm the asynchronous manager's readiness.
- **178-181**: Maintain resilience on backend configuration errors. The backend config subscriber goroutine can face transient issues if the configuration reading fails. Consider adding robust error handling or retry logic around `brt.backendConfigSubscriber()`, if not already present, to avoid silent failures.
- **272-275**: Include context about which goroutines are stopping. When initiating shutdown, it's helpful to specify which background goroutines (async manager, job consumer, etc.) are being stopped. This clarifies the logs.

```diff
-	brt.logger.Info("Initiating batch router shutdown")
+	brt.logger.Infof("Initiating batch router shutdown; stopping background goroutines for destType: %s", brt.destType)
```

- **288**: The shutdown log can be combined with final resource usage stats. Consider adding stats for how many job channels or timers were cleaned up, or how many pending jobs remain, so the final log is more actionable.

`router/batchrouter/job_buffer.go` (3)
- **16-22**: Ensure concurrency safety for map usage. `sourceDestMap` and `uploadTimers` share a single mutex, `mu`. If usage grows large or contention becomes high, evaluate whether separate locks or concurrent data structures are needed.
- **24-31**: Enhance stat tagging with workspace details if available. Including the workspace ID in the stat tags (if it's easily retrievable here) can aid in diagnosing multi-workspace scenarios.
- **38-62**: Invert the flow to reduce time under the write lock. When `ch` does not exist, you re-lock with a write lock. This is correct but can slightly hinder concurrency if many readers are contending. Consider an approach that uses double-checked locking more selectively, or a `sync.Map` structure.
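The double-checked locking pattern this comment refers to can be sketched as below. All names (`chanMap`, `getOrCreate`) are illustrative, not the PR's actual `JobBuffer` code; the essential point is re-checking the map under the write lock, because another goroutine may have created the channel between `RUnlock` and `Lock`:

```go
package main

import (
	"fmt"
	"sync"
)

// chanMap guards a lazily populated map of channels with one RWMutex.
type chanMap struct {
	mu       sync.RWMutex
	channels map[string]chan int
}

func (m *chanMap) getOrCreate(key string) chan int {
	// Fast path: read lock only.
	m.mu.RLock()
	ch, ok := m.channels[key]
	m.mu.RUnlock()
	if ok {
		return ch
	}

	// Slow path: take the write lock and re-check before creating.
	m.mu.Lock()
	defer m.mu.Unlock()
	if ch, ok := m.channels[key]; ok {
		return ch // another goroutine won the race
	}
	ch = make(chan int, 8)
	m.channels[key] = ch
	return ch
}

func main() {
	m := &chanMap{channels: make(map[string]chan int)}
	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.getOrCreate("src:dest")
		}()
	}
	wg.Wait()
	fmt.Println(len(m.channels)) // 1: all goroutines share the same channel
}
```

Without the second check, two goroutines could each create a channel for the same key and one producer's jobs would be written to a channel no consumer reads.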
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (7)

- `integration_test/docker_test/docker_test.go` (3 hunks)
- `router/batchrouter/batchrouter_test.go` (1 hunks)
- `router/batchrouter/handle.go` (2 hunks)
- `router/batchrouter/handle_lifecycle.go` (4 hunks)
- `router/batchrouter/handle_observability.go` (1 hunks)
- `router/batchrouter/job_buffer.go` (1 hunks)
- `router/batchrouter/worker.go` (1 hunks)

🧰 Additional context used

🧬 Code Definitions (2)

- `router/batchrouter/worker.go` (3): `router/batchrouter/types.go`: `DestinationJobs` (18-21), `ErrorResponse` (50-52); `jobsdb/jobsdb.go`: `ParameterFilterT` (551-554), `JobStatusT` (363-374), `JobT` (400-413); `router/utils/utils.go`: `DrainStats` (41-45), `DRAIN_ERROR_CODE` (21-21), `JobParameters` (53-72)
- `router/batchrouter/job_buffer.go` (1): `router/batchrouter/handle.go`: `Handle` (54-154)
🔇 Additional comments (12)

`router/batchrouter/batchrouter_test.go` (1)

- **668**: Increased test timeout improves reliability. The test timeout has been increased from 5 seconds to 2 minutes, which allows the test to better handle systems with slower resource access or higher load. This change aligns with similar timeout increases elsewhere in the codebase and should help reduce flaky test failures.

`router/batchrouter/handle.go` (1)

- **112-113**: New `jobBuffer` field for channel-based job buffering. This introduces a new `jobBuffer` field to the `Handle` struct that will be used for job buffering. This is part of the architecture improvements mentioned in the PR objectives.

`integration_test/docker_test/docker_test.go` (1)

- **178**: Increased test timeouts improve reliability. The test timeouts for database operations have been increased from 1 minute to 2 minutes across multiple test assertions. This change makes the tests more robust against system variations and slower CI environments, reducing the likelihood of flaky test failures due to timing issues. Also applies to: 183, 197, 202, 216, 221.

`router/batchrouter/handle_observability.go` (1)

- **32-54**: Improved metrics collection efficiency. This change optimizes the metrics collection process by moving it inside the ticker's case block. Metrics will now only be processed when the diagnosis ticker ticks, rather than potentially at every loop iteration. This reduces unnecessary processing and improves overall performance of the batch router.
`router/batchrouter/worker.go` (2)

- **64-136**: Validate drain condition logic thoroughly. The logic that appends jobs to `drainList` vs. `statusList` is critical. Ensure thorough testing of corner cases, such as when `drainer.Drain` can return true for borderline conditions, and confirm that partial drains (e.g., only one job out of the batch is drained) work as intended. Consider adding unit tests or test logs for these situations.
- **156-196**: Optimize drain job handling. Storing drained jobs into `ErrorDB` is a key part of the flow. Confirm that subsequent steps, such as re-processing or reporting metrics for these aborted jobs, are consistent with the rest of the pipeline. Additionally, ensure that the `pendingEventsRegistry.DecreasePendingEvents` call is always accurate in multi-worker scenarios.

`router/batchrouter/handle_lifecycle.go` (3)

- **165-170**: Confirm buffer initialization settings align with concurrency limits. Ensure that `sourceDestMap` and `uploadTimers` do not become overly large when there are many sources or destinations, and confirm that the chosen buffer or timer defaults can handle worst-case production loads efficiently.
- **276-285**: Lock hierarchy consideration around timer stopping. You acquire `brt.jobBuffer.mu.Lock()` and then stop timers. Ensure no other place in the code attempts to stop or reset timers while holding a different lock that could cause a potential deadlock.
- **340-399**: Remove or handle raw file resources with caution. When dealing with partially downloaded files in the crash recovery logic, ensure that these temporary files are always removed, even if a panic occurs. The current approach seems correct, but carefully recheck early returns and panic paths.

`router/batchrouter/job_buffer.go` (3)

- **1-14**: Check for potential import duplicates or cleanup. A quick pass shows most imports are used; however, ensure that none are redundant.
- **33-36**: Enforce stricter string sanitization on keys. If `sourceID` or `destID` can have special characters, consider additional sanitization or escaping to avoid collisions or parse issues in `getSourceDestKey`.
- **63-185**: Review shutdown race conditions with the job consumer. The shutdown path and this consumer both manipulate channels and timers. Confirm there is no edge case where the consumer goroutine might block indefinitely if the channel is closed before the final batch is processed.
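The shutdown concern above hinges on the consumer draining its final partial batch when the channel closes. A minimal sketch of that consumer shape (illustrative names, not the PR's actual code) shows how a `for range` loop over the channel plus a post-loop flush avoids losing buffered jobs:

```go
package main

import "fmt"

// consume accumulates jobs from a channel and flushes either when the batch
// reaches maxBatch or when the channel is closed, so the final partial batch
// is never lost on shutdown.
func consume(jobs <-chan int, maxBatch int, flush func([]int)) {
	batch := make([]int, 0, maxBatch)
	for j := range jobs { // loop exits cleanly when the producer closes jobs
		batch = append(batch, j)
		if len(batch) >= maxBatch {
			flush(batch)
			batch = make([]int, 0, maxBatch) // fresh backing array for the next batch
		}
	}
	if len(batch) > 0 {
		flush(batch) // drain leftover jobs after the channel closes
	}
}

func main() {
	jobs := make(chan int, 10)
	for i := 1; i <= 7; i++ {
		jobs <- i
	}
	close(jobs)

	var flushes [][]int
	consume(jobs, 3, func(b []int) { flushes = append(flushes, b) })
	fmt.Println(len(flushes)) // 3 flushes: [1 2 3], [4 5 6], and the leftover [7]
}
```

If the real consumer instead selects on a timer and a context, the same invariant applies: whichever case detects shutdown must flush the in-memory batch before returning.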
```go
brt.logger.Debug("BRT: Checking for incomplete journal entries to recover from...")
entries := brt.jobsDB.GetJournalEntries(jobsdb.RawDataDestUploadOperation)
for _, entry := range entries {
	var object ObjectStorageDefinition
	if err := jsonrs.Unmarshal(entry.OpPayload, &object); err != nil {
		panic(err)
	}
	if len(object.Config) == 0 {
		// Backward compatibility. If old entries don't have config, just delete the journal entry
		brt.jobsDB.JournalDeleteEntry(entry.OpID)
		continue
	}
	downloader, err := brt.fileManagerFactory(&filemanager.Settings{
		Provider: object.Provider,
		Config:   object.Config,
	})
```
🛠️ Refactor suggestion
Consider structured error logging for incomplete journal entries.
When unmarshal errors or file manager creation errors occur, a panic can hamper partial recovery. A more structured approach (logging combined with a fallback mechanism) might be beneficial if partial completion is desired.
Actionable comments posted: 1

♻️ Duplicate comments (1)

`router/batchrouter/handle.go`, lines 627-630: ⚠️ Potential issue: Fix potential integer overflow in job sorting.

Converting the difference of two int64 JobIDs to an int may lead to overflow or unexpected behavior when the IDs are far apart.

Apply this safer approach that uses explicit comparison:

```diff
-slices.SortFunc(batchJobs.Jobs, func(a, b *jobsdb.JobT) int {
-	return int(a.JobID - b.JobID)
-})
+slices.SortFunc(batchJobs.Jobs, func(a, b *jobsdb.JobT) int {
+	if a.JobID < b.JobID {
+		return -1
+	} else if a.JobID > b.JobID {
+		return 1
+	} else {
+		return 0
+	}
+})
```
🧹 Nitpick comments (4)

`router/batchrouter/worker.go` (2)

- **47-49**: Consider using a more descriptive or debug-level log message. If the `Work()` method runs frequently, this info-level log might be too chatty. Switching to debug level or adding more context could improve readability.
- **53-136**: Validate drain logic and coverage with dedicated tests. The core drain check and status assignment logic here is critical. Ensuring comprehensive test coverage (especially around edge cases like an empty or unexpected `reason`) would safeguard against regressions.

`router/batchrouter/handle_lifecycle.go` (1)

- **272-289**: Graceful shutdown is well-structured. Timers are stopped and goroutines are awaited, preventing race conditions. Consider verifying that job channels automatically drain and that no additional cleanup is needed for leftover in-flight jobs.

`router/batchrouter/job_buffer.go` (1)

- **16-22**: Document concurrency expectations in the `JobBuffer` struct. It might help future maintainers to note that `mu` protects both `sourceDestMap` and `uploadTimers`, especially for concurrent access across multiple goroutines.
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (7)

- `integration_test/docker_test/docker_test.go` (3 hunks)
- `router/batchrouter/batchrouter_test.go` (1 hunks)
- `router/batchrouter/handle.go` (2 hunks)
- `router/batchrouter/handle_lifecycle.go` (4 hunks)
- `router/batchrouter/handle_observability.go` (1 hunks)
- `router/batchrouter/job_buffer.go` (1 hunks)
- `router/batchrouter/worker.go` (1 hunks)

🧰 Additional context used

🧬 Code Definitions (3)

- `router/batchrouter/worker.go` (3): `router/batchrouter/types.go`: `DestinationJobs` (18-21), `ErrorResponse` (50-52); `jobsdb/jobsdb.go`: `ParameterFilterT` (551-554), `JobStatusT` (363-374), `JobT` (400-413); `router/utils/utils.go`: `DrainStats` (41-45), `DRAIN_ERROR_CODE` (21-21), `JobParameters` (53-72)
- `router/batchrouter/job_buffer.go` (1): `router/batchrouter/handle.go`: `Handle` (54-154)
- `router/batchrouter/handle_lifecycle.go` (4): `router/batchrouter/job_buffer.go`: `JobBuffer` (17-22); `jobsdb/jobsdb.go`: `JobT` (400-413); `router/batchrouter/asyncdestinationmanager/common/utils.go`: `IsAsyncDestination` (18-20); `utils/misc/misc.go`: `CreateTMPDIR` (211-228)
🔇 Additional comments (9)

`router/batchrouter/handle.go` (1)

- **112**: New job buffer field added to the `Handle` struct. The addition of the `jobBuffer` field will support channel-based job buffering, as indicated in the comment. This is likely part of a larger refactoring to improve job processing efficiency.

`router/batchrouter/batchrouter_test.go` (1)

- **668**: Test timeout increased for reliability. The timeout for the Minio content check was increased from 5 seconds to 2 minutes. This change is reasonable for a batch operation test that may take longer to complete depending on the environment.

`router/batchrouter/handle_observability.go` (1)

- **31-54**: Improved metric collection timing. The metric collection logic has been optimized to execute only within the ticker event handler rather than at each loop iteration. This is a good practice that:
  - Makes metrics collection more predictable and aligned with the ticker frequency
  - Reduces potential overhead by only collecting and reporting metrics at defined intervals
  - Maintains the same functionality while improving code organization

`integration_test/docker_test/docker_test.go` (1)

- **178**: Test timeouts increased for database operations. The timeouts for database operation checks have been increased from 1 minute to 2 minutes, which is appropriate for integration tests that involve database interactions. These operations can be slower in test environments, and the increased timeout provides more reliability without affecting test validity.

`router/batchrouter/worker.go` (1)

- **138-196**: Revisit panic for DB update failures. Currently, errors from `UpdateJobStatus` or storing aborted jobs trigger a panic. This might be appropriate for severe failures, but consider whether partial recovery or fault tolerance is possible to avoid halting the entire system.

`router/batchrouter/handle_lifecycle.go` (2)

- **165-170**: Confirm a suitable buffer size when initializing `jobBuffer`. A large default channel buffer size can consume substantial memory if many source-destination pairs exist. Revisit the config-based default to ensure it aligns with production constraints.
- **174-181**: Async destination manager startup looks good. The conditional initialization and background subscription for async destinations seem cleanly integrated.

`router/batchrouter/job_buffer.go` (2)

- **24-61**: Channel retrieval logic is correct; verify channel closure scenarios. When shutting down, if the channel is never explicitly closed, ensure the consumer loop exits cleanly from context cancellation.
- **63-185**: The `startJobConsumer` concurrency approach is solid. Using a semaphore to limit concurrent uploads is a good pattern. However, keep an eye on memory usage if many large batches accumulate in the channel during slow uploads.
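The buffered-channel semaphore pattern referred to here can be sketched as follows (illustrative names, not the PR's code). At most `limit` goroutines proceed past the acquire at any moment; the function returns the peak observed concurrency so the bound is checkable:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWithLimit starts `tasks` goroutines but allows at most `limit` of them
// in flight at once, using a buffered channel as a counting semaphore.
// It returns the peak observed concurrency.
func runWithLimit(tasks, limit int) int32 {
	sem := make(chan struct{}, limit)
	var inFlight, peak atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot; blocks once `limit` are held
			defer func() { <-sem }() // release the slot

			n := inFlight.Add(1)
			for { // record the high-water mark of concurrent workers
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			inFlight.Add(-1) // the simulated upload would happen before this
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(runWithLimit(20, 3) <= 3) // true: never more than 3 concurrent
}
```

The memory caveat in the comment still applies: the semaphore bounds concurrent uploads, not how many batches pile up in the job channel while uploads are slow; the channel's buffer size controls that.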
```go
processedBatchSizeStat := jb.createStat("batch_router_processed_batch_size", stats.GaugeType, sourceID, destID)
processedBatchSizeStat.Gauge(len(jobs))

processingStartTime := time.Now()
defer func() {
	jb.createStat("batch_router_buffered_batch_processing_time", stats.TimerType, sourceID, destID).Since(processingStartTime)
}()

// Get destination and source details
jb.brt.configSubscriberMu.RLock()
destWithSources, ok := jb.brt.destinationsMap[destID]
jb.brt.configSubscriberMu.RUnlock()

if !ok {
	// Handle destination not found
	jb.brt.logger.Errorf("Destination not found for ID: %s", destID)
	return
}

// Find the source
var source backendconfig.SourceT
sourceFound := false
for _, s := range destWithSources.Sources {
	if s.ID == sourceID {
		source = s
		sourceFound = true
		break
	}
}

if !sourceFound {
	// Handle source not found
	jb.brt.logger.Errorf("Source not found for ID: %s", sourceID)
	return
}

batchedJobs := BatchedJobs{
	Jobs: jobs,
	Connection: &Connection{
		Destination: destWithSources.Destination,
		Source:      source,
	},
}

// Use the existing upload logic
defer jb.brt.limiter.upload.Begin("")()

// Helper function for standard object storage upload process
processObjectStorageUpload := func(destType string, isWarehouse bool) {
	destUploadStat := jb.createStat("batch_router_dest_upload_time", stats.TimerType, sourceID, destID)
	destUploadStart := time.Now()
	output := jb.brt.upload(destType, &batchedJobs, isWarehouse)
	jb.brt.recordDeliveryStatus(*batchedJobs.Connection, output, isWarehouse)
	jb.brt.updateJobStatus(&batchedJobs, isWarehouse, output.Error, false)
	misc.RemoveFilePaths(output.LocalFilePaths...)
	if output.JournalOpID > 0 {
		jb.brt.jobsDB.JournalDeleteEntry(output.JournalOpID)
	}
	if output.Error == nil {
		jb.brt.recordUploadStats(*batchedJobs.Connection, output)
	}
	destUploadStat.Since(destUploadStart)
}

switch {
case IsObjectStorageDestination(jb.brt.destType):
	processObjectStorageUpload(jb.brt.destType, false)
case IsWarehouseDestination(jb.brt.destType):
	useRudderStorage := misc.IsConfiguredToUseRudderObjectStorage(batchedJobs.Connection.Destination.Config)
	objectStorageType := warehouseutils.ObjectStorageType(jb.brt.destType, batchedJobs.Connection.Destination.Config, useRudderStorage)
	destUploadStat := jb.createStat("batch_router_dest_upload_time", stats.TimerType, sourceID, destID)
	destUploadStart := time.Now()
	splitBatchJobs := jb.brt.splitBatchJobsOnTimeWindow(batchedJobs)
	for _, batchJob := range splitBatchJobs {
		output := jb.brt.upload(objectStorageType, batchJob, true)
		notifyWarehouseErr := false
		if output.Error == nil && output.Key != "" {
			output.Error = jb.brt.pingWarehouse(batchJob, output)
			if output.Error != nil {
				notifyWarehouseErr = true
			}
			warehouseutils.DestStat(stats.CountType, "generate_staging_files", batchJob.Connection.Destination.ID).Count(1)
			warehouseutils.DestStat(stats.CountType, "staging_file_batch_size", batchJob.Connection.Destination.ID).Count(len(batchJob.Jobs))
		}
		jb.brt.recordDeliveryStatus(*batchJob.Connection, output, true)
		jb.brt.updateJobStatus(batchJob, true, output.Error, notifyWarehouseErr)
		misc.RemoveFilePaths(output.LocalFilePaths...)
	}
	destUploadStat.Since(destUploadStart)
case asynccommon.IsAsyncDestination(jb.brt.destType):
	destUploadStat := jb.createStat("batch_router_dest_upload_time", stats.TimerType, sourceID, destID)
	destUploadStart := time.Now()
	jb.brt.sendJobsToStorage(batchedJobs)
	destUploadStat.Since(destUploadStart)
default:
	// Handle any other destination types
	jb.brt.logger.Warnf("Unsupported destination type %s for job buffer. Attempting generic processing.", jb.brt.destType)
	panic(fmt.Sprintf("Unsupported destination type %s for job buffer. Attempting generic processing.", jb.brt.destType))
}
```
🛠️ Refactor suggestion
Panic on unsupported destination type.
Currently, the code panics if the destination is neither object storage, warehouse, nor async. Consider a fallback or a clear error response if more destination types are introduced later.
Codecov Report

Attention: Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##           master    #5677      +/-   ##
==========================================
- Coverage   77.02%   76.97%   -0.05%
==========================================
  Files         487      488       +1
  Lines       66063    66276     +213
==========================================
+ Hits        50885    51018     +133
- Misses      12403    12477      +74
- Partials     2775     2781       +6
```

☔ View full report in Codecov by Sentry.
```go
jb.brt.logger.Debugf("Batch size threshold reached for %s:%s: %d jobs", sourceID, destID, len(jobBatch))

// Make a copy of the batch to avoid race conditions
batchToProcess := make([]*jobsdb.JobT, len(jobBatch))
```
What kind of race condition can happen here? We're passing the slice to a goroutine and then re-assigning the variable to a new slice; isn't that a safe operation without the copy?
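Whether the copy is needed depends on how the producer resets `jobBatch` after submission (the reset isn't shown in this excerpt). Reassigning the variable to a freshly allocated slice is indeed safe, because the submitted slice header keeps its own reference to the old backing array; but truncating in place with `jobBatch = jobBatch[:0]` reuses the backing array, and subsequent appends would race with the consumer. A minimal, synchronous illustration of the aliasing (hypothetical names, the goroutine hand-off is simulated by a plain assignment):

```go
package main

import "fmt"

func main() {
	// Safe reset: `make` allocates a brand-new backing array, so the
	// submitted slice is unaffected by later appends.
	batch := []int{1, 2, 3}
	submitted := batch // stands in for "passed to a goroutine"
	batch = make([]int, 0, 3)
	batch = append(batch, 9)
	fmt.Println(submitted) // [1 2 3]

	// Unsafe reset: batch2[:0] keeps the old backing array, so the next
	// append overwrites memory the consumer still reads; this is the case
	// where a defensive copy (or a fresh make) is required.
	batch2 := []int{1, 2, 3}
	submitted2 := batch2
	batch2 = batch2[:0]
	batch2 = append(batch2, 9)
	fmt.Println(submitted2) // [9 2 3]
}
```

So the reviewer is right for the reassignment case; the copy only buys safety if some code path truncates and reuses the slice instead.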
Description
Refactored job processing to use a job buffer with dedicated consumers, decoupling the job-reading step from the job-uploading step.
Linear Ticket
Fixes PIPE-2004
Security