Skip to content

Allow multiple management planes #1124

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: add-auxiliary-command-server-proto
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 95 additions & 32 deletions internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ var _ bus.Plugin = (*CommandPlugin)(nil)

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate
//counterfeiter:generate . commandService
type ServerType int

const (
Command ServerType = iota
Auxiliary
)

var serverType = map[ServerType]string{
Command: "command",
Auxiliary: "auxiliary",
}

type (
commandService interface {
Expand All @@ -38,37 +49,45 @@ type (
}

CommandPlugin struct {
messagePipe bus.MessagePipeInterface
config *config.Config
subscribeCancel context.CancelFunc
conn grpc.GrpcConnectionInterface
commandService commandService
subscribeChannel chan *mpi.ManagementPlaneRequest
subscribeMutex sync.Mutex
messagePipe bus.MessagePipeInterface
config *config.Config
subscribeCancel context.CancelFunc
conn grpc.GrpcConnectionInterface
commandService commandService
subscribeChannel chan *mpi.ManagementPlaneRequest
commandServerType ServerType
subscribeMutex sync.Mutex
}
)

func NewCommandPlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface) *CommandPlugin {
func NewCommandPlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface,
commandServerType ServerType,
) *CommandPlugin {
return &CommandPlugin{
config: agentConfig,
conn: grpcConnection,
subscribeChannel: make(chan *mpi.ManagementPlaneRequest),
config: agentConfig,
conn: grpcConnection,
subscribeChannel: make(chan *mpi.ManagementPlaneRequest),
commandServerType: commandServerType,
}
}

func (cp *CommandPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error {
slog.DebugContext(ctx, "Starting command plugin")
newCtx := context.WithValue(
ctx,
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType.String()),
)
slog.DebugContext(newCtx, "Starting command plugin", "command_server_type", cp.commandServerType.String())

cp.messagePipe = messagePipe
cp.commandService = NewCommandService(cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel)

go cp.monitorSubscribeChannel(ctx)
go cp.monitorSubscribeChannel(newCtx)

return nil
}

func (cp *CommandPlugin) Close(ctx context.Context) error {
slog.InfoContext(ctx, "Closing command plugin")
slog.InfoContext(ctx, "Closing command plugin", "command_server_type", cp.commandServerType.String())

cp.subscribeMutex.Lock()
if cp.subscribeCancel != nil {
Expand All @@ -81,32 +100,40 @@ func (cp *CommandPlugin) Close(ctx context.Context) error {

func (cp *CommandPlugin) Info() *bus.Info {
return &bus.Info{
Name: "command",
Name: cp.commandServerType.String(),
}
}

func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
switch msg.Topic {
case bus.ConnectionResetTopic:
cp.processConnectionReset(ctx, msg)
case bus.ResourceUpdateTopic:
cp.processResourceUpdate(ctx, msg)
case bus.InstanceHealthTopic:
cp.processInstanceHealth(ctx, msg)
case bus.DataPlaneHealthResponseTopic:
cp.processDataPlaneHealth(ctx, msg)
case bus.DataPlaneResponseTopic:
cp.processDataPlaneResponse(ctx, msg)
default:
slog.DebugContext(ctx, "Command plugin received unknown topic", "topic", msg.Topic)
slog.DebugContext(ctx, "Processing command", "command_server_type", logger.ServerType(ctx))

if logger.ServerType(ctx) == cp.commandServerType.String() || logger.ServerType(ctx) == "" {
switch msg.Topic {
case bus.ConnectionResetTopic:
cp.processConnectionReset(ctx, msg)
case bus.ResourceUpdateTopic:
cp.processResourceUpdate(ctx, msg)
case bus.InstanceHealthTopic:
cp.processInstanceHealth(ctx, msg)
case bus.DataPlaneHealthResponseTopic:
cp.processDataPlaneHealth(ctx, msg)
case bus.DataPlaneResponseTopic:
cp.processDataPlaneResponse(ctx, msg)
default:
slog.DebugContext(ctx, "Command plugin received unknown topic", "topic", msg.Topic)
}
}
}

func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "Command plugin received resource update message")
if resource, ok := msg.Data.(*mpi.Resource); ok {
if !cp.commandService.IsConnected() {
cp.createConnection(ctx, resource)
newCtx := context.WithValue(
ctx,
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType.String()),
)
cp.createConnection(newCtx, resource)
} else {
statusErr := cp.commandService.UpdateDataPlaneStatus(ctx, resource)
if statusErr != nil {
Expand Down Expand Up @@ -145,13 +172,14 @@ func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Me
correlationID := logger.CorrelationID(ctx)
if err != nil {
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err)
cp.messagePipe.Process(ctx, &bus.Message{

cp.processDataPlaneResponse(ctx, &bus.Message{
Topic: bus.DataPlaneResponseTopic,
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"Failed to send the health status update", err.Error()),
})
}
cp.messagePipe.Process(ctx, &bus.Message{
cp.processDataPlaneResponse(ctx, &bus.Message{
Topic: bus.DataPlaneResponseTopic,
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
"Successfully sent health status update", ""),
Expand All @@ -164,7 +192,8 @@ func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Mes
if instances, ok := msg.Data.([]*mpi.InstanceHealth); ok {
err := cp.commandService.UpdateDataPlaneHealth(ctx, instances)
if err != nil {
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err)
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err,
"command_server_type", cp.commandServerType.String())
}
}
}
Expand Down Expand Up @@ -208,6 +237,7 @@ func (cp *CommandPlugin) Subscriptions() []string {
}
}

// nolint: revive, cyclop
func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
for {
select {
Expand All @@ -226,12 +256,26 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
slog.InfoContext(ctx, "Received management plane config upload request")
cp.handleConfigUploadRequest(newCtx, message)
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
if cp.commandServerType != Command {
slog.WarnContext(newCtx, "Auxiliary command server can not perform config apply",
"command_server_type", cp.commandServerType.String())
cp.handleInvalidRequest(newCtx, message)

return
}
slog.InfoContext(ctx, "Received management plane config apply request")
cp.handleConfigApplyRequest(newCtx, message)
case *mpi.ManagementPlaneRequest_HealthRequest:
slog.InfoContext(ctx, "Received management plane health request")
cp.handleHealthRequest(newCtx)
case *mpi.ManagementPlaneRequest_ActionRequest:
if cp.commandServerType != Command {
slog.WarnContext(newCtx, "Auxiliary command server can not perform api action",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as comment above

"command_server_type", cp.commandServerType.String())
cp.handleInvalidRequest(newCtx, message)

return
}
slog.InfoContext(ctx, "Received management plane action request")
cp.handleAPIActionRequest(newCtx, message)
default:
Expand Down Expand Up @@ -320,6 +364,21 @@ func (cp *CommandPlugin) handleHealthRequest(newCtx context.Context) {
cp.messagePipe.Process(newCtx, &bus.Message{Topic: bus.DataPlaneHealthRequestTopic})
}

func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context, message *mpi.ManagementPlaneRequest) {
err := cp.commandService.SendDataPlaneResponse(ctx, &mpi.DataPlaneResponse{
MessageMeta: message.GetMessageMeta(),
CommandResponse: &mpi.CommandResponse{
Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
Message: "Can not perform write action as auxiliary command server",
Error: "request not allowed",
},
InstanceId: message.GetActionRequest().GetInstanceId(),
})
if err != nil {
slog.ErrorContext(ctx, "Unable to send data plane response", "error", err)
}
}

func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
message, err string,
) *mpi.DataPlaneResponse {
Expand All @@ -336,3 +395,7 @@ func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mp
},
}
}

func (s ServerType) String() string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this to the top near the ServerType declaration?

return serverType[s]
}
18 changes: 9 additions & 9 deletions internal/command/command_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import (
)

func TestCommandPlugin_Info(t *testing.T) {
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
info := commandPlugin.Info()

assert.Equal(t, "command", info.Name)
}

func TestCommandPlugin_Subscriptions(t *testing.T) {
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
subscriptions := commandPlugin.Subscriptions()

assert.Equal(
Expand All @@ -60,7 +60,7 @@ func TestCommandPlugin_Init(t *testing.T) {
messagePipe := busfakes.NewFakeMessagePipe()
fakeCommandService := &commandfakes.FakeCommandService{}

commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
err := commandPlugin.Init(ctx, messagePipe)
require.NoError(t, err)

Expand All @@ -79,7 +79,7 @@ func TestCommandPlugin_createConnection(t *testing.T) {
commandService.CreateConnectionReturns(&mpi.CreateConnectionResponse{}, nil)
messagePipe := busfakes.NewFakeMessagePipe()

commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
err := commandPlugin.Init(ctx, messagePipe)
commandPlugin.commandService = commandService
require.NoError(t, err)
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestCommandPlugin_Process(t *testing.T) {
messagePipe := busfakes.NewFakeMessagePipe()
fakeCommandService := &commandfakes.FakeCommandService{}

commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
err := commandPlugin.Init(ctx, messagePipe)
require.NoError(t, err)
defer commandPlugin.Close(ctx)
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {

agentConfig := types.AgentConfig()
agentConfig.Features = test.configFeatures
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{})
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{}, Command)
err := commandPlugin.Init(ctx, messagePipe)
require.NoError(tt, err)
defer commandPlugin.Close(ctx)
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestCommandPlugin_FeatureDisabled(t *testing.T) {

agentConfig.Features = test.configFeatures

commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{})
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{}, Command)
err := commandPlugin.Init(ctx, messagePipe)
commandPlugin.commandService = fakeCommandService
require.NoError(tt, err)
Expand All @@ -344,7 +344,7 @@ func TestMonitorSubscribeChannel(t *testing.T) {
logBuf := &bytes.Buffer{}
stub.StubLoggerWith(logBuf)

cp := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
cp := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
cp.subscribeCancel = cncl

message := protos.CreateManagementPlaneRequest()
Expand Down Expand Up @@ -383,7 +383,7 @@ func Test_createDataPlaneResponse(t *testing.T) {
Error: "",
},
}
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, Command)
result := commandPlugin.createDataPlaneResponse(expected.GetMessageMeta().GetCorrelationId(),
expected.GetCommandResponse().GetStatus(),
expected.GetCommandResponse().GetMessage(), expected.GetCommandResponse().GetError())
Expand Down
1 change: 1 addition & 0 deletions internal/command/command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func (cs *CommandService) UpdateClient(ctx context.Context, client mpi.CommandSe
cs.subscribeClientMutex.Unlock()

cs.isConnected.Store(false)

resp, err := cs.CreateConnection(ctx, cs.resource)
if err != nil {
return err
Expand Down
Loading