Skip to content

Commit

Permalink
op-batcher: add channel_queue_length gauge metric (#14212)
Browse files Browse the repository at this point in the history
* op-batcher: add channel_queue_length gauge metric

* update NoopMetrics

* integrate new metric behaviour into unit tests

* move metric update outside of loop
  • Loading branch information
geoknee authored Feb 6, 2025
1 parent 722b756 commit dbf1c04
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 2 deletions.
9 changes: 7 additions & 2 deletions op-batcher/batcher/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (s *channelManager) Clear(l1OriginLastSubmittedChannel eth.BlockID) {
s.tip = common.Hash{}
s.currentChannel = nil
s.channelQueue = nil
s.metr.RecordChannelQueueLength(0)
s.txChannels = make(map[string]*channel)
}

Expand Down Expand Up @@ -158,6 +159,7 @@ func (s *channelManager) handleChannelInvalidated(c *channel) {
break
}
}
s.metr.RecordChannelQueueLength(len(s.channelQueue))

// We want to start writing to a new channel, so reset currentChannel.
s.currentChannel = nil
Expand Down Expand Up @@ -306,8 +308,6 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
pc := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastSubmittedChannel.Number, channelOut)

s.currentChannel = pc
s.channelQueue = append(s.channelQueue, pc)

s.log.Info("Created channel",
"id", pc.ID(),
"l1Head", l1Head,
Expand All @@ -321,6 +321,9 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
)
s.metr.RecordChannelOpened(pc.ID(), s.pendingBlocks())

s.channelQueue = append(s.channelQueue, pc)
s.metr.RecordChannelQueueLength(len(s.channelQueue))

return nil
}

Expand Down Expand Up @@ -473,9 +476,11 @@ func (s *channelManager) PruneChannels(num int) {
}
}
s.channelQueue = s.channelQueue[num:]
s.metr.RecordChannelQueueLength(len(s.channelQueue))
if clearCurrentChannel {
s.currentChannel = nil
}

}

// PendingDABytes returns the current number of bytes pending to be written to the DA layer (from blocks fetched from L2
Expand Down
6 changes: 6 additions & 0 deletions op-batcher/batcher/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,17 @@ func TestChannelManager_handleChannelInvalidated(t *testing.T) {
stateSnapshot := queue.Queue[*types.Block]{blockA, blockB}
m.blocks = stateSnapshot
require.Empty(t, m.channelQueue)
require.Equal(t, metrics.ChannelQueueLength, 0)

// Place an old channel in the queue.
// This channel should not be affected by
// a requeue or a later channel timing out.
oldChannel := newChannel(l, nil, m.defaultCfg, defaultTestRollupConfig, 0, nil)
oldChannel.Close()
m.channelQueue = []*channel{oldChannel}
metrics.RecordChannelQueueLength(1) // we need to do this manually b/c we are not using the usual codepath to set state
require.Len(t, m.channelQueue, 1)
require.Equal(t, metrics.ChannelQueueLength, 1)

// Setup initial metrics
metrics.RecordL2BlockInPendingQueue(blockA)
Expand All @@ -429,6 +432,8 @@ func TestChannelManager_handleChannelInvalidated(t *testing.T) {
// Trigger the blocks -> channelQueue data pipelining
require.NoError(t, m.ensureChannelWithSpace(eth.BlockID{}))
require.Len(t, m.channelQueue, 2)
require.Equal(t, metrics.ChannelQueueLength, 2)

require.NoError(t, m.processBlocks())

// Assert that at least one block was processed into the channel
Expand All @@ -446,6 +451,7 @@ func TestChannelManager_handleChannelInvalidated(t *testing.T) {
require.Equal(t, m.blocks, stateSnapshot)
require.Contains(t, m.channelQueue, oldChannel)
require.Len(t, m.channelQueue, 1)
require.Equal(t, metrics.ChannelQueueLength, 1)

// Check metric came back up to previous value
require.Equal(t, pendingBytesBefore, metrics.PendingBlocksBytesCurrent)
Expand Down
11 changes: 11 additions & 0 deletions op-batcher/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Metricer interface {
RecordChannelClosed(id derive.ChannelID, numPendingBlocks int, numFrames int, inputBytes int, outputComprBytes int, reason error)
RecordChannelFullySubmitted(id derive.ChannelID)
RecordChannelTimedOut(id derive.ChannelID)
RecordChannelQueueLength(len int)

RecordBatchTxSubmitted()
RecordBatchTxSuccess()
Expand Down Expand Up @@ -86,6 +87,7 @@ type Metrics struct {
channelComprRatio prometheus.Histogram
channelInputBytesTotal prometheus.Counter
channelOutputBytesTotal prometheus.Counter
channelQueueLength prometheus.Gauge

batcherTxEvs opmetrics.EventVec

Expand Down Expand Up @@ -191,6 +193,11 @@ func NewMetrics(procName string) *Metrics {
Name: "output_bytes_total",
Help: "Total number of compressed output bytes from a channel.",
}),
channelQueueLength: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "channel_queue_length",
Help: "The number of channels currently in memory.",
}),
blobUsedBytes: factory.NewHistogram(prometheus.HistogramOpts{
Namespace: ns,
Name: "blob_used_bytes",
Expand Down Expand Up @@ -338,6 +345,10 @@ func (m *Metrics) RecordBlobUsedBytes(num int) {
m.blobUsedBytes.Observe(float64(num))
}

func (m *Metrics) RecordChannelQueueLength(len int) {
m.channelQueueLength.Set(float64(len))
}

// estimateBatchSize returns the estimated size of the block in a batch both with compression ('daSize') and without
// ('rawSize').
func estimateBatchSize(block *types.Block) (daSize, rawSize uint64) {
Expand Down
1 change: 1 addition & 0 deletions op-batcher/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (*noopMetrics) RecordChannelClosed(derive.ChannelID, int, int, int, int, er

func (*noopMetrics) RecordChannelFullySubmitted(derive.ChannelID) {}
func (*noopMetrics) RecordChannelTimedOut(derive.ChannelID) {}
func (*noopMetrics) RecordChannelQueueLength(int) {}

func (*noopMetrics) RecordBatchTxSubmitted() {}
func (*noopMetrics) RecordBatchTxSuccess() {}
Expand Down
4 changes: 4 additions & 0 deletions op-batcher/metrics/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
type TestMetrics struct {
noopMetrics
PendingBlocksBytesCurrent float64
ChannelQueueLength int
}

var _ Metricer = new(TestMetrics)
Expand All @@ -20,3 +21,6 @@ func (m *TestMetrics) RecordL2BlockInChannel(block *types.Block) {
_, rawSize := estimateBatchSize(block)
m.PendingBlocksBytesCurrent -= float64(rawSize)
}
func (m *TestMetrics) RecordChannelQueueLength(l int) {
m.ChannelQueueLength = l
}

0 comments on commit dbf1c04

Please sign in to comment.