From 5620f7382dffc6ce3b4a01ed6af9de55f6523ed8 Mon Sep 17 00:00:00 2001 From: Jian Xiao <99709935+jianoaix@users.noreply.github.com> Date: Tue, 15 Oct 2024 18:34:59 -0500 Subject: [PATCH] Perf: cache the operator state (#809) --- disperser/batcher/encoding_streamer.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index c17142a35..2a14aa7ff 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -2,6 +2,7 @@ package batcher import ( "context" + "encoding/binary" "errors" "fmt" "strings" @@ -13,12 +14,15 @@ import ( "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" + lru "github.com/hashicorp/golang-lru/v2" "github.com/wealdtech/go-merkletree/v2" grpc_metadata "google.golang.org/grpc/metadata" ) const encodingInterval = 2 * time.Second +const operatorStateCacheSize = 32 + var errNoEncodedResults = errors.New("no encoded results") type EncodedSizeNotifier struct { @@ -77,6 +81,8 @@ type EncodingStreamer struct { // Used to keep track of the last evaluated key for fetching metadatas exclusiveStartKey *disperser.BlobStoreExclusiveStartKey + + operatorStateCache *lru.Cache[string, *core.IndexedOperatorState] } type batch struct { @@ -110,6 +116,10 @@ func NewEncodingStreamer( if config.EncodingQueueLimit <= 0 { return nil, errors.New("EncodingQueueLimit should be greater than 0") } + operatorStateCache, err := lru.New[string, *core.IndexedOperatorState](operatorStateCacheSize) + if err != nil { + return nil, err + } return &EncodingStreamer{ StreamerConfig: config, EncodedBlobstore: newEncodedBlobStore(logger), @@ -125,6 +135,7 @@ func NewEncodingStreamer( batcherMetrics: batcherMetrics, logger: logger.With("component", "EncodingStreamer"), exclusiveStartKey: nil, + operatorStateCache: operatorStateCache, }, nil } @@ -669,11 +680,16 @@ func (e *EncodingStreamer) getOperatorState(ctx context.Context, metadatas []*di i++ } + cacheKey := computeCacheKey(blockNumber, quorumIds) + if val, ok := e.operatorStateCache.Get(cacheKey); ok { + return val, nil + } // GetIndexedOperatorState should return state for valid quorums only state, err := e.chainState.GetIndexedOperatorState(ctx, blockNumber, quorumIds) if err != nil { return nil, fmt.Errorf("error getting operator state at block number %d: %w", blockNumber, err) } + e.operatorStateCache.Add(cacheKey, state) return state, nil } @@ -699,3 +715,10 @@ func (e *EncodingStreamer) validateMetadataQuorums(metadatas []*disperser.BlobMe } return validMetadata } + +func computeCacheKey(blockNumber uint, quorumIDs []uint8) string { + bytes := make([]byte, 8+len(quorumIDs)) + binary.LittleEndian.PutUint64(bytes, uint64(blockNumber)) + copy(bytes[8:], quorumIDs) + return string(bytes) +}