Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Submodules step 1/2: convert archivers to a plugin architecture #5597

Merged
merged 3 commits into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions common/archiver/provider/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package provider

import (
"fmt"

"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/filestore"
"github.com/uber/cadence/common/archiver/gcloud"
"github.com/uber/cadence/common/archiver/s3store"
"github.com/uber/cadence/common/config"
)

func init() {
// TODO: ideally remove this and handle per-instance registration during startup somehow,
// as globals and inits have consistently caused issues.
//
// For now though, it's replacing a hard-coded switch statement, so an init func
// is the most straightforward and should-be-identical conversion.

must := func(err error) {
if err != nil {
panic(fmt.Errorf("failed to register default provider: %w", err))
}
}

must(RegisterHistoryArchiver(filestore.URIScheme, config.FilestoreConfig, func(cfg *config.YamlNode, container *archiver.HistoryBootstrapContainer) (archiver.HistoryArchiver, error) {
var out *config.FilestoreArchiver
if err := cfg.Decode(&out); err != nil {
return nil, fmt.Errorf("bad config: %w", err)
}
return filestore.NewHistoryArchiver(container, out)
}))
must(RegisterHistoryArchiver(s3store.URIScheme, config.S3storeConfig, func(cfg *config.YamlNode, container *archiver.HistoryBootstrapContainer) (archiver.HistoryArchiver, error) {
var out *config.S3Archiver
if err := cfg.Decode(&out); err != nil {
return nil, fmt.Errorf("bad config: %w", err)
}
return s3store.NewHistoryArchiver(container, out)
}))
must(RegisterHistoryArchiver(gcloud.URIScheme, config.GCloudConfig, func(cfg *config.YamlNode, container *archiver.HistoryBootstrapContainer) (archiver.HistoryArchiver, error) {
var out *config.GstorageArchiver
if err := cfg.Decode(&out); err != nil {
return nil, fmt.Errorf("bad config: %w", err)
}
return gcloud.NewHistoryArchiver(container, out)
}))

must(RegisterVisibilityArchiver(filestore.URIScheme, config.FilestoreConfig, func(cfg *config.YamlNode, container *archiver.VisibilityBootstrapContainer) (archiver.VisibilityArchiver, error) {
var out *config.FilestoreArchiver
if err := cfg.Decode(&out); err != nil {
return nil, fmt.Errorf("bad config: %w", err)
}
return filestore.NewVisibilityArchiver(container, out)
}))
must(RegisterVisibilityArchiver(s3store.URIScheme, config.S3storeConfig, func(cfg *config.YamlNode, container *archiver.VisibilityBootstrapContainer) (archiver.VisibilityArchiver, error) {
var out *config.S3Archiver
if err := cfg.Decode(&out); err != nil {
return nil, fmt.Errorf("bad config: %w", err)
}
return s3store.NewVisibilityArchiver(container, out)
}))
must(RegisterVisibilityArchiver(gcloud.URIScheme, config.GCloudConfig, func(cfg *config.YamlNode, container *archiver.VisibilityBootstrapContainer) (archiver.VisibilityArchiver, error) {
var out *config.GstorageArchiver
if err := cfg.Decode(&out); err != nil {
return nil, fmt.Errorf("bad config: %w", err)
}
return gcloud.NewVisibilityArchiver(container, out)
}))
}
117 changes: 66 additions & 51 deletions common/archiver/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ package provider

import (
"errors"
"fmt"
"sync"

"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/filestore"
"github.com/uber/cadence/common/archiver/gcloud"
"github.com/uber/cadence/common/archiver/s3store"
"github.com/uber/cadence/common/archiver/provider/syncmap"
"github.com/uber/cadence/common/config"
)

Expand Down Expand Up @@ -60,8 +59,8 @@ type (
archiverProvider struct {
sync.RWMutex

historyArchiverConfigs *config.HistoryArchiverProvider
visibilityArchiverConfigs *config.VisibilityArchiverProvider
historyArchiverConfigs config.HistoryArchiverProvider
visibilityArchiverConfigs config.VisibilityArchiverProvider

// Key for the container is just serviceName
historyContainers map[string]*archiver.HistoryBootstrapContainer
Expand All @@ -71,12 +70,52 @@ type (
historyArchivers map[string]archiver.HistoryArchiver
visibilityArchivers map[string]archiver.VisibilityArchiver
}

historyConstructor struct {
fn func(cfg *config.YamlNode, container *archiver.HistoryBootstrapContainer) (archiver.HistoryArchiver, error)
// yaml key where this config exists, under archival.history.provider.
// This almost certainly should be the same as the scheme, but that'll need more work.
configKey string
}
visibilityConstructor struct {
fn func(cfg *config.YamlNode, container *archiver.VisibilityBootstrapContainer) (archiver.VisibilityArchiver, error)
// yaml key where this config exists, under archival.visibility.provider.
// This almost certainly should be the same as the scheme, but that'll need more work.
configKey string
}
)

var (
historyConstructors = syncmap.New[string, historyConstructor]()
visibilityConstructors = syncmap.New[string, visibilityConstructor]()
)

func RegisterHistoryArchiver(scheme, configKey string, constructor func(cfg *config.YamlNode, container *archiver.HistoryBootstrapContainer) (archiver.HistoryArchiver, error)) error {
inserted := historyConstructors.Put(scheme, historyConstructor{
fn: constructor,
configKey: configKey,
})
if !inserted {
return fmt.Errorf("history archiver already registered for scheme %q", scheme)
}
return nil
}

func RegisterVisibilityArchiver(scheme, configKey string, constructor func(cfg *config.YamlNode, container *archiver.VisibilityBootstrapContainer) (archiver.VisibilityArchiver, error)) error {
inserted := visibilityConstructors.Put(scheme, visibilityConstructor{
fn: constructor,
configKey: configKey,
})
if !inserted {
return fmt.Errorf("visibility archiver already registered for scheme %q", scheme)
}
return nil
}

// NewArchiverProvider returns a new Archiver provider
func NewArchiverProvider(
historyArchiverConfigs *config.HistoryArchiverProvider,
visibilityArchiverConfigs *config.VisibilityArchiverProvider,
historyArchiverConfigs config.HistoryArchiverProvider,
visibilityArchiverConfigs config.VisibilityArchiverProvider,
) ArchiverProvider {
return &archiverProvider{
historyArchiverConfigs: historyArchiverConfigs,
Expand Down Expand Up @@ -131,31 +170,19 @@ func (p *archiverProvider) GetHistoryArchiver(scheme, serviceName string) (histo
return nil, ErrBootstrapContainerNotFound
}

switch scheme {
case filestore.URIScheme:
if p.historyArchiverConfigs.Filestore == nil {
return nil, ErrArchiverConfigNotFound
}
historyArchiver, err = filestore.NewHistoryArchiver(container, p.historyArchiverConfigs.Filestore)

case gcloud.URIScheme:
if p.historyArchiverConfigs.Gstorage == nil {
return nil, ErrArchiverConfigNotFound
}

historyArchiver, err = gcloud.NewHistoryArchiver(container, p.historyArchiverConfigs.Gstorage)
constructor, ok := historyConstructors.Get(scheme)
if !ok {
return nil, fmt.Errorf("no history archiver constructor for scheme %q", scheme)
}

case s3store.URIScheme:
if p.historyArchiverConfigs.S3store == nil {
return nil, ErrArchiverConfigNotFound
}
historyArchiver, err = s3store.NewHistoryArchiver(container, p.historyArchiverConfigs.S3store)
default:
return nil, ErrUnknownScheme
cfg, ok := p.historyArchiverConfigs[constructor.configKey]
if !ok {
return nil, fmt.Errorf("no history archiver config for scheme %q, config key %q", scheme, constructor.configKey)
}

historyArchiver, err = constructor.fn(cfg, container)
if err != nil {
return nil, err
return nil, fmt.Errorf("history archiver constructor failed for scheme %q, config key %q: err: %w", scheme, constructor.configKey, err)
}

p.Lock()
Expand All @@ -181,31 +208,19 @@ func (p *archiverProvider) GetVisibilityArchiver(scheme, serviceName string) (ar
return nil, ErrBootstrapContainerNotFound
}

var visibilityArchiver archiver.VisibilityArchiver
var err error

switch scheme {
case filestore.URIScheme:
if p.visibilityArchiverConfigs.Filestore == nil {
return nil, ErrArchiverConfigNotFound
}
visibilityArchiver, err = filestore.NewVisibilityArchiver(container, p.visibilityArchiverConfigs.Filestore)
case s3store.URIScheme:
if p.visibilityArchiverConfigs.S3store == nil {
return nil, ErrArchiverConfigNotFound
}
visibilityArchiver, err = s3store.NewVisibilityArchiver(container, p.visibilityArchiverConfigs.S3store)
case gcloud.URIScheme:
if p.visibilityArchiverConfigs.Gstorage == nil {
return nil, ErrArchiverConfigNotFound
}
visibilityArchiver, err = gcloud.NewVisibilityArchiver(container, p.visibilityArchiverConfigs.Gstorage)

default:
return nil, ErrUnknownScheme
constructor, ok := visibilityConstructors.Get(scheme)
if !ok {
return nil, fmt.Errorf("no visibility archiver constructor for scheme %q", scheme)
}

cfg, ok := p.visibilityArchiverConfigs[constructor.configKey]
if !ok {
return nil, fmt.Errorf("no visibility archiver config for scheme %q, config key %q", scheme, constructor.configKey)
}

visibilityArchiver, err := constructor.fn(cfg, container)
if err != nil {
return nil, err
return nil, fmt.Errorf("visibility archiver constructor failed for scheme %q, config key %q: err: %w", scheme, constructor.configKey, err)
}

p.Lock()
Expand Down
61 changes: 61 additions & 0 deletions common/archiver/provider/syncmap/syncmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package syncmap

import "sync"

// syncmap is a very simple type-safe locked map, with semantics similar to sync.Map,
// but only supports inserting once and getting.
type (
syncmap[K comparable, V any] struct {
mut sync.Mutex
data map[K]V
}
SyncMap[K comparable, V any] interface {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't look too hard in our libraries for similar stuff... but this is simple enough that tbh I just kinda don't care.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a unit test for this?

Copy link
Contributor Author

@Groxx Groxx Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get(key K) (value V, ok bool)
Put(key K, value V) (inserted bool)
}
)

func New[K comparable, V any]() SyncMap[K, V] {
return &syncmap[K, V]{
data: make(map[K]V),
}
}

func (m *syncmap[K, V]) Get(key K) (value V, ok bool) {
m.mut.Lock()
defer m.mut.Unlock()
value, ok = m.data[key]
return value, ok
}

func (m *syncmap[K, V]) Put(key K, value V) (inserted bool) {
m.mut.Lock()
defer m.mut.Unlock()
if _, ok := m.data[key]; ok {
return false
}
m.data[key] = value
return true
}
32 changes: 16 additions & 16 deletions common/config/archival_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,22 @@ import (
"github.com/uber/cadence/common"
)

func defaultFilestoreConfig(t *testing.T) *YamlNode {
node, err := ToYamlNode(&FilestoreArchiver{
FileMode: "044",
})
require.NoError(t, err)
return node
}

// History archival

func TestValidEnabledHistoryArchivalConfig(t *testing.T) {
archival := Archival{
History: HistoryArchival{
Status: common.ArchivalEnabled,
Provider: &HistoryArchiverProvider{
Filestore: &FilestoreArchiver{
FileMode: "044",
},
Provider: HistoryArchiverProvider{
FilestoreConfig: defaultFilestoreConfig(t),
},
},
}
Expand All @@ -62,10 +68,8 @@ func TestInvalidHEnabledHistoryArchivalConfig(t *testing.T) {
func TestValidDisabledHistoryArchivalConfig(t *testing.T) {
archival := Archival{
History: HistoryArchival{
Provider: &HistoryArchiverProvider{
Filestore: &FilestoreArchiver{
FileMode: "044",
},
Provider: HistoryArchiverProvider{
FilestoreConfig: defaultFilestoreConfig(t),
},
},
}
Expand Down Expand Up @@ -97,10 +101,8 @@ func TestValidEnabledVisibilityArchivalConfig(t *testing.T) {
archival := Archival{
Visibility: VisibilityArchival{
Status: common.ArchivalEnabled,
Provider: &VisibilityArchiverProvider{
Filestore: &FilestoreArchiver{
FileMode: "044",
},
Provider: VisibilityArchiverProvider{
FilestoreConfig: defaultFilestoreConfig(t),
},
},
}
Expand All @@ -125,10 +127,8 @@ func TestInvalidHEnabledVisibilityArchivalConfig(t *testing.T) {
func TestValidDisabledVisibilityArchivalConfig(t *testing.T) {
archival := Archival{
Visibility: VisibilityArchival{
Provider: &VisibilityArchiverProvider{
Filestore: &FilestoreArchiver{
FileMode: "044",
},
Provider: VisibilityArchiverProvider{
FilestoreConfig: defaultFilestoreConfig(t),
},
},
}
Expand Down
Loading