Skip to content

Commit

Permalink
Add custominfo to agents
Browse files Browse the repository at this point in the history
Signed-off-by: ddl-rliu <richard.liu@dominodatalab.com>
  • Loading branch information
ddl-rliu committed Jul 29, 2024
1 parent f075b34 commit 252bac0
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"golang.org/x/exp/maps"
"google.golang.org/protobuf/types/known/structpb"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
Expand Down Expand Up @@ -39,10 +40,11 @@ type Plugin struct {
type ResourceWrapper struct {
Phase flyteIdl.TaskExecution_Phase
// Deprecated: Please Use Phase instead.
State admin.State
Outputs *flyteIdl.LiteralMap
Message string
LogLinks []*flyteIdl.TaskLog
State admin.State
Outputs *flyteIdl.LiteralMap
Message string
LogLinks []*flyteIdl.TaskLog
CustomInfo *structpb.Struct
}

// IsTerminal is used to avoid making network calls to the agent service if the resource is already in a terminal state.
Expand Down Expand Up @@ -192,10 +194,11 @@ func (p *Plugin) ExecuteTaskSync(
}

return nil, ResourceWrapper{
Phase: resource.Phase,
Outputs: resource.Outputs,
Message: resource.Message,
LogLinks: resource.LogLinks,
Phase: resource.Phase,
Outputs: resource.Outputs,
Message: resource.Message,
LogLinks: resource.LogLinks,
CustomInfo: resource.CustomInfo,
}, err
}

Expand All @@ -221,11 +224,12 @@ func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest web
}

return ResourceWrapper{
Phase: res.Resource.Phase,
State: res.Resource.State,
Outputs: res.Resource.Outputs,
Message: res.Resource.Message,
LogLinks: res.Resource.LogLinks,
Phase: res.Resource.Phase,
State: res.Resource.State,
Outputs: res.Resource.Outputs,
Message: res.Resource.Message,
LogLinks: res.Resource.LogLinks,
CustomInfo: res.Resource.CustomInfo,
}, nil
}

Expand Down Expand Up @@ -254,7 +258,7 @@ func (p *Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error

func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) {
resource := taskCtx.Resource().(ResourceWrapper)
taskInfo := &core.TaskInfo{Logs: resource.LogLinks}
taskInfo := &core.TaskInfo{Logs: resource.LogLinks, CustomInfo: resource.CustomInfo}

switch resource.Phase {
case flyteIdl.TaskExecution_QUEUED:
Expand Down

0 comments on commit 252bac0

Please sign in to comment.