Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion router-tests/operations/pql_manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestPQLManifest(t *testing.T) {
require.Equal(t, expectedEmployeesBody, res.Body)

// Verify startup log
logEntries := xEnv.Observer().FilterMessageSnippet("Loaded initial PQL manifest").All()
logEntries := xEnv.Observer().FilterMessageSnippet("Loaded PQL manifest").All()
require.Len(t, logEntries, 1)
})
})
Expand Down
13 changes: 6 additions & 7 deletions router/core/init_config_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"

"github.com/wundergraph/cosmo/router/pkg/config"
"github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller"
"github.com/wundergraph/cosmo/router/pkg/execution_config"
"github.com/wundergraph/cosmo/router/pkg/routerconfig"
Expand All @@ -13,9 +12,9 @@ import (
"go.uber.org/zap"
)

func getConfigClient(r *Router, cdnProviders map[string]config.CDNStorageProvider, s3Providers map[string]config.S3StorageProvider, providerID string, isFallbackClient bool) (client *routerconfig.Client, err error) {
func getConfigClient(r *Router, registry *ProviderRegistry, providerID string, isFallbackClient bool) (client *routerconfig.Client, err error) {
// CDN Providers
if provider, ok := cdnProviders[providerID]; ok {
if provider, ok := registry.CDN(providerID); ok {
if r.graphApiToken == "" {
return nil, errors.New(
"graph token is required to fetch execution config from CDN. " +
Expand Down Expand Up @@ -50,7 +49,7 @@ func getConfigClient(r *Router, cdnProviders map[string]config.CDNStorageProvide
}

// S3 Providers
if provider, ok := s3Providers[providerID]; ok {
if provider, ok := registry.S3(providerID); ok {
clientOptions := &configs3Provider.ClientOptions{
AccessKeyID: provider.AccessKey,
SecretAccessKey: provider.SecretKey,
Expand Down Expand Up @@ -121,12 +120,12 @@ func getConfigClient(r *Router, cdnProviders map[string]config.CDNStorageProvide
}

// InitializeConfigPoller creates a poller to fetch execution config. It is only initialized when a config poller is configured and the router is not started with a static config
func InitializeConfigPoller(r *Router, cdnProviders map[string]config.CDNStorageProvider, s3Providers map[string]config.S3StorageProvider) (*configpoller.ConfigPoller, error) {
func InitializeConfigPoller(r *Router, registry *ProviderRegistry) (*configpoller.ConfigPoller, error) {
if r.staticExecutionConfig != nil || r.routerConfigPollerConfig == nil || r.configPoller != nil {
return nil, nil
}

primaryClient, err := getConfigClient(r, cdnProviders, s3Providers, r.routerConfigPollerConfig.Storage.ProviderID, false)
primaryClient, err := getConfigClient(r, registry, r.routerConfigPollerConfig.Storage.ProviderID, false)
if err != nil {
return nil, err
}
Expand All @@ -141,7 +140,7 @@ func InitializeConfigPoller(r *Router, cdnProviders map[string]config.CDNStorage
return nil, errors.New("cannot use the same storage as both primary and fallback provider for execution config")
}

fallbackClient, err = getConfigClient(r, cdnProviders, s3Providers, r.routerConfigPollerConfig.FallbackStorage.ProviderID, true)
fallbackClient, err = getConfigClient(r, registry, r.routerConfigPollerConfig.FallbackStorage.ProviderID, true)
if err != nil {
return nil, err
}
Expand Down
86 changes: 86 additions & 0 deletions router/core/provider_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package core

import (
"fmt"

"github.com/wundergraph/cosmo/router/pkg/config"
)

// ProviderRegistry indexes storage provider configurations by ID, providing
// typed lookups with clear error messages. It is built once during router
// initialization and shared across all subsystems that need to resolve a
// provider by its configured ID.
type ProviderRegistry struct {
s3 map[string]config.S3StorageProvider
cdn map[string]config.CDNStorageProvider
redis map[string]config.RedisStorageProvider
fileSystem map[string]config.FileSystemStorageProvider
}

// NewProviderRegistry builds lookup maps for every provider type and returns
// an error if any type contains duplicate IDs.
func NewProviderRegistry(providers config.StorageProviders) (*ProviderRegistry, error) {
r := &ProviderRegistry{
s3: make(map[string]config.S3StorageProvider, len(providers.S3)),
cdn: make(map[string]config.CDNStorageProvider, len(providers.CDN)),
redis: make(map[string]config.RedisStorageProvider, len(providers.Redis)),
fileSystem: make(map[string]config.FileSystemStorageProvider, len(providers.FileSystem)),
}

for _, p := range providers.S3 {
if _, ok := r.s3[p.ID]; ok {
return nil, fmt.Errorf("duplicate s3 storage provider with id '%s'", p.ID)
}
r.s3[p.ID] = p
}
for _, p := range providers.CDN {
if _, ok := r.cdn[p.ID]; ok {
return nil, fmt.Errorf("duplicate cdn storage provider with id '%s'", p.ID)
}
r.cdn[p.ID] = p
}
for _, p := range providers.Redis {
if _, ok := r.redis[p.ID]; ok {
return nil, fmt.Errorf("duplicate Redis storage provider with id '%s'", p.ID)
}
r.redis[p.ID] = p
}
for _, p := range providers.FileSystem {
if _, ok := r.fileSystem[p.ID]; ok {
return nil, fmt.Errorf("duplicate file system storage provider with id '%s'", p.ID)
}
r.fileSystem[p.ID] = p
}

return r, nil
}

// S3 looks up an S3 provider by ID.
func (r *ProviderRegistry) S3(id string) (config.S3StorageProvider, bool) {
p, ok := r.s3[id]
return p, ok
}

// CDN looks up a CDN provider by ID.
func (r *ProviderRegistry) CDN(id string) (config.CDNStorageProvider, bool) {
p, ok := r.cdn[id]
return p, ok
}

// Redis looks up a Redis provider by ID.
func (r *ProviderRegistry) Redis(id string) (config.RedisStorageProvider, bool) {
p, ok := r.redis[id]
return p, ok
}

// FileSystem looks up a filesystem provider by ID.
func (r *ProviderRegistry) FileSystem(id string) (config.FileSystemStorageProvider, bool) {
p, ok := r.fileSystem[id]
return p, ok
}

// IsFileSystem returns true if the given ID matches a filesystem provider.
func (r *ProviderRegistry) IsFileSystem(id string) bool {
_, ok := r.fileSystem[id]
return ok
}
107 changes: 107 additions & 0 deletions router/core/provider_registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package core

import (
"testing"

"github.com/stretchr/testify/require"
"github.com/wundergraph/cosmo/router/pkg/config"
)

func TestProviderRegistry(t *testing.T) {
t.Parallel()

t.Run("successful lookups", func(t *testing.T) {
t.Parallel()

reg, err := NewProviderRegistry(config.StorageProviders{
S3: []config.S3StorageProvider{{ID: "my-s3", Bucket: "b"}},
CDN: []config.CDNStorageProvider{{ID: "my-cdn", URL: "https://cdn"}},
Redis: []config.RedisStorageProvider{{ID: "my-redis"}},
FileSystem: []config.FileSystemStorageProvider{{ID: "my-fs", Path: "/tmp"}},
})
require.NoError(t, err)

s3, ok := reg.S3("my-s3")
require.True(t, ok)
require.Equal(t, "b", s3.Bucket)

cdn, ok := reg.CDN("my-cdn")
require.True(t, ok)
require.Equal(t, "https://cdn", cdn.URL)

redis, ok := reg.Redis("my-redis")
require.True(t, ok)
require.Equal(t, "my-redis", redis.ID)

fs, ok := reg.FileSystem("my-fs")
require.True(t, ok)
require.Equal(t, "/tmp", fs.Path)
})

t.Run("unknown ID returns false", func(t *testing.T) {
t.Parallel()

reg, err := NewProviderRegistry(config.StorageProviders{})
require.NoError(t, err)

_, ok := reg.S3("nope")
require.False(t, ok)

_, ok = reg.CDN("nope")
require.False(t, ok)

_, ok = reg.Redis("nope")
require.False(t, ok)

_, ok = reg.FileSystem("nope")
require.False(t, ok)
})

t.Run("duplicate S3 ID", func(t *testing.T) {
t.Parallel()

_, err := NewProviderRegistry(config.StorageProviders{
S3: []config.S3StorageProvider{{ID: "dup"}, {ID: "dup"}},
})
require.ErrorContains(t, err, "duplicate s3 storage provider with id 'dup'")
})

t.Run("duplicate CDN ID", func(t *testing.T) {
t.Parallel()

_, err := NewProviderRegistry(config.StorageProviders{
CDN: []config.CDNStorageProvider{{ID: "dup"}, {ID: "dup"}},
})
require.ErrorContains(t, err, "duplicate cdn storage provider with id 'dup'")
})

t.Run("duplicate Redis ID", func(t *testing.T) {
t.Parallel()

_, err := NewProviderRegistry(config.StorageProviders{
Redis: []config.RedisStorageProvider{{ID: "dup"}, {ID: "dup"}},
})
require.ErrorContains(t, err, "duplicate Redis storage provider with id 'dup'")
})

t.Run("duplicate FileSystem ID", func(t *testing.T) {
t.Parallel()

_, err := NewProviderRegistry(config.StorageProviders{
FileSystem: []config.FileSystemStorageProvider{{ID: "dup"}, {ID: "dup"}},
})
require.ErrorContains(t, err, "duplicate file system storage provider with id 'dup'")
})

t.Run("IsFileSystem", func(t *testing.T) {
t.Parallel()

reg, err := NewProviderRegistry(config.StorageProviders{
FileSystem: []config.FileSystemStorageProvider{{ID: "fs1"}},
})
require.NoError(t, err)

require.True(t, reg.IsFileSystem("fs1"))
require.False(t, reg.IsFileSystem("nope"))
})
}
Loading
Loading