Skip to content

Commit

Permalink
[UDT PR 6/N] Add default controller to async controller registry (#8050)
Browse files Browse the repository at this point in the history
# Description

This change adds a "default" controller capability to the controller
registry used by the async worker. This is needed for the dynamic-rp.
Because the dynamic-rp is extensible, it processes many different types
of operations, it's not possible for us to statically register all of
these. Instead, the dynamic-rp will use this new default capability to
register a single controller with dynamic functionality.

## Type of change

- This pull request is a minor refactor, code cleanup, test improvement,
or other maintenance task and doesn't change the functionality of Radius
(issue link optional).


## Contributor checklist
Please verify that the PR meets the following requirements, where
applicable:

- [ ] An overview of proposed schema changes is included in a linked
GitHub issue.
- [ ] A design document PR is created in the [design-notes
repository](https://github.com/radius-project/design-notes/), if new
APIs are being introduced.
- [ ] If applicable, design document has been reviewed and approved by
Radius maintainers/approvers.
- [ ] A PR for the [samples
repository](https://github.com/radius-project/samples) is created, if
existing samples are affected by the changes in this PR.
- [ ] A PR for the [documentation
repository](https://github.com/radius-project/docs) is created, if the
changes in this PR affect the documentation or any user facing updates
are made.
- [ ] A PR for the [recipes
repository](https://github.com/radius-project/recipes) is created, if
existing recipes are affected by the changes in this PR.

Signed-off-by: Ryan Nowak <nowakra@gmail.com>
  • Loading branch information
rynowak authored Nov 20, 2024
1 parent c1237c2 commit 654ce08
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 9 deletions.
47 changes: 42 additions & 5 deletions pkg/armrpc/asyncoperation/worker/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ import (
"github.com/radius-project/radius/pkg/ucp/dataprovider"
)

// ControllerFactoryFunc is a factory function to create a controller.
type ControllerFactoryFunc func(opts ctrl.Options) (ctrl.Controller, error)

// ControllerRegistry is an registry to register async controllers.
type ControllerRegistry struct {
ctrlMap map[string]ctrl.Controller
ctrlMapMu sync.RWMutex
sp dataprovider.DataStorageProvider

defaultFactory ControllerFactoryFunc
defaultOpts ctrl.Options
}

// NewControllerRegistry creates an ControllerRegistry instance.
Expand All @@ -42,13 +46,14 @@ func NewControllerRegistry(sp dataprovider.DataStorageProvider) *ControllerRegis
}
}

// Register registers controller.
// Register registers a controller for a specific resource type and operation method.
//
// Controllers registered using Register will be cached by the registry and the same instance will be reused.
func (h *ControllerRegistry) Register(ctx context.Context, resourceType string, method v1.OperationMethod, factoryFn ControllerFactoryFunc, opts ctrl.Options) error {
h.ctrlMapMu.Lock()
defer h.ctrlMapMu.Unlock()

ot := v1.OperationType{Type: resourceType, Method: method}

storageClient, err := h.sp.GetStorageClient(ctx, resourceType)
if err != nil {
return err
Expand All @@ -65,14 +70,46 @@ func (h *ControllerRegistry) Register(ctx context.Context, resourceType string,
return nil
}

// RegisterDefault registers a default controller that will be used when no other controller is found.
//
// The default controller will be used when Get is called with an operation type that has no registered controller.
// The default controller will not be cached by the registry.
func (h *ControllerRegistry) RegisterDefault(ctx context.Context, factoryFn ControllerFactoryFunc, opts ctrl.Options) error {
h.ctrlMapMu.Lock()
defer h.ctrlMapMu.Unlock()

h.defaultFactory = factoryFn
h.defaultOpts = opts
return nil
}

// Get gets the registered async controller instance.
func (h *ControllerRegistry) Get(operationType v1.OperationType) ctrl.Controller {
func (h *ControllerRegistry) Get(ctx context.Context, operationType v1.OperationType) (ctrl.Controller, error) {
h.ctrlMapMu.RLock()
defer h.ctrlMapMu.RUnlock()

if h, ok := h.ctrlMap[operationType.String()]; ok {
return h
return h, nil
}

return nil
return h.getDefault(ctx, operationType)
}

func (h *ControllerRegistry) getDefault(ctx context.Context, operationType v1.OperationType) (ctrl.Controller, error) {
if h.defaultFactory == nil {
return nil, nil
}

storageClient, err := h.sp.GetStorageClient(ctx, operationType.Type)
if err != nil {
return nil, err
}

// Copy the options so we can update it.
opts := h.defaultOpts

opts.StorageClient = storageClient
opts.ResourceType = operationType.Type

return h.defaultFactory(opts)
}
56 changes: 54 additions & 2 deletions pkg/armrpc/asyncoperation/worker/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,60 @@ func TestRegister_Get(t *testing.T) {
}, ctrlOpts)
require.NoError(t, err)

ctrl := registry.Get(opGet)
ctrl, err := registry.Get(context.Background(), opGet)
require.NoError(t, err)
require.NotNil(t, ctrl)

ctrl, err = registry.Get(context.Background(), opPut)
require.NoError(t, err)
require.NotNil(t, ctrl)

// Getting a controller that is not registered should return nil by default.
ctrl, err = registry.Get(context.Background(), v1.OperationType{Type: "Applications.Core/unknown", Method: v1.OperationGet})
require.NoError(t, err)
require.Nil(t, ctrl)
}

func TestRegister_Get_WithDefault(t *testing.T) {
mctrl := gomock.NewController(t)
defer mctrl.Finish()

mockSP := dataprovider.NewMockDataStorageProvider(mctrl)
mockSP.EXPECT().GetStorageClient(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()

registry := NewControllerRegistry(mockSP)

opGet := v1.OperationType{Type: "Applications.Core/environments", Method: v1.OperationGet}

ctrlOpts := ctrl.Options{
StorageClient: nil,
DataProvider: mockSP,
GetDeploymentProcessor: func() deployment.DeploymentProcessor { return nil },
}

err := registry.Register(context.TODO(), opGet.Type, opGet.Method, func(opts ctrl.Options) (ctrl.Controller, error) {
return &testAsyncController{
BaseController: ctrl.NewBaseAsyncController(ctrlOpts),
fn: func(ctx context.Context) (ctrl.Result, error) {
return ctrl.Result{}, nil
},
}, nil
}, ctrlOpts)
require.NoError(t, err)

err = registry.RegisterDefault(context.TODO(), func(opts ctrl.Options) (ctrl.Controller, error) {
return &testAsyncController{
BaseController: ctrl.NewBaseAsyncController(ctrlOpts),
}, nil
}, ctrlOpts)
require.NoError(t, err)

ctrl, err := registry.Get(context.Background(), opGet)
require.NoError(t, err)
require.NotNil(t, ctrl)
ctrl = registry.Get(opPut)

// Getting a controller that is not registered should default the default
ctrl, err = registry.Get(context.Background(), v1.OperationType{Type: "Applications.Core/unknown", Method: v1.OperationGet})
require.NoError(t, err)
require.NotNil(t, ctrl)
}
10 changes: 9 additions & 1 deletion pkg/armrpc/asyncoperation/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,15 @@ func (w *AsyncRequestProcessWorker) Start(ctx context.Context) error {
}
reqCtx = v1.WithARMRequestContext(reqCtx, armReqCtx)

asyncCtrl := w.registry.Get(armReqCtx.OperationType)
asyncCtrl, err := w.registry.Get(reqCtx, armReqCtx.OperationType)
if err != nil {
opLogger.Error(err, "failed to get async controller.")
if err := w.requestQueue.FinishMessage(reqCtx, msgreq); err != nil {
opLogger.Error(err, "failed to finish the message")
}
return
}

if asyncCtrl == nil {
opLogger.Error(nil, "cannot process unknown operation: "+armReqCtx.OperationType.String())
if err := w.requestQueue.FinishMessage(reqCtx, msgreq); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/armrpc/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ func TestApplyAsyncHandler(t *testing.T) {
}

for _, op := range expectedOperations {
jobCtrl := registry.Get(op)
jobCtrl, err := registry.Get(context.Background(), op)
require.NoError(t, err)
require.NotNil(t, jobCtrl)
}
}

0 comments on commit 654ce08

Please sign in to comment.