feat: Support explicit modes for read fallbacks and caching (#104)
* feat: Support explicit modes for read fallback and caching

* feat: Support explicit modes for read fallback and caching - updated docs and fixed bugs

* feat: Support explicit modes for read fallback and caching - updated docs and fixed bugs

* feat: Support explicit modes for read fallback and caching - updated E2E test

* feat: Support explicit modes for read fallback and caching - updated E2E test

* feat: Support explicit modes for read fallback and caching - updated cfg validation tests

* feat: Support explicit modes for read fallback and caching - incorporate PR feedback

* feat: Support explicit modes for read fallback and caching - update query timeout
epociask authored Aug 29, 2024
1 parent 83811ac commit bb493d8
Showing 16 changed files with 687 additions and 191 deletions.
6 changes: 4 additions & 2 deletions Makefile
@@ -27,7 +27,9 @@ run-minio:
docker run -p 4566:9000 -d -e "MINIO_ROOT_USER=minioadmin" -e "MINIO_ROOT_PASSWORD=minioadmin" --name minio minio/minio server /data

stop-minio:
docker stop minio && docker rm minio
@if [ -n "$$(docker ps -q -f name=minio)" ]; then \
docker stop minio && docker rm minio; \
fi

run-server:
./bin/eigenda-proxy
@@ -38,7 +40,7 @@ clean:
test:
go test -v ./... -parallel 4

e2e-test: run-minio
e2e-test: stop-minio run-minio
$(E2ETEST) && \
make stop-minio

10 changes: 7 additions & 3 deletions README.md
@@ -59,7 +59,8 @@ In order to disperse to the EigenDA network in production, or at high throughput
| `--s3.bucket` | | `$EIGENDA_PROXY_S3_BUCKET` | Bucket name for S3 storage. |
| `--s3.path` | | `$EIGENDA_PROXY_S3_PATH` | Bucket path for S3 storage. |
| `--s3.endpoint` | | `$EIGENDA_PROXY_S3_ENDPOINT` | Endpoint for S3 storage. |
| `--s3.backup` | `false` | `$EIGENDA_PROXY_S3_BACKUP` | Backup mode for S3: whether to use S3 as a backup store to ensure resiliency in case of EigenDA read failure. |
| `--routing.fallback-targets` | `[]` | `$EIGENDA_PROXY_FALLBACK_TARGETS` | Fallback backend targets. Supports S3. Backup storage locations to read from in the event of EigenDA retrieval failure. |
| `--routing.cache-targets` | `[]` | `$EIGENDA_PROXY_CACHE_TARGETS` | Caching targets. Supports S3. Data is cached to these targets after dispersal to EigenDA and read from them before any read from EigenDA is attempted. |
| `--s3.timeout` | `5s` | `$EIGENDA_PROXY_S3_TIMEOUT` | Timeout for S3 storage operations (e.g. get, put). |
| `--help, -h` | `false` | | Show help. |
| `--version, -v` | `false` | | Print the version. |
@@ -88,8 +89,11 @@ An optional `--eigenda-eth-confirmation-depth` flag can be provided to specify a

An ephemeral memory store backend can be used for faster feedback loops when testing rollup integrations. To enable this feature, use the CLI flags `--memstore.enabled` and `--memstore.expiration`.
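For a quick local sanity check, an invocation might look like the sketch below; only the flag names come from the docs above, and the duration value is an illustrative assumption based on Go-style duration syntax (as in the `5s` default of `--s3.timeout`):

```bash
# Hypothetical invocation; the expiration value is an assumption.
./bin/eigenda-proxy \
  --memstore.enabled \
  --memstore.expiration 30m
```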

### S3 Fallback
An optional S3 fallback `--s3.backup` can be triggered to ensure resiliency when **reading**. When enabled, a blob is persisted to S3 after being successfully dispersed, keyed by the keccak256 hash of the existing commitment. In the event that blobs cannot be read from EigenDA, they will then be retrieved from S3.
### Storage Fallback
An optional storage fallback CLI flag `--routing.fallback-targets` can be leveraged to ensure resiliency when **reading**. When enabled, a blob is persisted to a fallback target after being successfully dispersed. Fallback targets use the keccak256 hash of the existing EigenDA commitment as their key, for succinctness. In the event that blobs cannot be read from EigenDA, they will then be retrieved in linear order from the provided fallback targets.
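As a sketch, enabling an S3 fallback might look like the following; the `s3` target identifier and the bucket/endpoint values are assumptions, since only the flag names are documented here:

```bash
# Hypothetical sketch: "s3" as the target name and all values are assumptions.
./bin/eigenda-proxy \
  --routing.fallback-targets s3 \
  --s3.bucket my-eigenda-backup \
  --s3.endpoint s3.us-east-1.amazonaws.com \
  --s3.timeout 5s
```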

### Storage Caching
An optional storage caching CLI flag `--routing.cache-targets` can be leveraged to reduce redundant reads from EigenDA and improve retrieval latency. When enabled, a blob is persisted to each cache target after being successfully dispersed, using the keccak256 hash of the existing EigenDA commitment as the target key. This ensures second-order keys are succinct. Upon a blob retrieval request, the cache targets are read first, before falling back to EigenDA.
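Combining both modes, a hypothetical deployment could cache to and fall back on S3; as above, the `s3` target identifier and the concrete values are assumptions, and the comment restates the read order documented in these two sections:

```bash
# Hypothetical sketch; values are illustrative assumptions.
# Documented read order: cache targets -> EigenDA -> fallback targets (in linear order).
./bin/eigenda-proxy \
  --routing.cache-targets s3 \
  --routing.fallback-targets s3 \
  --s3.bucket my-eigenda-store \
  --s3.endpoint s3.us-east-1.amazonaws.com
```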


## Metrics
8 changes: 5 additions & 3 deletions e2e/optimism_test.go
@@ -121,7 +121,9 @@ func TestOptimismKeccak256Commitment(gt *testing.T) {
gt.Skip("Skipping test as INTEGRATION or TESTNET env var not set")
}

proxyTS, shutDown := e2e.CreateTestSuite(gt, useMemory(), true)
testCfg := e2e.TestConfig(useMemory())
testCfg.UseKeccak256ModeS3 = true
proxyTS, shutDown := e2e.CreateTestSuite(gt, testCfg)
defer shutDown()

t := actions.NewDefaultTesting(gt)
@@ -174,7 +176,7 @@ func TestOptimismAltDACommitment(gt *testing.T) {
gt.Skip("Skipping test as INTEGRATION or TESTNET env var not set")
}

proxyTS, shutDown := e2e.CreateTestSuite(gt, useMemory(), false)
proxyTS, shutDown := e2e.CreateTestSuite(gt, e2e.TestConfig(useMemory()))
defer shutDown()

t := actions.NewDefaultTesting(gt)
@@ -218,7 +220,7 @@ func TestOptimismAltDACommitment(gt *testing.T) {
// assert that the EigenDA proxy's store was written to and read from

if useMemory() {
stat := proxyTS.Server.GetMemStats()
stat := proxyTS.Server.GetEigenDAStats()
require.Equal(t, 1, stat.Entries)
require.Equal(t, 1, stat.Reads)
}
126 changes: 115 additions & 11 deletions e2e/server_test.go
@@ -3,6 +3,7 @@ package e2e_test
import (
"strings"
"testing"
"time"

"github.com/Layr-Labs/eigenda-proxy/client"
"github.com/Layr-Labs/eigenda-proxy/e2e"
@@ -14,14 +15,17 @@ func useMemory() bool {
return !runTestnetIntegrationTests
}

func TestOptimismClientWithS3Backend(t *testing.T) {
func TestOptimismClientWithKeccak256Commitment(t *testing.T) {
if !runIntegrationTests && !runTestnetIntegrationTests {
t.Skip("Skipping test as INTEGRATION or TESTNET env var not set")
}

t.Parallel()

ts, kill := e2e.CreateTestSuite(t, useMemory(), true)
testCfg := e2e.TestConfig(useMemory())
testCfg.UseKeccak256ModeS3 = true

ts, kill := e2e.CreateTestSuite(t, testCfg)
defer kill()

daClient := op_plasma.NewDAClient(ts.Address(), false, true)
@@ -36,16 +40,19 @@ func TestOptimismClientWithS3Backend(t *testing.T) {
require.Equal(t, testPreimage, preimage)
}

func TestOptimismClientWithEigenDABackend(t *testing.T) {
// this test asserts that the data can be posted/read to EigenDA with a concurrent S3 backend configured
/*
this test asserts that the data can be posted/read to EigenDA
with a concurrent S3 backend configured
*/
func TestOptimismClientWithGenericCommitment(t *testing.T) {

if !runIntegrationTests && !runTestnetIntegrationTests {
t.Skip("Skipping test as INTEGRATION or TESTNET env var not set")
}

t.Parallel()

ts, kill := e2e.CreateTestSuite(t, useMemory(), true)
ts, kill := e2e.CreateTestSuite(t, e2e.TestConfig(useMemory()))
defer kill()

daClient := op_plasma.NewDAClient(ts.Address(), false, false)
@@ -69,7 +76,7 @@ func TestProxyClient(t *testing.T) {

t.Parallel()

ts, kill := e2e.CreateTestSuite(t, useMemory(), false)
ts, kill := e2e.CreateTestSuite(t, e2e.TestConfig(useMemory()))
defer kill()

cfg := &client.Config{
@@ -89,21 +96,21 @@ func TestProxyClient(t *testing.T) {
require.Equal(t, testPreimage, preimage)
}

func TestProxyClientWithLargeBlob(t *testing.T) {
func TestProxyServerWithLargeBlob(t *testing.T) {
if !runIntegrationTests && !runTestnetIntegrationTests {
t.Skip("Skipping test as INTEGRATION or TESTNET env var not set")
}

t.Parallel()

ts, kill := e2e.CreateTestSuite(t, useMemory(), false)
ts, kill := e2e.CreateTestSuite(t, e2e.TestConfig(useMemory()))
defer kill()

cfg := &client.Config{
URL: ts.Address(),
}
daClient := client.New(cfg)
// 2MB blob
// 4MB blob
testPreimage := []byte(e2e.RandString(4_000_000))

t.Log("Setting input data on proxy server...")
@@ -116,14 +123,14 @@ func TestProxyClientWithLargeBlob(t *testing.T) {
require.Equal(t, testPreimage, preimage)
}

func TestProxyClientWithOversizedBlob(t *testing.T) {
func TestProxyServerWithOversizedBlob(t *testing.T) {
if !runIntegrationTests && !runTestnetIntegrationTests {
t.Skip("Skipping test as INTEGRATION or TESTNET env var not set")
}

t.Parallel()

ts, kill := e2e.CreateTestSuite(t, useMemory(), false)
ts, kill := e2e.CreateTestSuite(t, e2e.TestConfig(useMemory()))
defer kill()

cfg := &client.Config{
@@ -150,3 +157,100 @@ func TestProxyClientWithOversizedBlob(t *testing.T) {
require.True(t, oversizedError)

}

/*
Ensure that the proxy is able to write to and read from a cache backend when enabled
*/
func TestProxyServerCaching(t *testing.T) {
if !runIntegrationTests && !runTestnetIntegrationTests {
t.Skip("Skipping test as INTEGRATION or TESTNET env var not set")
}

t.Parallel()

testCfg := e2e.TestConfig(useMemory())
testCfg.UseS3Caching = true

ts, kill := e2e.CreateTestSuite(t, testCfg)
defer kill()

cfg := &client.Config{
URL: ts.Address(),
}
daClient := client.New(cfg)
// ~10KB blob (1_0000 == 10,000 bytes)
testPreimage := []byte(e2e.RandString(1_0000))

t.Log("Setting input data on proxy server...")
blobInfo, err := daClient.SetData(ts.Ctx, testPreimage)
require.NotEmpty(t, blobInfo)
require.NoError(t, err)

t.Log("Getting input data from proxy server...")
preimage, err := daClient.GetData(ts.Ctx, blobInfo)
require.NoError(t, err)
require.Equal(t, testPreimage, preimage)

// ensure that read was from cache
s3Stats := ts.Server.GetS3Stats()
require.Equal(t, 1, s3Stats.Reads)
require.Equal(t, 1, s3Stats.Entries)

if useMemory() { // ensure that EigenDA was not read from
memStats := ts.Server.GetEigenDAStats()
require.Equal(t, 0, memStats.Reads)
require.Equal(t, 1, memStats.Entries)
}
}

/*
Ensure that fallback location is read from when EigenDA blob is not available.
This is done by setting the memstore expiration time to 1ms and waiting for the blob to expire
before attempting to read it.
*/
func TestProxyServerReadFallback(t *testing.T) {
// this test can't be run against Holesky since the read failure case can't be manually triggered
if !runIntegrationTests || runTestnetIntegrationTests {
t.Skip("Skipping test as INTEGRATION env var not set")
}

t.Parallel()

testCfg := e2e.TestConfig(useMemory())
testCfg.UseS3Fallback = true
testCfg.Expiration = time.Millisecond * 1

ts, kill := e2e.CreateTestSuite(t, testCfg)
defer kill()

cfg := &client.Config{
URL: ts.Address(),
}
daClient := client.New(cfg)
// ~10KB blob (1_0000 == 10,000 bytes)
testPreimage := []byte(e2e.RandString(1_0000))

t.Log("Setting input data on proxy server...")
blobInfo, err := daClient.SetData(ts.Ctx, testPreimage)
require.NotEmpty(t, blobInfo)
require.NoError(t, err)

time.Sleep(time.Second * 1)

t.Log("Getting input data from proxy server...")
preimage, err := daClient.GetData(ts.Ctx, blobInfo)
require.NoError(t, err)
require.Equal(t, testPreimage, preimage)

// ensure that read was from fallback target location (i.e., S3 for this test)
s3Stats := ts.Server.GetS3Stats()
require.Equal(t, 1, s3Stats.Reads)
require.Equal(t, 1, s3Stats.Entries)

if useMemory() { // ensure that an EigenDA read was attempted with zero data available
memStats := ts.Server.GetEigenDAStats()
require.Equal(t, 1, memStats.Reads)
require.Equal(t, 0, memStats.Entries)
}
}