diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go index 20a65ccba14..a7b2a3d1d47 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go @@ -8,6 +8,7 @@ import ( "time" "golang.org/x/exp/maps" + "google.golang.org/protobuf/types/known/structpb" "k8s.io/apimachinery/pkg/util/wait" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" @@ -39,10 +40,11 @@ type Plugin struct { type ResourceWrapper struct { Phase flyteIdl.TaskExecution_Phase // Deprecated: Please Use Phase instead. - State admin.State - Outputs *flyteIdl.LiteralMap - Message string - LogLinks []*flyteIdl.TaskLog + State admin.State + Outputs *flyteIdl.LiteralMap + Message string + LogLinks []*flyteIdl.TaskLog + CustomInfo *structpb.Struct } // IsTerminal is used to avoid making network calls to the agent service if the resource is already in a terminal state. @@ -192,10 +194,11 @@ func (p *Plugin) ExecuteTaskSync( } return nil, ResourceWrapper{ - Phase: resource.Phase, - Outputs: resource.Outputs, - Message: resource.Message, - LogLinks: resource.LogLinks, + Phase: resource.Phase, + Outputs: resource.Outputs, + Message: resource.Message, + LogLinks: resource.LogLinks, + CustomInfo: resource.CustomInfo, }, err } @@ -221,11 +224,12 @@ func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest web } return ResourceWrapper{ - Phase: res.Resource.Phase, - State: res.Resource.State, - Outputs: res.Resource.Outputs, - Message: res.Resource.Message, - LogLinks: res.Resource.LogLinks, + Phase: res.Resource.Phase, + State: res.Resource.State, + Outputs: res.Resource.Outputs, + Message: res.Resource.Message, + LogLinks: res.Resource.LogLinks, + CustomInfo: res.Resource.CustomInfo, }, nil } @@ -254,7 +258,7 @@ func (p *Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase core.PhaseInfo, err error) { resource := taskCtx.Resource().(ResourceWrapper) - taskInfo := &core.TaskInfo{Logs: resource.LogLinks} + taskInfo := &core.TaskInfo{Logs: resource.LogLinks, CustomInfo: resource.CustomInfo} switch resource.Phase { case flyteIdl.TaskExecution_QUEUED: