Skip to content

Commit

Permalink
Perf: cache the operator state (#809)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Oct 15, 2024
1 parent 8328766 commit 5620f73
Showing 1 changed file with 23 additions and 0 deletions.
23 changes: 23 additions & 0 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package batcher

import (
"context"
"encoding/binary"
"errors"
"fmt"
"strings"
Expand All @@ -13,12 +14,15 @@ import (
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/wealdtech/go-merkletree/v2"
grpc_metadata "google.golang.org/grpc/metadata"
)

const encodingInterval = 2 * time.Second

const operatorStateCacheSize = 32

var errNoEncodedResults = errors.New("no encoded results")

type EncodedSizeNotifier struct {
Expand Down Expand Up @@ -77,6 +81,8 @@ type EncodingStreamer struct {

// Used to keep track of the last evaluated key for fetching metadatas
exclusiveStartKey *disperser.BlobStoreExclusiveStartKey

operatorStateCache *lru.Cache[string, *core.IndexedOperatorState]
}

type batch struct {
Expand Down Expand Up @@ -110,6 +116,10 @@ func NewEncodingStreamer(
if config.EncodingQueueLimit <= 0 {
return nil, errors.New("EncodingQueueLimit should be greater than 0")
}
operatorStateCache, err := lru.New[string, *core.IndexedOperatorState](operatorStateCacheSize)
if err != nil {
return nil, err
}
return &EncodingStreamer{
StreamerConfig: config,
EncodedBlobstore: newEncodedBlobStore(logger),
Expand All @@ -125,6 +135,7 @@ func NewEncodingStreamer(
batcherMetrics: batcherMetrics,
logger: logger.With("component", "EncodingStreamer"),
exclusiveStartKey: nil,
operatorStateCache: operatorStateCache,
}, nil
}

Expand Down Expand Up @@ -669,11 +680,16 @@ func (e *EncodingStreamer) getOperatorState(ctx context.Context, metadatas []*di
i++
}

cacheKey := computeCacheKey(blockNumber, quorumIds)
if val, ok := e.operatorStateCache.Get(cacheKey); ok {
return val, nil
}
// GetIndexedOperatorState should return state for valid quorums only
state, err := e.chainState.GetIndexedOperatorState(ctx, blockNumber, quorumIds)
if err != nil {
return nil, fmt.Errorf("error getting operator state at block number %d: %w", blockNumber, err)
}
e.operatorStateCache.Add(cacheKey, state)
return state, nil
}

Expand All @@ -699,3 +715,10 @@ func (e *EncodingStreamer) validateMetadataQuorums(metadatas []*disperser.BlobMe
}
return validMetadata
}

func computeCacheKey(blockNumber uint, quorumIDs []uint8) string {
bytes := make([]byte, 8+len(quorumIDs))
binary.LittleEndian.PutUint64(bytes, uint64(blockNumber))
copy(bytes[8:], quorumIDs)
return string(bytes)
}

0 comments on commit 5620f73

Please sign in to comment.