Misc Cleanup #289
Conversation
We're building your pull request over on Zeet.
Walkthrough

Initialize committer state from storage and run separate concurrent commit and publish loops using range-based block processing; remove publisher-mode config/flag and PublisherConfig.Mode; change poller default parallelism to use GOMAXPROCS when set to 0; remove ChainTracker and WorkModeMonitor; update RPC to emit chain head metric.

Changes
Sequence Diagram(s)

sequenceDiagram
autonumber
participant Comm as Committer
participant Rpc as RPC Node
participant Store as Storage
participant Pub as Publisher
participant Ctx as Context
Note over Comm: Startup — load and validate state from Storage
Comm->>Store: Load lastCommittedBlock, lastPublishedBlock
Store-->>Comm: Return values
Comm->>Comm: Validate consistency (fatal on mismatch)
par Commit loop
loop until Ctx canceled
Comm->>Rpc: Get latest block number
Comm->>Comm: Compute commit range (lastCommitted+1 .. N)
alt Range available
Comm->>Rpc: Fetch sequential block data
Comm->>Store: Insert blocks
Comm->>Comm: Update lastCommittedBlock
else No blocks
Comm->>Comm: Sleep/backoff
end
end
and Publish loop
loop until Ctx canceled
Comm->>Rpc: Get latest block number
Comm->>Comm: Compute publish range (lastPublished+1 .. N)
alt Range available
Comm->>Rpc: Fetch sequential block data
Comm->>Pub: Publish blocks
Comm->>Comm: Update lastPublishedBlock
else No blocks
Comm->>Comm: Sleep/backoff
end
end
end
Ctx-->>Comm: Cancel
Comm->>Pub: Close
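To make the diagram concrete, here is a minimal Go sketch of the lifecycle it describes: two loops driven by one context that stop together on cancellation. The struct, method names, and loop bodies are illustrative placeholders, not the PR's actual committer code.

```go
package main

import (
	"context"
	"log"
	"sync"
	"time"
)

// Illustrative stand-in for the real Committer; the actual struct holds
// storage, RPC, and publisher handles plus last committed/published state.
type committer struct{}

func (c *committer) runCommitLoop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			// commit the next range (lastCommitted+1 .. latest), then update state
			time.Sleep(200 * time.Millisecond) // placeholder backoff when idle
		}
	}
}

func (c *committer) runPublishLoop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			// publish the next range (lastPublished+1 .. latest), then update state
			time.Sleep(200 * time.Millisecond)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	var c committer
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); c.runCommitLoop(ctx) }()
	go func() { defer wg.Done(); c.runPublishLoop(ctx) }()
	wg.Wait()
	log.Println("both loops stopped after context cancellation")
}
```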
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Pre-merge checks (1 passed, 1 warning, 1 inconclusive)
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
internal/orchestrator/poller.go (1)
125-131: Make range “check-then-mark” atomic to avoid duplicate polling of the same range.

Between isRangeProcessing and markRangeAsProcessing another goroutine can slip in, causing duplicate RPC work. Introduce a tryMark that checks and marks under one lock.

@@
- // Check if already processing
- if p.isRangeProcessing(rangeKey) {
-     return nil, ErrBlocksProcessing
- }
-
- p.markRangeAsProcessing(rangeKey)
+ // Atomically check-and-mark
+ if !p.tryMarkRangeAsProcessing(rangeKey) {
+     return nil, ErrBlocksProcessing
+ }
  defer p.unmarkRangeAsProcessing(rangeKey)
@@
  func (p *Poller) isRangeProcessing(rangeKey string) bool {
      p.processingRangesMutex.RLock()
      defer p.processingRangesMutex.RUnlock()
      return len(p.processingRanges[rangeKey]) > 0
  }
+
+ // tryMarkRangeAsProcessing checks and marks the range atomically.
+ func (p *Poller) tryMarkRangeAsProcessing(rangeKey string) bool {
+     p.processingRangesMutex.Lock()
+     defer p.processingRangesMutex.Unlock()
+     if len(p.processingRanges[rangeKey]) > 0 {
+         return false
+     }
+     ch := make(chan struct{})
+     p.processingRanges[rangeKey] = []chan struct{}{ch}
+     return true
+ }
@@
  func (p *Poller) markRangeAsProcessing(rangeKey string) chan struct{} {
      p.processingRangesMutex.Lock()
      defer p.processingRangesMutex.Unlock()
      // Create a notification channel for this range
      notifyChan := make(chan struct{})
      // Initialize the slice if it doesn't exist
      if p.processingRanges[rangeKey] == nil {
          p.processingRanges[rangeKey] = []chan struct{}{}
      }
      // Store the notification channel
      p.processingRanges[rangeKey] = append(p.processingRanges[rangeKey], notifyChan)
      return notifyChan
  }

Also applies to: 375-419
internal/orchestrator/committer.go (4)
216-224: Fatal on nil latestCommitted causes false positives; normalize nil to 0 and self-heal.

Storage may return nil for “no rows yet”. Treat it as 0 and reconcile state instead of exiting.

- latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
+ latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
  if err != nil {
      return nil, err
  }
- if latestCommittedBlockNumber == nil || c.lastCommittedBlock.Load() != latestCommittedBlockNumber.Uint64() {
-     log.Fatal().Msgf("Inconsistent last committed block state between memory (%d) and storage (%v)", c.lastCommittedBlock.Load(), latestCommittedBlockNumber)
-     return nil, fmt.Errorf("last committed block number is not initialized correctly")
- }
+ if latestCommittedBlockNumber == nil {
+     latestCommittedBlockNumber = new(big.Int).SetUint64(0)
+ }
+ if c.lastCommittedBlock.Load() != latestCommittedBlockNumber.Uint64() {
+     log.Warn().Msgf("Reconciling last committed: memory=%d storage=%s",
+         c.lastCommittedBlock.Load(), latestCommittedBlockNumber.String())
+     c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
+ }
236-244: Same for published state: avoid Fatal on nil; normalize and reconcile.

Mirror the committed-path fix to prevent an unnecessary process exit on fresh state.

- if latestPublishedBlockNumber == nil || c.lastPublishedBlock.Load() != latestPublishedBlockNumber.Uint64() {
-     log.Fatal().Msgf("Inconsistent last published block state between memory (%d) and storage (%v)", c.lastPublishedBlock.Load(), latestPublishedBlockNumber)
-     return nil, fmt.Errorf("last published block number is not initialized correctly")
- }
+ if latestPublishedBlockNumber == nil {
+     latestPublishedBlockNumber = new(big.Int).SetUint64(0)
+ }
+ if c.lastPublishedBlock.Load() != latestPublishedBlockNumber.Uint64() {
+     log.Warn().Msgf("Reconciling last published: memory=%d storage=%s",
+         c.lastPublishedBlock.Load(), latestPublishedBlockNumber.String())
+     c.lastPublishedBlock.Store(latestPublishedBlockNumber.Uint64())
+ }
145-149: Busy-spin risk: add a small backoff when there’s no work.

The loops run continuously and call RPC for the latest block even when idle. Add a short sleep to reduce load.

@@ func (c *Committer) runCommitLoop(ctx context.Context) {
  if err := c.commit(ctx); err != nil {
      log.Error().Err(err).Msg("Error committing blocks")
  }
  go c.cleanupProcessedStagingBlocks(ctx)
+ time.Sleep(200 * time.Millisecond)
@@ func (c *Committer) runPublishLoop(ctx context.Context) {
  if err := c.publish(ctx); err != nil {
      log.Error().Err(err).Msg("Error publishing blocks")
  }
  go c.cleanupProcessedStagingBlocks(ctx)
+ time.Sleep(200 * time.Millisecond)

Also applies to: 164-168
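If the added sleep should also honor shutdown, a context-aware variant is a small extension. This is a sketch only; the suggested diff above deliberately keeps a plain time.Sleep.

```go
// idleBackoff waits for d but returns early when ctx is canceled,
// so an idle loop still shuts down promptly.
func idleBackoff(ctx context.Context, d time.Duration) {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
	case <-t.C:
	}
}
```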
339-344: Fix metric: record the first missed block (expectedBlockNumber), not the first block in the batch.

The current value is misleading for operators.

- metrics.MissedBlockNumbers.Set(float64(blocksData[0].Block.Number.Int64()))
+ metrics.MissedBlockNumbers.Set(float64(expectedBlockNumber.Int64()))
🧹 Nitpick comments (4)
internal/orchestrator/poller.go (2)
7-7: Defaulting to NumCPU can oversubscribe containers; prefer GOMAXPROCS(0) and clamp >= 1.

NumCPU may ignore cgroup CPU quotas depending on Go/runtime settings. Using GOMAXPROCS(0) respects quotas when automaxprocs or the env var is set. Also guard against 0.

- parallelPollers := config.Cfg.Poller.ParallelPollers
- if parallelPollers == 0 {
-     parallelPollers = runtime.NumCPU()
- }
+ parallelPollers := config.Cfg.Poller.ParallelPollers
+ if parallelPollers <= 0 {
+     // Respect container CPU limits when GOMAXPROCS is set/managed
+     parallelPollers = runtime.GOMAXPROCS(0)
+     if parallelPollers < 1 {
+         parallelPollers = 1
+     }
+ }

Also applies to: 61-64
165-172: Comment says “fetch from staging” but the code polls via the worker.

Either adjust the comment or actually fetch from staging to avoid misleading operators.
internal/orchestrator/committer.go (1)
164-168: Deduplicate staging cleanup execution.

Both loops invoke cleanup concurrently; gate it to one runner or add singleflight to reduce duplicate deletes.

@@ func (c *Committer) runPublishLoop(ctx context.Context) {
- go c.cleanupProcessedStagingBlocks(ctx)
+ // cleanup is already triggered from commit loop

Alternatively, wrap cleanupProcessedStagingBlocks in a singleflight Do to coalesce calls.
Also applies to: 172-207
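If the singleflight route is preferred, coalescing could look roughly like the sketch below, using golang.org/x/sync/singleflight. The function name, key, and package layout are illustrative, not part of the actual change.

```go
package orchestrator

import (
	"context"

	"golang.org/x/sync/singleflight"
)

var cleanupGroup singleflight.Group

// coalesceCleanup runs cleanup via singleflight: callers that arrive while a
// cleanup is already in flight wait for that run instead of starting another.
// Both the commit and publish loops could call this safely.
func coalesceCleanup(ctx context.Context, cleanup func(context.Context)) {
	_, _, _ = cleanupGroup.Do("staging-cleanup", func() (interface{}, error) {
		cleanup(ctx)
		return nil, nil
	})
}
```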
cmd/root.go (1)
52-53: Clarify flag semantics and fix typo.

Make it clear that 0 uses GOMAXPROCS/NumCPU; fix the “oller” typo.

- rootCmd.PersistentFlags().Int("poller-parallel-pollers", 0, "Maximum number of parallel pollers")
+ rootCmd.PersistentFlags().Int("poller-parallel-pollers", 0, "Parallel poller workers (0 = GOMAXPROCS/NumCPU)")
- rootCmd.PersistentFlags().String("poller-s3-bucket", "", "S3 bucket for oller archive source")
+ rootCmd.PersistentFlags().String("poller-s3-bucket", "", "S3 bucket for poller archive source")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
- cmd/root.go (1 hunks)
- configs/config.go (0 hunks)
- internal/orchestrator/committer.go (7 hunks)
- internal/orchestrator/poller.go (2 hunks)
💤 Files with no reviewable changes (1)
- configs/config.go
🔇 Additional comments (1)
internal/orchestrator/committer.go (1)
75-87: Nice split into dedicated commit/publish loops.

Cleaner lifecycle and clearer responsibilities versus the previous inline path.
Actionable comments posted: 1
🧹 Nitpick comments (3)
internal/orchestrator/poller.go (3)
62-67: Optional: respect container CPU quotas.

GOMAXPROCS may default to host cores; consider uber-go/automaxprocs (or equivalent) early in main to auto-size to cgroup quotas, so this default tracks container limits.
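If automaxprocs were adopted, the whole integration would be a blank import in package main, roughly as sketched below (go.uber.org/automaxprocs is the library's import path; everything else here is illustrative).

```go
package main

import (
	// Adjusts GOMAXPROCS at startup to match the container's CPU quota.
	_ "go.uber.org/automaxprocs"
)

func main() {
	// runtime.GOMAXPROCS(0) now reflects the cgroup limit, so the poller's
	// "0 means GOMAXPROCS" default tracks the container allowance.
}
```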
62-67: Nit: redundant min-1 guard after GOMAXPROCS.

runtime.GOMAXPROCS(0) returns ≥ 1, so with the <= 0 clamp above, the second guard is defensive but redundant.
20-22: Remove or deprecate the unused exported constant DEFAULT_PARALLEL_POLLERS.

rg shows only the declaration at internal/orchestrator/poller.go:21 and no other references; remove it to avoid exported API drift, or mark it deprecated first if other packages may depend on it.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- internal/orchestrator/poller.go (2 hunks)
🔇 Additional comments (1)
internal/orchestrator/poller.go (1)
7-7: LGTM: adopt the runtime-based default.

Importing runtime to derive the default parallelism is sensible.
 if parallelPollers == 0 {
-    parallelPollers = DEFAULT_PARALLEL_POLLERS
+    parallelPollers = runtime.GOMAXPROCS(0)
+    if parallelPollers < 1 {
+        parallelPollers = 1
+    }
 }
Prevent deadlock/panic when config is negative.
If config sets ParallelPollers to a negative value:
- Start() spawns 0 workers (the worker loop's i < n condition never holds for a negative n), and
- the tasks buffer becomes 2n+2, which is 0 for -1 (unbuffered; sends can block forever) or negative for n ≤ -2 (runtime panic).
Clamp non-positive values, not just 0.
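The panic half of that failure mode is easy to reproduce in isolation; the snippet below is illustrative and not taken from the poller.

```go
package main

import "fmt"

func main() {
	n := -2 // e.g. a misconfigured ParallelPollers value
	defer func() {
		fmt.Println("recovered:", recover()) // makechan: size out of range
	}()
	_ = make(chan int, 2*n+2) // negative capacity panics at runtime
}
```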
Apply:
- parallelPollers := config.Cfg.Poller.ParallelPollers
- if parallelPollers == 0 {
- parallelPollers = runtime.GOMAXPROCS(0)
- if parallelPollers < 1 {
- parallelPollers = 1
- }
- }
+ parallelPollers := config.Cfg.Poller.ParallelPollers
+ if parallelPollers <= 0 {
+ parallelPollers = runtime.GOMAXPROCS(0)
+ }
+ if parallelPollers < 1 {
+ parallelPollers = 1
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
parallelPollers := config.Cfg.Poller.ParallelPollers
if parallelPollers <= 0 {
    parallelPollers = runtime.GOMAXPROCS(0)
}
if parallelPollers < 1 {
    parallelPollers = 1
}
🤖 Prompt for AI Agents
In internal/orchestrator/poller.go around lines 62 to 67, the current check only
handles parallelPollers == 0 so a negative config value can remain and lead to
zero/negative task buffer sizes and deadlocks/panics; change the condition to if
parallelPollers <= 0 { ... } (leave the existing body that falls back to
runtime.GOMAXPROCS(0) and then clamps to at least 1) so any non‑positive
configured value gets normalized to a safe positive worker count.
Summary by CodeRabbit
New Features
Refactor
Chores