Skip to content

Commit

Permalink
refactor: rename container state to container status
Browse files Browse the repository at this point in the history
  • Loading branch information
m8vago committed Apr 2, 2024
1 parent c61ef4b commit 4d027e8
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 56 deletions.
75 changes: 40 additions & 35 deletions golang/internal/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type ContainerLogContext struct {
Echo bool
}

type ContainerStateStream struct {
type ContainerStatusStream struct {
Events chan []*common.ContainerStateItem
Error chan error
}
Expand All @@ -77,7 +77,7 @@ type ClientLoop struct {

type (
DeployFunc func(context.Context, *dogger.DeploymentLogger, *v1.DeployImageRequest, *v1.VersionData) error
ContainerStateFunc func(context.Context, string, bool) (*ContainerStateStream, error)
WatchContainerStatusFunc func(context.Context, string, bool) (*ContainerStatusStream, error)
DeleteFunc func(context.Context, string, string) error
SecretListFunc func(context.Context, string, string) ([]string, error)
SelfUpdateFunc func(context.Context, *agent.AgentUpdateRequest, UpdateOptions) error
Expand All @@ -93,7 +93,7 @@ type (

type WorkerFunctions struct {
Deploy DeployFunc
WatchContainerState ContainerStateFunc
WatchContainerStatus WatchContainerStatusFunc
Delete DeleteFunc
SecretList SecretListFunc
SelfUpdate SelfUpdateFunc
Expand Down Expand Up @@ -167,7 +167,7 @@ func (cl *ClientLoop) grpcProcessCommand(command *agent.AgentCommand) {
case command.GetDeploy() != nil:
go executeVersionDeployRequest(cl.Ctx, command.GetDeploy(), cl.WorkerFuncs.Deploy, cl.AppConfig)
case command.GetContainerState() != nil:
go executeContainerState(cl.Ctx, command.GetContainerState(), cl.WorkerFuncs.WatchContainerState)
go executeWatchContainerStatus(cl.Ctx, command.GetContainerState(), cl.WorkerFuncs.WatchContainerStatus)
case command.GetContainerDelete() != nil:
go executeDeleteContainer(cl.Ctx, command.GetContainerDelete(), cl.WorkerFuncs.Delete)
case command.GetDeployLegacy() != nil:
Expand All @@ -183,7 +183,7 @@ func (cl *ClientLoop) grpcProcessCommand(command *agent.AgentCommand) {
case command.GetDeleteContainers() != nil:
go executeDeleteMultipleContainers(cl.Ctx, command.GetDeleteContainers(), cl.WorkerFuncs.DeleteContainers)
case command.GetContainerLog() != nil:
go executeContainerLog(cl.Ctx, command.GetContainerLog(), cl.WorkerFuncs.ContainerLog, cl.WorkerFuncs.WatchContainerState)
go executeContainerLog(cl.Ctx, command.GetContainerLog(), cl.WorkerFuncs.ContainerLog, cl.WorkerFuncs.WatchContainerStatus)
case command.GetContainerInspect() != nil:
go executeContainerInspect(cl.Ctx, command.GetContainerInspect(), cl.WorkerFuncs.ContainerInspect)
case command.GetReplaceToken() != nil:
Expand Down Expand Up @@ -462,36 +462,36 @@ func executeVersionDeployRequest(
}
}

func streamContainerState(
func streamContainerStatus(
streamCtx context.Context,
filterPrefix string,
stream agent.Agent_ContainerStateClient,
req *agent.ContainerStateRequest,
eventsContext *ContainerStateStream,
eventsContext *ContainerStatusStream,
) {
for {
select {
case <-streamCtx.Done():
return
case eventError := <-eventsContext.Error:
log.Error().Err(eventError).Msg("Container state stream error")
log.Error().Err(eventError).Msg("Container status stream error")
return
case event := <-eventsContext.Events:
err := stream.Send(&common.ContainerStateListMessage{
Prefix: req.Prefix,
Data: event,
})
if err != nil {
log.Error().Err(err).Msg("Container state channel error")
log.Error().Err(err).Msg("Container status channel error")
return
}

if req.OneShot != nil && *req.OneShot {
err := stream.CloseSend()
if err == nil {
log.Info().Str("prefix", filterPrefix).Msg("Closed container state channel")
log.Info().Str("prefix", filterPrefix).Msg("Closed container status channel")
} else {
log.Error().Err(err).Str("prefix", filterPrefix).Msg("Failed to close container state channel")
log.Error().Err(err).Str("prefix", filterPrefix).Msg("Failed to close container status channel")
}

return
Expand All @@ -501,8 +501,8 @@ func streamContainerState(
}
}

func executeContainerState(ctx context.Context, req *agent.ContainerStateRequest, containerStateFn ContainerStateFunc) {
if containerStateFn == nil {
func executeWatchContainerStatus(ctx context.Context, req *agent.ContainerStateRequest, containerStatusFn WatchContainerStatusFunc) {
if containerStatusFn == nil {
log.Error().Msg("Watch function not implemented")
return
}
Expand All @@ -512,32 +512,32 @@ func executeContainerState(ctx context.Context, req *agent.ContainerStateRequest
filterPrefix = *req.Prefix
}

log.Info().Str("prefix", filterPrefix).Msg("Opening container state channel")
log.Info().Str("prefix", filterPrefix).Msg("Opening container status channel")

streamCtx := metadata.AppendToOutgoingContext(ctx, "dyo-filter-prefix", filterPrefix)
stream, err := grpcConn.Client.ContainerState(streamCtx, grpc.WaitForReady(true))
if err != nil {
log.Error().Err(err).Msg("Failed to open container state channel")
log.Error().Err(err).Msg("Failed to open container status channel")
return
}

defer func() {
err = stream.CloseSend()
if err != nil {
log.Error().Err(err).Stack().Str("prefix", filterPrefix).Msg("Failed to close container state stream")
log.Error().Err(err).Stack().Str("prefix", filterPrefix).Msg("Failed to close container status stream")
}
}()

streamCtx = stream.Context()

eventsContext, err := containerStateFn(streamCtx, filterPrefix, true)
eventsContext, err := containerStatusFn(streamCtx, filterPrefix, true)
if err != nil {
log.Error().Err(err).Str("prefix", filterPrefix).Msg("Failed to open container state reader")
log.Error().Err(err).Str("prefix", filterPrefix).Msg("Failed to open container status reader")
return
}

// The channel consumer must run in a gofunc so RecvMsg can receive server side stream close events
go streamContainerState(streamCtx, filterPrefix, stream, req, eventsContext)
go streamContainerStatus(streamCtx, filterPrefix, stream, req, eventsContext)

// RecvMsg must be called in order to get an error if the server closes the stream
for {
Expand All @@ -550,7 +550,7 @@ func executeContainerState(ctx context.Context, req *agent.ContainerStateRequest

<-streamCtx.Done()

log.Info().Str("prefix", filterPrefix).Msg("Container state channel closed")
log.Info().Str("prefix", filterPrefix).Msg("Container status channel closed")
}

func executeDeleteContainer(ctx context.Context, req *agent.ContainerDeleteRequest, deleteFn DeleteFunc) {
Expand Down Expand Up @@ -593,7 +593,7 @@ func executeDeleteMultipleContainers(ctx context.Context, req *common.DeleteCont

_, err = grpcConn.Client.DeleteContainers(ctx, &common.Empty{})
if err != nil {
log.Error().Stack().Err(err).Msg("Secret list response error")
log.Error().Stack().Err(err).Msg("Delete multiple containers response error")
return
}
}
Expand Down Expand Up @@ -823,14 +823,14 @@ func readContainerLog(logContext *ContainerLogContext, sendLog SendLogFunc, pref
return nil
}

func waitForContainerState(ctx context.Context,
containerStateFn ContainerStateFunc,
func waitForContainerStatus(ctx context.Context,
containerStatusFn WatchContainerStatusFunc,
prefix, name string,
state common.ContainerState,
containerStatus common.ContainerState,
) error {
eventsContext, err := containerStateFn(ctx, prefix, false)
eventsContext, err := containerStatusFn(ctx, prefix, false)
if err != nil {
log.Error().Err(err).Str("prefix", prefix).Msg("Failed to open container state reader")
log.Error().Err(err).Str("prefix", prefix).Msg("Failed to open container status reader")
return err
}

Expand All @@ -839,15 +839,15 @@ func waitForContainerState(ctx context.Context,
case <-ctx.Done():
return nil
case err = <-eventsContext.Error:
log.Error().Err(err).Msg("Container state stream error, while streaming the container log")
log.Error().Err(err).Msg("Container status stream error, while streaming the container log")
return err
case events := <-eventsContext.Events:
for _, ev := range events {
if ev.Id.Prefix != prefix || ev.Id.Name != name {
continue
}

if ev.State == state {
if ev.State == containerStatus {
return nil
}
}
Expand All @@ -859,7 +859,7 @@ func streamContainerLog(
streamCtx context.Context,
logFunc ContainerLogFunc,
client agent.Agent_ContainerLogStreamClient,
containerStateFn ContainerStateFunc,
containerStatusFn WatchContainerStatusFunc,
command *agent.ContainerLogRequest,
) {
prefix := command.Container.Prefix
Expand Down Expand Up @@ -889,9 +889,9 @@ func streamContainerLog(

log.Trace().Str("prefix", prefix).Str("name", name).Msg("Container log finished non streaming (EOF), waiting for running state")

err = waitForContainerState(streamCtx, containerStateFn, prefix, name, common.ContainerState_RUNNING)
err = waitForContainerStatus(streamCtx, containerStatusFn, prefix, name, common.ContainerState_RUNNING)
if err != nil {
log.Error().Err(err).Stack().Str("prefix", prefix).Str("name", name).Msg("Container state stream error")
log.Error().Err(err).Stack().Str("prefix", prefix).Str("name", name).Msg("Container status stream error")
break
}
}
Expand All @@ -906,7 +906,7 @@ func streamContainerLog(

func executeContainerLogStream(streamCtx context.Context,
logFunc ContainerLogFunc,
stateFunc ContainerStateFunc,
statusFunc WatchContainerStatusFunc,
command *agent.ContainerLogRequest,
) {
prefix := command.Container.Prefix
Expand All @@ -927,7 +927,7 @@ func executeContainerLogStream(streamCtx context.Context,

streamCtx = stream.Context()

go streamContainerLog(streamCtx, logFunc, stream, stateFunc, command)
go streamContainerLog(streamCtx, logFunc, stream, statusFunc, command)

for {
var msg interface{}
Expand Down Expand Up @@ -985,7 +985,12 @@ func executeContainerLogRequest(ctx context.Context, logFunc ContainerLogFunc, c
log.Trace().Str("prefix", prefix).Str("name", name).Msg("Container log sent")
}

func executeContainerLog(ctx context.Context, command *agent.ContainerLogRequest, logFunc ContainerLogFunc, stateFunc ContainerStateFunc) {
func executeContainerLog(
ctx context.Context,
command *agent.ContainerLogRequest,
logFunc ContainerLogFunc,
statusFunc WatchContainerStatusFunc,
) {
if logFunc == nil {
log.Error().Msg("Container log function not implemented")
return
Expand All @@ -1000,7 +1005,7 @@ func executeContainerLog(ctx context.Context, command *agent.ContainerLogRequest
ctx = metadata.AppendToOutgoingContext(ctx, "dyo-container-prefix", prefix, "dyo-container-name", name)

if command.Streaming {
executeContainerLogStream(ctx, logFunc, stateFunc, command)
executeContainerLogStream(ctx, logFunc, statusFunc, command)
} else {
executeContainerLogRequest(ctx, logFunc, command)
}
Expand Down
16 changes: 8 additions & 8 deletions golang/pkg/crane/crane.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ func Serve(cfg *config.Configuration, secretStore commonConfig.SecretStore) {

grpcContext := grpc.WithGRPCConfig(context.Background(), cfg)
grpc.Init(grpcContext, &cfg.CommonConfiguration, secretStore, &grpc.WorkerFunctions{
Deploy: k8s.Deploy,
WatchContainerState: crux.WatchDeploymentsByPrefix,
Delete: k8s.Delete,
ContainerCommand: crux.DeploymentCommand,
DeleteContainers: k8s.DeleteMultiple,
SecretList: crux.GetSecretsList,
ContainerLog: k8s.PodLog,
Close: grpcClose,
Deploy: k8s.Deploy,
WatchContainerStatus: crux.WatchDeploymentsByPrefix,
Delete: k8s.Delete,
ContainerCommand: crux.DeploymentCommand,
DeleteContainers: k8s.DeleteMultiple,
SecretList: crux.GetSecretsList,
ContainerLog: k8s.PodLog,
Close: grpcClose,
})
}

Expand Down
4 changes: 2 additions & 2 deletions golang/pkg/crane/crux/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/dyrector-io/dyrectorio/protobuf/go/common"
)

func WatchDeploymentsByPrefix(ctx context.Context, namespace string, sendInitialStates bool) (*grpc.ContainerStateStream, error) {
func WatchDeploymentsByPrefix(ctx context.Context, namespace string, sendInitialStates bool) (*grpc.ContainerStatusStream, error) {
cfg := grpc.GetConfigFromContext(ctx).(*config.Configuration)
client := k8s.NewClient(cfg)

Expand All @@ -37,7 +37,7 @@ func WatchDeploymentsByPrefix(ctx context.Context, namespace string, sendInitial
eventChannel := make(chan []*common.ContainerStateItem)
errorChannel := make(chan error)

watchContext := &grpc.ContainerStateStream{
watchContext := &grpc.ContainerStatusStream{
Events: eventChannel,
Error: errorChannel,
}
Expand Down
8 changes: 4 additions & 4 deletions golang/pkg/crane/crux/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func sendDeploymentInformerEvent(
filterNamespace string,
deploymentHandler *k8s.Deployment,
svcHandler *k8s.Service,
watchContext *grpc.ContainerStateStream,
watchContext *grpc.ContainerStatusStream,
cfg *config.Configuration,
) {
data, ok := obj.(*unstructured.Unstructured)
Expand Down Expand Up @@ -149,7 +149,7 @@ func watchDeployments(
clusterClient *dynamic.DynamicClient,
deploymentHandler *k8s.Deployment,
svcHandler *k8s.Service,
watchContext *grpc.ContainerStateStream,
watchContext *grpc.ContainerStatusStream,
cfg *config.Configuration,
) {
resource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
Expand Down Expand Up @@ -227,7 +227,7 @@ func podToStateItem(
func pushEvent(
ctx context.Context,
event watch.Event,
watchContext *grpc.ContainerStateStream,
watchContext *grpc.ContainerStatusStream,
deploymentHandler *k8s.Deployment,
svcHandler *k8s.Service,
cfg *config.Configuration,
Expand Down Expand Up @@ -265,7 +265,7 @@ func watchPods(
clientSet *kubernetes.Clientset,
deploymentHandler *k8s.Deployment,
svcHandler *k8s.Service,
watchContext *grpc.ContainerStateStream,
watchContext *grpc.ContainerStatusStream,
cfg *config.Configuration,
sendInitialStates bool,
) {
Expand Down
2 changes: 1 addition & 1 deletion golang/pkg/dagent/dagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func Serve(cfg *config.Configuration) {
grpcContext := grpc.WithGRPCConfig(context.Background(), cfg)
grpc.Init(grpcContext, &cfg.CommonConfiguration, cfg, &grpc.WorkerFunctions{
Deploy: utils.DeployImage,
WatchContainerState: utils.ContainerStateStream,
WatchContainerStatus: utils.ContainerStateStream,
Delete: utils.DeleteContainerByPrefixAndName,
SecretList: utils.SecretList,
SelfUpdate: update.SelfUpdate,
Expand Down
6 changes: 3 additions & 3 deletions golang/pkg/dagent/utils/dockerhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func messageToStateItem(ctx context.Context, prefix string, event *events.Messag
return newState, nil
}

func ContainerStateStream(ctx context.Context, prefix string, sendInitalStates bool) (*grpc.ContainerStateStream, error) {
func ContainerStateStream(ctx context.Context, prefix string, sendInitalStates bool) (*grpc.ContainerStatusStream, error) {
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return nil, err
Expand All @@ -120,7 +120,7 @@ func WatchContainersByPrefix(ctx context.Context,
cli client.APIClient,
prefix string,
initialStates []types.Container,
) (*grpc.ContainerStateStream, error) {
) (*grpc.ContainerStatusStream, error) {
var err error

eventChannel := make(chan []*common.ContainerStateItem)
Expand Down Expand Up @@ -156,7 +156,7 @@ func WatchContainersByPrefix(ctx context.Context,
}
}(ctx, prefix, chanMessages, chanErrors)

return &grpc.ContainerStateStream{
return &grpc.ContainerStatusStream{
Events: eventChannel,
Error: errorChannel,
}, nil
Expand Down
4 changes: 1 addition & 3 deletions protobuf/proto/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ message VersionDeployRequest {
/*
* Request for a keys of existing secrets in a prefix, eg. namespace
*/
message ListSecretsRequest {
common.ContainerIdentifier container = 1;
}
message ListSecretsRequest { common.ContainerIdentifier container = 1; }

/**
* Deploys a single container
Expand Down

0 comments on commit 4d027e8

Please sign in to comment.