
chore: improve batch router #5677

Open

cisse21 wants to merge 20 commits into master from chore.improveBRT

Conversation

cisse21 (Member) commented Apr 1, 2025

Description

Refactored job processing to use a job buffer with dedicated consumers, decoupling the job-reading step from the job-uploading step.

Linear Ticket

Fixes PIPE-2004

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

@cisse21 cisse21 force-pushed the chore.improveBRT branch 10 times, most recently from ab51e7c to 88eb6d2 Compare April 1, 2025 14:38
@cisse21 cisse21 force-pushed the chore.improveBRT branch from 88eb6d2 to 1c01dd2 Compare April 1, 2025 15:04
@cisse21 cisse21 force-pushed the chore.improveBRT branch from dc37bb1 to 9000924 Compare April 2, 2025 08:35
@cisse21 cisse21 force-pushed the chore.improveBRT branch 6 times, most recently from 7078513 to 5389eff Compare April 2, 2025 13:59
@cisse21 cisse21 force-pushed the chore.improveBRT branch 3 times, most recently from 55b5da4 to 1a2e697 Compare April 3, 2025 07:35
@Jayachand Jayachand requested a review from Copilot April 3, 2025 08:09
@Copilot Copilot AI left a comment


Pull Request Overview

This PR improves the batch router by enhancing logging, introducing channel-based job buffering, and updating test timeouts for reliability.

  • Added goroutine ID logging in workers
  • Refactored job processing to use a job buffer with dedicated consumers
  • Adjusted test timeout durations in both unit and integration tests

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.

Summary per file:

  • utils/workerpool/internal_worker.go: Added goroutine ID logging and a helper function for retrieving the goid
  • router/batchrouter/worker.go: Updated worker logging and refactored job processing to use buffers
  • router/batchrouter/job_buffer.go: Introduced channel-based job buffering and new consumer routines
  • router/batchrouter/handle_observability.go: Revised metrics collection with context cancellation in the select block
  • router/batchrouter/handle_lifecycle.go: Initialized the job buffer and updated shutdown logic for timers
  • router/batchrouter/handle.go: Added a job buffer field and improved job status updates with sorting
  • router/batchrouter/batchrouter_test.go: Increased test timeouts in batch router tests
  • integration_test/docker_test/docker_test.go: Increased test timeouts to accommodate slower integration test scenarios
Comments suppressed due to low confidence (2)

utils/workerpool/internal_worker.go:91

  • The goid() function is duplicated here and in other packages; consider extracting it into a common utility to avoid code duplication.
func goid() int {

router/batchrouter/worker.go:37

  • The goid() function is duplicated in multiple files; refactor this logic into a shared utility module to improve maintainability.
func goid() int {


// Sort jobs by ID to ensure consistent processing order
slices.SortFunc(batchJobs.Jobs, func(a, b *jobsdb.JobT) int {
	return int(a.JobID - b.JobID)
})

Copilot AI Apr 3, 2025


Converting the difference of two int64 JobIDs to an int may lead to overflow or unexpected behavior; consider using an explicit comparison returning -1, 0, or 1.

Suggested change:

-return int(a.JobID - b.JobID)
+if a.JobID < b.JobID {
+	return -1
+} else if a.JobID > b.JobID {
+	return 1
+} else {
+	return 0
+}


@cisse21 cisse21 force-pushed the chore.improveBRT branch 2 times, most recently from d152210 to 3ffaa26 Compare April 3, 2025 11:28
@cisse21 cisse21 force-pushed the chore.improveBRT branch from 3ffaa26 to 3da0a05 Compare April 3, 2025 11:29
@cisse21 cisse21 force-pushed the chore.improveBRT branch from 8dae67d to 35c4865 Compare April 4, 2025 07:09
cisse21 (Member, Author) commented Apr 4, 2025

@coderabbitai full review

coderabbitai bot (Contributor) commented Apr 4, 2025

Walkthrough

The pull request increases timeout durations in several test assertions and refactors various components of the batch router. In the tests, the wait durations in require.Eventually calls were extended. In the batch router code, a new job buffering mechanism is introduced by adding a jobBuffer field and a dedicated JobBuffer type with associated methods. Additional changes include inserting a sort in job status updates, revising the Setup and Shutdown processes to initialize and clean up job buffers, reorganizing metric collection to occur with ticker events, and refactoring the worker’s job processing to use a synchronous routing method.

Changes

  • integration_test/.../docker_test.go, router/batchrouter/.../batchrouter_test.go: Increased timeout durations in require.Eventually assertions (from time.Minute to 2*time.Minute in docker tests and from 5*time.Second to 2*time.Minute in batch router tests) to allow longer wait periods for expected conditions.
  • router/batchrouter/handle.go: Added a new jobBuffer *JobBuffer field to the Handle struct and inserted a sorting operation using slices.SortFunc in the updateJobStatus method to order jobs by their ID.
  • router/batchrouter/handle_lifecycle.go: Revised the Setup and Shutdown methods: initialized the job buffer with maps for channels and timers in Setup (with an added asynchronous configuration subscription) and updated Shutdown to stop timers (using mutex locks) and wait for background goroutines.
  • router/batchrouter/handle_observability.go: Moved the metric collection logic to execute within the diagnosisTicker case instead of at the beginning of each loop iteration, while keeping thread-safe aggregation of batch request metrics.
  • router/batchrouter/job_buffer.go: Introduced a new JobBuffer struct along with methods for managing job channels, starting job consumers, processing and uploading job batches, and creating job statistics.
  • router/batchrouter/worker.go: Refactored job processing by removing the async pattern (sync.WaitGroup) from the Work method and introducing a new synchronous routeJobsToBuffer method that organizes jobs by source and destination and updates their statuses in batch.
coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (1)
router/batchrouter/handle.go (1)

627-630: ⚠️ Potential issue

Fix potential integer overflow in sort function

The current implementation may lead to integer overflow when comparing large int64 JobIDs and converting their difference to an int type. This is especially problematic on 32-bit systems where int is only 32 bits.

Apply this safer implementation:

-slices.SortFunc(batchJobs.Jobs, func(a, b *jobsdb.JobT) int {
-    return int(a.JobID - b.JobID)
-})
+slices.SortFunc(batchJobs.Jobs, func(a, b *jobsdb.JobT) int {
+    if a.JobID < b.JobID {
+        return -1
+    } else if a.JobID > b.JobID {
+        return 1
+    } else {
+        return 0
+    }
+})
🧹 Nitpick comments (11)
router/batchrouter/worker.go (4)

47-47: Prefer verifying workerJob data before routing.
Before calling routeJobsToBuffer(workerJob), it might be beneficial to log or validate the structure of workerJob (e.g., ensuring destWithSources and jobs fields are non-empty), to catch unexpected nil references or empty slices quickly.


49-49: Add context to the log message.
The log message is helpful, but consider including the length of workerJobs or other relevant details to facilitate debugging.

- brt.logger.Infof("returning true for worker %s", w.partition)
+ brt.logger.Infof("returning true for worker %s; processed %d job groups", w.partition, len(workerJobs))

53-62: Clarify or rename local variables for better readability.
In this block, variables such as statusList, drainList, and jobIDConnectionDetailsMap are meaningful but might benefit from descriptive comments to clarify their role in the job buffering flow.


138-154: Handle possible concurrency or transaction failures gracefully.
If UpdateJobStatus fails after retries, logging a panic is valid for irrecoverable scenarios. However, confirm that higher-level logic can recover properly from such a panic, or consider using a less drastic approach for production resilience.

router/batchrouter/handle_lifecycle.go (4)

174-176: Review asynchronous manager usage.
Starting the async destination manager here is good. If there are any synchronous destinations that also require special handling, confirm that they won't break by skipping startAsyncDestinationManager(). Add a log to confirm the asynchronous manager's readiness.


178-181: Maintain resilience on backend configuration errors.
The backend config subscriber goroutine can face transient issues if the configuration reading fails. Consider adding robust error-handling or retry logic around brt.backendConfigSubscriber(), if not already present, to avoid silent failures.


272-275: Include context about which goroutines are stopping.
When initiating shutdown, it’s helpful to specify which background goroutines (async manager, job consumer, etc.) are being stopped. This clarifies the logs.

- brt.logger.Info("Initiating batch router shutdown")
+ brt.logger.Infof("Initiating batch router shutdown; stopping background goroutines for destType: %s", brt.destType)

288-288: Shutdown log can be combined with final resource usage stats.
Consider adding stats for how many job channels or timers were cleaned up, or how many pending jobs remain, so the final log is more actionable.

router/batchrouter/job_buffer.go (3)

16-22: Ensure concurrency for map usage.
sourceDestMap and uploadTimers share a single Mutex mu. If usage grows large or if contention becomes high, evaluate whether separate locks or concurrent data structures are needed.


24-31: Enhance stat tagging with workspace details if available.
Including the workspace ID in the stat tags (if it’s easily retrievable here) can aid in diagnosing multi-workspace scenarios.


38-62: Invert flow to reduce time under write lock.
When ch does not exist, you re-lock with a write lock. This is correct but can slightly hinder concurrency if many readers are contending. Consider an approach that uses double-checked locking more selectively or a sync.Map structure.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7dbf5d8 and 35c4865.

📒 Files selected for processing (7)
  • integration_test/docker_test/docker_test.go (3 hunks)
  • router/batchrouter/batchrouter_test.go (1 hunks)
  • router/batchrouter/handle.go (2 hunks)
  • router/batchrouter/handle_lifecycle.go (4 hunks)
  • router/batchrouter/handle_observability.go (1 hunks)
  • router/batchrouter/job_buffer.go (1 hunks)
  • router/batchrouter/worker.go (1 hunks)
🧰 Additional context used
🧬 Code Definitions (2)
router/batchrouter/worker.go (3)
router/batchrouter/types.go (2)
  • DestinationJobs (18-21)
  • ErrorResponse (50-52)
jobsdb/jobsdb.go (3)
  • ParameterFilterT (551-554)
  • JobStatusT (363-374)
  • JobT (400-413)
router/utils/utils.go (3)
  • DrainStats (41-45)
  • DRAIN_ERROR_CODE (21-21)
  • JobParameters (53-72)
router/batchrouter/job_buffer.go (1)
router/batchrouter/handle.go (1)
  • Handle (54-154)
🔇 Additional comments (12)
router/batchrouter/batchrouter_test.go (1)

668-668: Increased test timeout improves reliability

The test timeout has been increased from 5 seconds to 2 minutes, which will allow the test to better handle systems with slower resource access or higher load. This change aligns with similar timeout increases elsewhere in the codebase and should help reduce flaky test failures.

router/batchrouter/handle.go (1)

112-113: New jobBuffer field for channel-based job buffering

This introduces a new jobBuffer field to the Handle struct that will be used for job buffering. This is part of the architecture improvements mentioned in the PR objectives.

integration_test/docker_test/docker_test.go (1)

178-178: Increased test timeouts improve reliability

The test timeouts for database operations have been increased from 1 minute to 2 minutes across multiple test assertions. This change makes the tests more robust against system variations and slower CI environments, reducing the likelihood of flaky test failures due to timing issues.

Also applies to: 183-183, 197-197, 202-202, 216-216, 221-221

router/batchrouter/handle_observability.go (1)

32-54: Improved metrics collection efficiency

This change optimizes the metrics collection process by moving it inside the ticker's case block. Now metrics will only be processed when the diagnosis ticker ticks, rather than potentially at every loop iteration. This will reduce unnecessary processing and improve overall performance of the batch router.

router/batchrouter/worker.go (2)

64-136: Validate drain condition logic thoroughly.
The logic that appends jobs to drainList vs. statusList is critical. Ensure thorough testing of corner cases, such as when drainer.Drain can return true for borderline conditions, and confirm that partial drains (e.g., only one job out of the batch is drained) work as intended. Consider adding unit tests or test logs for these situations.


156-196: Optimize drain job handling.
Storing drained jobs into ErrorDB is a key part of the flow. Confirm that subsequent steps, such as re-processing or reporting metrics for these aborted jobs, are consistent with the rest of the pipeline. Additionally, ensure that the pendingEventsRegistry.DecreasePendingEvents call is always accurate in multi-worker scenarios.

router/batchrouter/handle_lifecycle.go (3)

165-170: Confirm buffer initialization settings align with concurrency limits.
Ensure that the sourceDestMap and uploadTimers do not become overly large when there are many sources or destinations, and confirm that the chosen buffer or timer defaults can handle worst-case production loads efficiently.


276-285: Lock hierarchy consideration around timer stopping.
You acquire brt.jobBuffer.mu.Lock() and then stop timers. Ensure no other place in the code attempts to stop or reset timers while holding a different lock that could cause a potential deadlock.


340-399: Remove or handle raw file resources with caution.
When dealing with partially downloaded files in the crash recovery logic, ensure that these temporary files are always removed, even if a panic occurs. The current approach seems correct but carefully recheck early returns or panic paths.

router/batchrouter/job_buffer.go (3)

1-14: Check for potential import duplicates or cleanup.
A quick pass shows most imports are used. However, ensure that none are redundant.


33-36: Enforce stricter string sanitization on keys.
If sourceID or destID can have special characters, consider additional sanitization or escaping to avoid collisions or parse issues in getSourceDestKey.


63-185: Review shutdown race conditions with job consumer.
The shutdown path and this consumer both manipulate channels and timers. Confirm no edge case where the consumer goroutine might block indefinitely if the channel is closed before the final batch is processed.

Comment on lines 324 to 339
brt.logger.Debug("BRT: Checking for incomplete journal entries to recover from...")
entries := brt.jobsDB.GetJournalEntries(jobsdb.RawDataDestUploadOperation)
for _, entry := range entries {
	var object ObjectStorageDefinition
	if err := jsonrs.Unmarshal(entry.OpPayload, &object); err != nil {
		panic(err)
	}
	if len(object.Config) == 0 {
		// Backward compatibility. If old entries dont have config, just delete journal entry
		brt.jobsDB.JournalDeleteEntry(entry.OpID)
		continue
	}
	downloader, err := brt.fileManagerFactory(&filemanager.Settings{
		Provider: object.Provider,
		Config:   object.Config,
	})
🛠️ Refactor suggestion

Consider structured error logging for incomplete journal entries.
When unmarshal errors or file manager creation errors occur, a panic can hamper partial recovery. A more structured approach (logging combined with a fallback mechanism) might be beneficial if partial completion is desired.

coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
router/batchrouter/handle.go (1)

627-630: ⚠️ Potential issue

Fix potential integer overflow in job sorting.

Converting the difference of two int64 JobIDs to an int may lead to overflow or unexpected behavior when the IDs have a large difference between them.

Apply this safer approach that uses explicit comparison:

-slices.SortFunc(batchJobs.Jobs, func(a, b *jobsdb.JobT) int {
-    return int(a.JobID - b.JobID)
-})
+slices.SortFunc(batchJobs.Jobs, func(a, b *jobsdb.JobT) int {
+    if a.JobID < b.JobID {
+        return -1
+    } else if a.JobID > b.JobID {
+        return 1
+    } else {
+        return 0
+    }
+})
🧹 Nitpick comments (4)
router/batchrouter/worker.go (2)

47-49: Consider using a more descriptive or debug-level log message.
If the Work() method runs frequently, this info-level log might be too chatty. Switching to debug level or adding more context could improve readability.


53-136: Validate drain logic and coverage with dedicated tests.
The core drain check and status assignment logic here is critical. Ensuring comprehensive test coverage (especially around edge cases like an empty or unexpected reason) would safeguard against regressions.

router/batchrouter/handle_lifecycle.go (1)

272-289: Graceful shutdown is well-structured.
Timers are stopped and goroutines are awaited, preventing race conditions. Consider verifying that job channels automatically drain and no additional cleanup is needed for leftover in-flight jobs.

router/batchrouter/job_buffer.go (1)

16-22: Document concurrency expectations in the JobBuffer struct.
It might help future maintainers to note that mu protects both sourceDestMap and uploadTimers, especially for concurrent access across multiple goroutines.

🧰 Additional context used
🧬 Code Definitions (3)
router/batchrouter/worker.go (3)
router/batchrouter/types.go (2)
  • DestinationJobs (18-21)
  • ErrorResponse (50-52)
jobsdb/jobsdb.go (3)
  • ParameterFilterT (551-554)
  • JobStatusT (363-374)
  • JobT (400-413)
router/utils/utils.go (3)
  • DrainStats (41-45)
  • DRAIN_ERROR_CODE (21-21)
  • JobParameters (53-72)
router/batchrouter/job_buffer.go (1)
router/batchrouter/handle.go (1)
  • Handle (54-154)
router/batchrouter/handle_lifecycle.go (4)
router/batchrouter/job_buffer.go (1)
  • JobBuffer (17-22)
jobsdb/jobsdb.go (1)
  • JobT (400-413)
router/batchrouter/asyncdestinationmanager/common/utils.go (1)
  • IsAsyncDestination (18-20)
utils/misc/misc.go (1)
  • CreateTMPDIR (211-228)
🔇 Additional comments (9)
router/batchrouter/handle.go (1)

112-112: New job buffer field added to Handle struct.

The addition of the jobBuffer field will support channel-based job buffering as indicated in the comment. This is likely part of a larger refactoring to improve job processing efficiency.

router/batchrouter/batchrouter_test.go (1)

668-668: Test timeout increased for reliability.

The timeout for the Minio content check was increased from 5 seconds to 2 minutes. This change is reasonable for a batch operation test that may take longer to complete depending on the environment.

router/batchrouter/handle_observability.go (1)

31-54: Improved metric collection timing.

The metric collection logic has been optimized to only execute within the ticker event handler rather than at each loop iteration. This is a good practice that:

  1. Makes metrics collection more predictable and aligned with the ticker frequency
  2. Reduces potential overhead by only collecting and reporting metrics at defined intervals
  3. Maintains the same functionality while improving code organization
integration_test/docker_test/docker_test.go (1)

178-178: Test timeouts increased for database operations.

The timeouts for database operation checks have been increased from 1 minute to 2 minutes, which is appropriate for integration tests that involve database interactions. These operations can be slower in test environments, and the increased timeout provides more reliability without affecting test validity.

Also applies to: 183-183, 197-197, 202-202, 216-216, 221-221

router/batchrouter/worker.go (1)

138-196: Revisit panic for DB update failures.
Currently, errors from UpdateJobStatus or storing aborted jobs trigger a panic. This might be appropriate for severe failures, but consider whether partial recovery or fault tolerance is possible to avoid halting the entire system.

router/batchrouter/handle_lifecycle.go (2)

165-170: Confirm suitable buffer size when initializing jobBuffer.
A large default channel buffer size can consume substantial memory if many source-destination pairs exist. Revisit the config-based default to ensure it aligns with production constraints.


174-181: Async destination manager startup looks good.
The conditional initialization and background subscription for async destinations seem cleanly integrated.

router/batchrouter/job_buffer.go (2)

24-61: Channel retrieval logic is correct; verify channel closure scenarios.
When shutting down, if the channel is never explicitly closed, ensure the consumer loop exits cleanly from context cancellation.


63-185: startJobConsumer concurrency approach is solid.
Using a semaphore to limit concurrent uploads is a good pattern. However, keep an eye on memory usage if many large batches accumulate in the channel during slow uploads.

Comment on lines +187 to +292
processedBatchSizeStat := jb.createStat("batch_router_processed_batch_size", stats.GaugeType, sourceID, destID)
processedBatchSizeStat.Gauge(len(jobs))

processingStartTime := time.Now()
defer func() {
	jb.createStat("batch_router_buffered_batch_processing_time", stats.TimerType, sourceID, destID).Since(processingStartTime)
}()

// Get destination and source details
jb.brt.configSubscriberMu.RLock()
destWithSources, ok := jb.brt.destinationsMap[destID]
jb.brt.configSubscriberMu.RUnlock()

if !ok {
	// Handle destination not found
	jb.brt.logger.Errorf("Destination not found for ID: %s", destID)
	return
}

// Find the source
var source backendconfig.SourceT
sourceFound := false
for _, s := range destWithSources.Sources {
	if s.ID == sourceID {
		source = s
		sourceFound = true
		break
	}
}

if !sourceFound {
	// Handle source not found
	jb.brt.logger.Errorf("Source not found for ID: %s", sourceID)
	return
}

batchedJobs := BatchedJobs{
	Jobs: jobs,
	Connection: &Connection{
		Destination: destWithSources.Destination,
		Source:      source,
	},
}

// Use the existing upload logic
defer jb.brt.limiter.upload.Begin("")()

// Helper function for standard object storage upload process
processObjectStorageUpload := func(destType string, isWarehouse bool) {
	destUploadStat := jb.createStat("batch_router_dest_upload_time", stats.TimerType, sourceID, destID)
	destUploadStart := time.Now()
	output := jb.brt.upload(destType, &batchedJobs, isWarehouse)
	jb.brt.recordDeliveryStatus(*batchedJobs.Connection, output, isWarehouse)
	jb.brt.updateJobStatus(&batchedJobs, isWarehouse, output.Error, false)
	misc.RemoveFilePaths(output.LocalFilePaths...)
	if output.JournalOpID > 0 {
		jb.brt.jobsDB.JournalDeleteEntry(output.JournalOpID)
	}
	if output.Error == nil {
		jb.brt.recordUploadStats(*batchedJobs.Connection, output)
	}
	destUploadStat.Since(destUploadStart)
}

switch {
case IsObjectStorageDestination(jb.brt.destType):
	processObjectStorageUpload(jb.brt.destType, false)
case IsWarehouseDestination(jb.brt.destType):
	useRudderStorage := misc.IsConfiguredToUseRudderObjectStorage(batchedJobs.Connection.Destination.Config)
	objectStorageType := warehouseutils.ObjectStorageType(jb.brt.destType, batchedJobs.Connection.Destination.Config, useRudderStorage)
	destUploadStat := jb.createStat("batch_router_dest_upload_time", stats.TimerType, sourceID, destID)
	destUploadStart := time.Now()
	splitBatchJobs := jb.brt.splitBatchJobsOnTimeWindow(batchedJobs)
	for _, batchJob := range splitBatchJobs {
		output := jb.brt.upload(objectStorageType, batchJob, true)
		notifyWarehouseErr := false
		if output.Error == nil && output.Key != "" {
			output.Error = jb.brt.pingWarehouse(batchJob, output)
			if output.Error != nil {
				notifyWarehouseErr = true
			}
			warehouseutils.DestStat(stats.CountType, "generate_staging_files", batchJob.Connection.Destination.ID).Count(1)
			warehouseutils.DestStat(stats.CountType, "staging_file_batch_size", batchJob.Connection.Destination.ID).Count(len(batchJob.Jobs))
		}
		jb.brt.recordDeliveryStatus(*batchJob.Connection, output, true)
		jb.brt.updateJobStatus(batchJob, true, output.Error, notifyWarehouseErr)
		misc.RemoveFilePaths(output.LocalFilePaths...)
	}
	destUploadStat.Since(destUploadStart)
case asynccommon.IsAsyncDestination(jb.brt.destType):
	destUploadStat := jb.createStat("batch_router_dest_upload_time", stats.TimerType, sourceID, destID)
	destUploadStart := time.Now()
	jb.brt.sendJobsToStorage(batchedJobs)
	destUploadStat.Since(destUploadStart)
default:
	// Handle any other destination types
	jb.brt.logger.Warnf("Unsupported destination type %s for job buffer. Attempting generic processing.", jb.brt.destType)
	panic(fmt.Sprintf("Unsupported destination type %s for job buffer. Attempting generic processing.", jb.brt.destType))
}
}

🛠️ Refactor suggestion

Panic on unsupported destination type.
Currently, the code panics if the destination is neither object storage, warehouse, nor async. Consider a fallback or a clear error response if more destination types are introduced later.

@cisse21 cisse21 force-pushed the chore.improveBRT branch from 35c4865 to ab5def2 Compare April 4, 2025 09:04
@cisse21 cisse21 force-pushed the chore.improveBRT branch 3 times, most recently from 812f03a to 7dccdca Compare April 4, 2025 11:16
codecov bot commented Apr 8, 2025

Codecov Report

Attention: Patch coverage is 79.42584% with 86 lines in your changes missing coverage. Please review.

Project coverage is 76.97%. Comparing base (4c3e4ff) to head (79f32b9).
Report is 3 commits behind head on master.

Files with missing lines:
  • router/batchrouter/job_buffer.go: 74.07% patch coverage (49 missing, 7 partials) ⚠️
  • router/batchrouter/handle_observability.go: 0.00% patch coverage (20 missing) ⚠️
  • router/batchrouter/worker.go: 93.42% patch coverage (6 missing, 4 partials) ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5677      +/-   ##
==========================================
- Coverage   77.02%   76.97%   -0.05%     
==========================================
  Files         487      488       +1     
  Lines       66063    66276     +213     
==========================================
+ Hits        50885    51018     +133     
- Misses      12403    12477      +74     
- Partials     2775     2781       +6     

☔ View full report in Codecov by Sentry.
@cisse21 cisse21 marked this pull request as ready for review April 8, 2025 13:13
@ktgowtham ktgowtham requested review from atzoum and ktgowtham April 8, 2025 17:22
jb.brt.logger.Debugf("Batch size threshold reached for %s:%s: %d jobs", sourceID, destID, len(jobBatch))

// Make a copy of the batch to avoid race conditions
batchToProcess := make([]*jobsdb.JobT, len(jobBatch))
What kind of race condition can happen here? we're passing a slice and re-assigning to a new one after submitting it to a go routine. it's a safe operation without copy rt?
