Skip to content

Commit

Permalink
chore: Better abstract secondary storage - observe secondary storage …
Browse files Browse the repository at this point in the history
…via metrics - use in memory metrics
  • Loading branch information
epociask committed Oct 17, 2024
1 parent 8ef8108 commit 2fce490
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 311 deletions.
30 changes: 7 additions & 23 deletions e2e/optimism_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package e2e_test

import (
"net/http"
"testing"

"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/e2e"
"github.com/Layr-Labs/eigenda-proxy/metrics"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-e2e/actions"
"github.com/ethereum-optimism/optimism/op-e2e/config"
Expand Down Expand Up @@ -168,17 +168,10 @@ func TestOptimismKeccak256Commitment(gt *testing.T) {
optimism.sequencer.ActL2PipelineFull(t)
optimism.ActL1Finalized(t)

// assert that keccak256 primary store was written and read from
labels := metrics.BuildServerRPCLabels("put", "", string(commitments.OptimismKeccak), "0")
delete(labels, "method")

ms, err := proxyTS.MetricPoller.PollCountMetricsWithRetry(metrics.ServerRPCStatuses, labels, 20)
// assert that EigenDA proxy was written and read from using op keccak256 commitment mode
readCount, err := proxyTS.Metrics.HTTPServerRequestsTotal.Find(http.MethodGet, "", string(commitments.OptimismKeccak), "0")
require.NoError(t, err)
require.NotEmpty(t, ms)
require.Len(t, ms, 2)

require.True(t, ms[0].Count > 0)
require.True(t, ms[1].Count > 0)
require.True(t, readCount > 0)

}

Expand Down Expand Up @@ -229,17 +222,8 @@ func TestOptimismGenericCommitment(gt *testing.T) {
optimism.sequencer.ActL2PipelineFull(t)
optimism.ActL1Finalized(t)

// assert that EigenDA proxy's was written and read from

// assert that EigenDA's primary store was written and read from
labels := metrics.BuildServerRPCLabels("put", "", string(commitments.OptimismGeneric), "0")
delete(labels, "method")

ms, err := proxyTS.MetricPoller.PollCountMetricsWithRetry(metrics.ServerRPCStatuses, labels, 20)
// assert that EigenDA proxy was written and read from using op generic commitment mode
readCount, err := proxyTS.Metrics.HTTPServerRequestsTotal.Find(http.MethodGet, "", string(commitments.OptimismGeneric), "0")
require.NoError(t, err)
require.NotEmpty(t, ms)
require.Len(t, ms, 2)

require.True(t, ms[0].Count > 0)
require.True(t, ms[1].Count > 0)
require.True(t, readCount > 0)
}
25 changes: 6 additions & 19 deletions e2e/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/Layr-Labs/eigenda-proxy/client"
"github.com/Layr-Labs/eigenda-proxy/metrics"
"github.com/Layr-Labs/eigenda-proxy/store"

"github.com/Layr-Labs/eigenda-proxy/e2e"
Expand Down Expand Up @@ -351,13 +350,10 @@ func TestProxyServerCaching(t *testing.T) {
require.Equal(t, testPreimage, preimage)

// ensure that read was from cache
labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success")

ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
count, err := ts.Metrics.SecondaryRequestsTotal.Find(store.S3BackendType.String(), http.MethodGet, "success")
require.NoError(t, err)
require.Len(t, ms, 1)

require.True(t, ms[0].Count > 0)
require.True(t, count > 0)

if useMemory() { // ensure that eigenda was not read from
memStats := ts.Server.GetEigenDAStats()
Expand Down Expand Up @@ -398,14 +394,10 @@ func TestProxyServerCachingWithRedis(t *testing.T) {
require.Equal(t, testPreimage, preimage)

// ensure that read was from cache
labels := metrics.BuildSecondaryCountLabels(store.RedisBackendType.String(), http.MethodGet, "success")
ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
readCount, err := ts.Metrics.SecondaryRequestsTotal.Find(store.RedisBackendType.String(), http.MethodGet, "success")
require.NoError(t, err)
require.NotEmpty(t, ms)
require.Len(t, ms, 1)
require.True(t, ms[0].Count >= 1)
require.True(t, readCount > 0)

// TODO: Add metrics for EigenDA dispersal/retrieval
if useMemory() { // ensure that eigenda was not read from
memStats := ts.Server.GetEigenDAStats()
require.Equal(t, 0, memStats.Reads)
Expand Down Expand Up @@ -455,14 +447,9 @@ func TestProxyServerReadFallback(t *testing.T) {
require.NoError(t, err)
require.Equal(t, testPreimage, preimage)

labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success")

ms, err := ts.MetricPoller.PollCountMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 20)
count, err := ts.Metrics.SecondaryRequestsTotal.Find(store.S3BackendType.String(), http.MethodGet, "success")
require.NoError(t, err)
require.NotEmpty(t, ms)
require.Len(t, ms, 1)

require.True(t, ms[0].Count > 0)
require.True(t, count > 0)

// TODO - remove this in favor of metrics sampling
if useMemory() { // ensure that an eigenda read was attempted with zero data available
Expand Down
31 changes: 9 additions & 22 deletions e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials"
"golang.org/x/exp/rand"

"github.com/ethereum-optimism/optimism/op-service/httputil"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"

Expand Down Expand Up @@ -175,11 +174,10 @@ func TestSuiteConfig(t *testing.T, testCfg *Cfg) server.CLIConfig {
}

type TestSuite struct {
Ctx context.Context
Log log.Logger
Server *server.Server
MetricPoller *metrics.PollerClient
MetricSvr *httputil.HTTPServer
Ctx context.Context
Log log.Logger
Server *server.Server
Metrics *metrics.InMemoryMetricer
}

func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, func()) {
Expand All @@ -189,8 +187,8 @@ func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, fu
Color: true,
}).New("role", svcName)

m := metrics.NewInMemoryMetricer()
ctx := context.Background()
m := metrics.NewMetrics("default")
store, err := server.LoadStoreRouter(
ctx,
testSuiteCfg,
Expand All @@ -205,28 +203,17 @@ func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, fu
err = proxySvr.Start()
require.NoError(t, err)

metricsSvr, err := m.StartServer(host, 0)
t.Log("Starting metrics server...")

require.NoError(t, err)

kill := func() {
if err := proxySvr.Stop(); err != nil {
log.Error("failed to stop proxy server", "err", err)
}

if err := metricsSvr.Stop(context.Background()); err != nil {
log.Error("failed to stop metrics server", "err", err)
}
}
log.Info("started metrics server", "addr", metricsSvr.Addr())

return TestSuite{
Ctx: ctx,
Log: log,
Server: proxySvr,
MetricPoller: metrics.NewPoller(fmt.Sprintf("http://%s", metricsSvr.Addr().String())),
MetricSvr: metricsSvr,
Ctx: ctx,
Log: log,
Server: proxySvr,
Metrics: m,
}, kill
}

Expand Down
119 changes: 119 additions & 0 deletions metrics/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package metrics

import (
"fmt"
"sort"
"sync"

"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
)

func keyLabels(labels []string) (common.Hash, error) {
sort.Strings(labels) // in-place sort strings so keys are order agnostic

encodedBytes, err := rlp.EncodeToBytes(labels)
if err != nil {
return common.Hash{}, err
}

hash := crypto.Keccak256Hash(encodedBytes)

return hash, nil
}

type MetricCountMap struct {
m *sync.Map
}

func NewCountMap() *MetricCountMap {
return &MetricCountMap{
m: new(sync.Map),
}
}

func (mcm *MetricCountMap) insert(values ...string) error {
key, err := keyLabels(values)

if err != nil {
return err
}

// update or add count entry
value, exists := mcm.m.Load(key.Hex())
if !exists {
mcm.m.Store(key.Hex(), uint64(1))
return nil
}
uint64Val, ok := value.(uint64)
if !ok {
return fmt.Errorf("could not read uint64 from sync map")
}

mcm.m.Store(key.Hex(), uint64Val+uint64(1))
return nil
}

func (mcm *MetricCountMap) Find(values ...string) (uint64, error) {
key, err := keyLabels(values)

if err != nil {
return 0, err
}

val, exists := mcm.m.Load(key.Hex())
if !exists {
return 0, fmt.Errorf("value doesn't exist")
}
uint64Val, ok := val.(uint64)
if !ok {
return 0, fmt.Errorf("could not read uint64 from sync map")
}

return uint64Val, nil
}

type InMemoryMetricer struct {
HTTPServerRequestsTotal *MetricCountMap
// secondary metrics
SecondaryRequestsTotal *MetricCountMap
}

func NewInMemoryMetricer() *InMemoryMetricer {
return &InMemoryMetricer{
HTTPServerRequestsTotal: NewCountMap(),
SecondaryRequestsTotal: NewCountMap(),
}
}

var _ Metricer = NewInMemoryMetricer()

func (n *InMemoryMetricer) Document() []metrics.DocumentedMetric {
return nil
}

func (n *InMemoryMetricer) RecordInfo(_ string) {
}

func (n *InMemoryMetricer) RecordUp() {
}

func (n *InMemoryMetricer) RecordRPCServerRequest(endpoint string) func(status, mode, ver string) {
return func(x string, y string, z string) {
err := n.HTTPServerRequestsTotal.insert(endpoint, x, y, z)
if err != nil {
panic(err)
}
}
}

func (n *InMemoryMetricer) RecordSecondaryRequest(x string, y string) func(status string) {
return func(z string) {
err := n.SecondaryRequestsTotal.insert(x, y, z)
if err != nil {
panic(err)
}
}
}
Loading

0 comments on commit 2fce490

Please sign in to comment.