
Conversation

jakeloo (Member) commented Sep 12, 2025

Summary by CodeRabbit

  • New Features

    • Poller parallelism now auto-scales to CPU cores when unset, improving default concurrency behavior.
  • Refactor

    • Overhauled commit/publish processing to use coordinated range-based loops and fail fast on state inconsistencies for greater reliability (a rough sketch of the range computation follows this summary).
  • Chores

    • Removed publisher mode configuration/CLI flag.
    • Removed chain-tracking and work-mode monitoring features.
    • RPC now updates the ChainHead metric when fetching the latest block.
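
A rough sketch of the range computation referenced above (helper and variable names are illustrative, not taken from the diff; maxBatch is assumed to be at least 1):

// nextRange returns the next contiguous block range to process, capped at
// maxBatch blocks. The real helpers in committer.go may differ in shape.
func nextRange(lastProcessed, latest, maxBatch uint64) (from, to uint64, ok bool) {
	if latest <= lastProcessed {
		return 0, 0, false // nothing new to commit or publish yet
	}
	from = lastProcessed + 1
	to = latest
	if to-from+1 > maxBatch {
		to = from + maxBatch - 1
	}
	return from, to, true
}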


zeet-co bot commented Sep 12, 2025

We're building your pull request over on Zeet.
Click me for more info about your build and deployment.
Once built, this branch can be tested at: https://insight-8453-base-backfill-migrat-c0eae5.insight.zeet.app before merging 😉


coderabbitai bot commented Sep 12, 2025

Walkthrough

Initialize committer state from storage and run separate concurrent commit and publish loops using range-based block processing; remove publisher-mode config/flag and PublisherConfig.Mode; change poller default parallelism to use GOMAXPROCS when set to 0; remove ChainTracker and WorkModeMonitor; update RPC to emit chain head metric.

Changes

Cohort / File(s) Summary
Committer refactor
internal/orchestrator/committer.go
Initialize lastCommitted/lastPublished from storage and validate consistency (fatal on mismatch); run separate concurrent commit and publish loops; introduce range-based block computation helpers; replace per-block in-commit async publishing with dedicated publish loop; adjust logging and metrics.
Orchestrator lifecycle removals
internal/orchestrator/chain_tracker.go, internal/orchestrator/work_mode_monitor.go, internal/orchestrator/orchestrator.go
Remove ChainTracker and WorkModeMonitor implementations and associated lifecycle startup; orchestrator no longer starts or waits for a chain-tracker goroutine.
Poller default change
internal/orchestrator/poller.go
When config.Cfg.Poller.ParallelPollers is 0, use runtime.GOMAXPROCS(0) (minimum 1) for parallelPollers; import runtime; lookaheadBatches remains parallelPollers + 2.
Config: publisher mode removed
configs/config.go
Remove the `Mode` field (struct tag `mapstructure:"mode"`) from `PublisherConfig`.
CLI flags updated
cmd/root.go
Change default for poller-parallel-pollers to 0; remove publisher-mode flag and its config binding.
RPC metrics emission
internal/rpc/rpc.go
On successful GetLatestBlockNumber, update metrics.ChainHead gauge with the numeric block number.
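
For the rpc.go change above, a minimal sketch of the gauge update (the receiver, helper, and return type are assumptions; only GetLatestBlockNumber and metrics.ChainHead come from the summary):

// GetLatestBlockNumber fetches the chain head and mirrors it into the
// ChainHead gauge. fetchLatestBlockNumber is a hypothetical internal helper.
func (r *Client) GetLatestBlockNumber(ctx context.Context) (*big.Int, error) {
	blockNumber, err := r.fetchLatestBlockNumber(ctx)
	if err != nil {
		return nil, err
	}
	metrics.ChainHead.Set(float64(blockNumber.Uint64())) // prometheus Gauge
	return blockNumber, nil
}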

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Comm as Committer
    participant Rpc as RPC Node
    participant Store as Storage
    participant Pub as Publisher
    participant Ctx as Context

    Note over Comm: Startup — load and validate state from Storage
    Comm->>Store: Load lastCommittedBlock, lastPublishedBlock
    Store-->>Comm: Return values
    Comm->>Comm: Validate consistency (fatal on mismatch)

    par Commit loop
        loop until Ctx canceled
            Comm->>Rpc: Get latest block number
            Comm->>Comm: Compute commit range (lastCommitted+1 .. N)
            alt Range available
                Comm->>Rpc: Fetch sequential block data
                Comm->>Store: Insert blocks
                Comm->>Comm: Update lastCommittedBlock
            else No blocks
                Comm->>Comm: Sleep/backoff
            end
        end
    and Publish loop
        loop until Ctx canceled
            Comm->>Rpc: Get latest block number
            Comm->>Comm: Compute publish range (lastPublished+1 .. N)
            alt Range available
                Comm->>Rpc: Fetch sequential block data
                Comm->>Pub: Publish blocks
                Comm->>Comm: Update lastPublishedBlock
            else No blocks
                Comm->>Comm: Sleep/backoff
            end
        end
    end

    Ctx-->>Comm: Cancel
    Comm->>Pub: Close

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Pre-merge checks (1 passed, 1 warning, 1 inconclusive)

❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00%, below the required threshold of 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
Title Check ❓ Inconclusive The current title "Misc Cleanup" is vague and does not convey the PR's primary changes. This changeset removes ChainTracker and WorkModeMonitor, refactors the committer into concurrent commit/publish loops, removes the publisher.mode setting, and adjusts poller defaults, so the title should reflect the dominant changes. A concise, specific title will make code review and project history clearer. Please replace the title with a concise, single-sentence summary that names the primary change(s). Examples: "Refactor Committer to concurrent commit/publish loops; remove ChainTracker and WorkModeMonitor" or "Remove publisher.mode, adjust poller defaults, and refactor committer." Use present tense, avoid "misc" or "cleanup", and keep it short so reviewers immediately understand the main intent.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch jl/misc-fixes

Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
internal/orchestrator/poller.go (1)

125-131: Make range “check-then-mark” atomic to avoid duplicate polling of the same range.

Between isRangeProcessing and markRangeAsProcessing another goroutine can slip in, causing duplicate RPC work. Introduce a tryMark that checks and marks under one lock.

@@
- // Check if already processing
- if p.isRangeProcessing(rangeKey) {
-   return nil, ErrBlocksProcessing
- }
-
- p.markRangeAsProcessing(rangeKey)
+ // Atomically check-and-mark
+ if !p.tryMarkRangeAsProcessing(rangeKey) {
+   return nil, ErrBlocksProcessing
+ }
  defer p.unmarkRangeAsProcessing(rangeKey)
@@
 func (p *Poller) isRangeProcessing(rangeKey string) bool {
   p.processingRangesMutex.RLock()
   defer p.processingRangesMutex.RUnlock()
   return len(p.processingRanges[rangeKey]) > 0
 }
+
+// tryMarkRangeAsProcessing checks and marks the range atomically.
+func (p *Poller) tryMarkRangeAsProcessing(rangeKey string) bool {
+  p.processingRangesMutex.Lock()
+  defer p.processingRangesMutex.Unlock()
+  if len(p.processingRanges[rangeKey]) > 0 {
+    return false
+  }
+  ch := make(chan struct{})
+  p.processingRanges[rangeKey] = []chan struct{}{ch}
+  return true
+}
@@
 func (p *Poller) markRangeAsProcessing(rangeKey string) chan struct{} {
   p.processingRangesMutex.Lock()
   defer p.processingRangesMutex.Unlock()
 
   // Create a notification channel for this range
   notifyChan := make(chan struct{})
 
   // Initialize the slice if it doesn't exist
   if p.processingRanges[rangeKey] == nil {
     p.processingRanges[rangeKey] = []chan struct{}{}
   }
 
   // Store the notification channel
   p.processingRanges[rangeKey] = append(p.processingRanges[rangeKey], notifyChan)
 
   return notifyChan
 }

Also applies to: 375-419

internal/orchestrator/committer.go (4)

216-224: Fatal on nil latestCommitted causes false positives; normalize nil to 0 and self-heal.

Storage may return nil for “no rows yet”. Treat as 0 and reconcile state instead of exiting.

- latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
+ latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
  if err != nil {
    return nil, err
  }
 
- if latestCommittedBlockNumber == nil || c.lastCommittedBlock.Load() != latestCommittedBlockNumber.Uint64() {
-   log.Fatal().Msgf("Inconsistent last committed block state between memory (%d) and storage (%v)", c.lastCommittedBlock.Load(), latestCommittedBlockNumber)
-   return nil, fmt.Errorf("last committed block number is not initialized correctly")
- }
+ if latestCommittedBlockNumber == nil {
+   latestCommittedBlockNumber = new(big.Int).SetUint64(0)
+ }
+ if c.lastCommittedBlock.Load() != latestCommittedBlockNumber.Uint64() {
+   log.Warn().Msgf("Reconciling last committed: memory=%d storage=%s",
+     c.lastCommittedBlock.Load(), latestCommittedBlockNumber.String())
+   c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
+ }

236-244: Same for published state: avoid Fatal on nil; normalize and reconcile.

Mirror the committed-path fix to prevent unnecessary process exit on fresh state.

- if latestPublishedBlockNumber == nil || c.lastPublishedBlock.Load() != latestPublishedBlockNumber.Uint64() {
-   log.Fatal().Msgf("Inconsistent last published block state between memory (%d) and storage (%v)", c.lastPublishedBlock.Load(), latestPublishedBlockNumber)
-   return nil, fmt.Errorf("last published block number is not initialized correctly")
- }
+ if latestPublishedBlockNumber == nil {
+   latestPublishedBlockNumber = new(big.Int).SetUint64(0)
+ }
+ if c.lastPublishedBlock.Load() != latestPublishedBlockNumber.Uint64() {
+   log.Warn().Msgf("Reconciling last published: memory=%d storage=%s",
+     c.lastPublishedBlock.Load(), latestPublishedBlockNumber.String())
+   c.lastPublishedBlock.Store(latestPublishedBlockNumber.Uint64())
+ }

145-149: Busy-spin risk: add a small backoff when there’s no work.

Loops run continuously and call RPC for latest block even when idle. Add a short sleep to reduce load.

@@ func (c *Committer) runCommitLoop(ctx context.Context) {
       if err := c.commit(ctx); err != nil {
         log.Error().Err(err).Msg("Error committing blocks")
       }
       go c.cleanupProcessedStagingBlocks(ctx)
+      time.Sleep(200 * time.Millisecond)
@@ func (c *Committer) runPublishLoop(ctx context.Context) {
       if err := c.publish(ctx); err != nil {
         log.Error().Err(err).Msg("Error publishing blocks")
       }
       go c.cleanupProcessedStagingBlocks(ctx)
+      time.Sleep(200 * time.Millisecond)

Also applies to: 164-168


339-344: Fix metric: record the first missed block (expectedBlockNumber), not the first block in batch.

Current value is misleading for operators.

- metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64()))
+ metrics.MissedBlockNumbers.Set(float64(expectedBlockNumber.Int64()))
🧹 Nitpick comments (4)
internal/orchestrator/poller.go (2)

7-7: Defaulting to NumCPU can oversubscribe containers; prefer GOMAXPROCS(0) and clamp >=1.

NumCPU may ignore cgroup CPU quotas depending on Go/runtime settings. Using GOMAXPROCS(0) respects quotas when automaxprocs or env is set. Also guard against 0.

- parallelPollers := config.Cfg.Poller.ParallelPollers
- if parallelPollers == 0 {
-   parallelPollers = runtime.NumCPU()
- }
+ parallelPollers := config.Cfg.Poller.ParallelPollers
+ if parallelPollers <= 0 {
+   // Respect container CPU limits when GOMAXPROCS is set/managed
+   parallelPollers = runtime.GOMAXPROCS(0)
+   if parallelPollers < 1 {
+     parallelPollers = 1
+   }
+ }

Also applies to: 61-64


165-172: Comment says “fetch from staging” but code polls via worker.

Either adjust the comment or fetch from staging to avoid misleading operators.

internal/orchestrator/committer.go (1)

164-168: Deduplicate staging cleanup execution.

Both loops invoke cleanup concurrently; gate it to one runner or add singleflight to reduce duplicate deletes.

@@ func (c *Committer) runPublishLoop(ctx context.Context) {
-     go c.cleanupProcessedStagingBlocks(ctx)
+     // cleanup is already triggered from commit loop

Alternatively, wrap cleanupProcessedStagingBlocks in a singleflight/do to coalesce calls.

Also applies to: 172-207
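
A minimal sketch of that singleflight alternative (the sf field and cleanupOnce wrapper are assumptions, not part of the diff):

import "golang.org/x/sync/singleflight"

// sf would be a new Committer field: sf singleflight.Group
func (c *Committer) cleanupOnce(ctx context.Context) {
	// Concurrent callers sharing this key collapse into one execution; late
	// arrivals wait for that run instead of issuing duplicate deletes.
	c.sf.Do("cleanup-staging", func() (interface{}, error) {
		c.cleanupProcessedStagingBlocks(ctx)
		return nil, nil
	})
}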

cmd/root.go (1)

52-53: Clarify flag semantics and fix typo.

Make it clear that 0 uses GOMAXPROCS/NumCPU; fix “oller” typo.

- rootCmd.PersistentFlags().Int("poller-parallel-pollers", 0, "Maximum number of parallel pollers")
+ rootCmd.PersistentFlags().Int("poller-parallel-pollers", 0, "Parallel poller workers (0 = GOMAXPROCS/NumCPU)")
- rootCmd.PersistentFlags().String("poller-s3-bucket", "", "S3 bucket for oller archive source")
+ rootCmd.PersistentFlags().String("poller-s3-bucket", "", "S3 bucket for poller archive source")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between efbdc81 and dba7539.

📒 Files selected for processing (4)
  • cmd/root.go (1 hunks)
  • configs/config.go (0 hunks)
  • internal/orchestrator/committer.go (7 hunks)
  • internal/orchestrator/poller.go (2 hunks)
💤 Files with no reviewable changes (1)
  • configs/config.go
🔇 Additional comments (1)
internal/orchestrator/committer.go (1)

75-87: Nice split into dedicated commit/publish loops.

Cleaner lifecycle and clearer responsibilities versus the previous inline path.

coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
internal/orchestrator/poller.go (3)

62-67: Optional: respect container CPU quotas.

GOMAXPROCS may default to host cores; consider uber-go/automaxprocs (or equivalent) early in main to auto-size to cgroup quotas, so this default tracks container limits.
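
If that route is taken, the wiring is a single blank import early in the binary's main package (sketch; the exact location is an assumption), after which the runtime.GOMAXPROCS(0) default tracks the cgroup CPU quota:

import (
	// Adjusts GOMAXPROCS to the container's CPU quota at init time.
	_ "go.uber.org/automaxprocs"
)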


62-67: Nit: redundant min-1 guard after GOMAXPROCS.

runtime.GOMAXPROCS(0) returns ≥1. With the <=0 clamp above, the second guard is defensive but redundant.


20-22: Remove or deprecate unused exported constant DEFAULT_PARALLEL_POLLERS

rg shows only the declaration at internal/orchestrator/poller.go:21 and no other references; remove to avoid exported API drift, or mark deprecated first if other packages may depend on it.
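
If deprecating first, a doc comment on the constant is enough of a signal (sketch; the existing declaration and value stay as-is):

// Deprecated: DEFAULT_PARALLEL_POLLERS no longer drives the default; the
// poller falls back to runtime.GOMAXPROCS(0) when ParallelPollers is unset.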

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 3dcbcdd and 0c901be.

📒 Files selected for processing (1)
  • internal/orchestrator/poller.go (2 hunks)
🔇 Additional comments (1)
internal/orchestrator/poller.go (1)

7-7: LGTM: adopt runtime-based default.

Importing runtime to derive default parallelism is sensible.

Comment on lines 62 to 67
 if parallelPollers == 0 {
-	parallelPollers = DEFAULT_PARALLEL_POLLERS
+	parallelPollers = runtime.GOMAXPROCS(0)
+	if parallelPollers < 1 {
+		parallelPollers = 1
+	}
 }

⚠️ Potential issue

Prevent deadlock/panic when config is negative.

If config sets ParallelPollers to a negative value:

  • Start() spawns 0 workers (i < negative), and
  • tasks buffer becomes 2n+2, which is 0 for -1 (unbuffered; sends can block forever) or negative for ≤ -2 (runtime panic).

Clamp non-positive values, not just 0.

Apply:

- parallelPollers := config.Cfg.Poller.ParallelPollers
- if parallelPollers == 0 {
-   parallelPollers = runtime.GOMAXPROCS(0)
-   if parallelPollers < 1 {
-     parallelPollers = 1
-   }
- }
+ parallelPollers := config.Cfg.Poller.ParallelPollers
+ if parallelPollers <= 0 {
+   parallelPollers = runtime.GOMAXPROCS(0)
+ }
+ if parallelPollers < 1 {
+   parallelPollers = 1
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-if parallelPollers == 0 {
-	parallelPollers = DEFAULT_PARALLEL_POLLERS
-	parallelPollers = runtime.GOMAXPROCS(0)
-	if parallelPollers < 1 {
-		parallelPollers = 1
-	}
-}
+parallelPollers := config.Cfg.Poller.ParallelPollers
+if parallelPollers <= 0 {
+	parallelPollers = runtime.GOMAXPROCS(0)
+}
+if parallelPollers < 1 {
+	parallelPollers = 1
+}
🤖 Prompt for AI Agents
In internal/orchestrator/poller.go around lines 62 to 67, the current check only
handles parallelPollers == 0 so a negative config value can remain and lead to
zero/negative task buffer sizes and deadlocks/panics; change the condition to if
parallelPollers <= 0 { ... } (leave the existing body that falls back to
runtime.GOMAXPROCS(0) and then clamps to at least 1) so any non‑positive
configured value gets normalized to a safe positive worker count.

jakeloo closed this Sep 17, 2025