5
5
"fmt"
6
6
"strconv"
7
7
"strings"
8
- "time"
9
8
10
- "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
9
+ "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/tasklog"
10
+
11
11
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins"
12
12
"github.com/flyteorg/flyteplugins/go/tasks/logs"
13
13
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery"
@@ -27,11 +27,12 @@ import (
27
27
)
28
28
29
29
const (
30
- rayTaskType = "ray"
31
- KindRayJob = "RayJob"
32
- IncludeDashboard = "include-dashboard"
33
- NodeIPAddress = "node-ip-address"
34
- DashboardHost = "dashboard-host"
30
+ rayTaskType = "ray"
31
+ KindRayJob = "RayJob"
32
+ IncludeDashboard = "include-dashboard"
33
+ NodeIPAddress = "node-ip-address"
34
+ DashboardHost = "dashboard-host"
35
+ DisableUsageStatsStartParameter = "disable-usage-stats"
35
36
)
36
37
37
38
type rayJobResourceHandler struct {
@@ -57,7 +58,6 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
57
58
}
58
59
59
60
podSpec , objectMeta , primaryContainerName , err := flytek8s .ToK8sPodSpec (ctx , taskCtx )
60
-
61
61
if err != nil {
62
62
return nil , flyteerr .Errorf (flyteerr .BadTaskSpecification , "Unable to create pod spec: [%v]" , err .Error ())
63
63
}
@@ -76,26 +76,36 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
76
76
return nil , flyteerr .Errorf (flyteerr .BadTaskSpecification , "Unable to get primary container from the pod: [%v]" , err .Error ())
77
77
}
78
78
79
+ cfg := GetConfig ()
79
80
headReplicas := int32 (1 )
80
81
headNodeRayStartParams := make (map [string ]string )
81
82
if rayJob .RayCluster .HeadGroupSpec != nil && rayJob .RayCluster .HeadGroupSpec .RayStartParams != nil {
82
83
headNodeRayStartParams = rayJob .RayCluster .HeadGroupSpec .RayStartParams
84
+ } else if headNode := cfg .Defaults .HeadNode ; len (headNode .StartParameters ) > 0 {
85
+ headNodeRayStartParams = headNode .StartParameters
83
86
}
87
+
84
88
if _ , exist := headNodeRayStartParams [IncludeDashboard ]; ! exist {
85
89
headNodeRayStartParams [IncludeDashboard ] = strconv .FormatBool (GetConfig ().IncludeDashboard )
86
90
}
91
+
87
92
if _ , exist := headNodeRayStartParams [NodeIPAddress ]; ! exist {
88
- headNodeRayStartParams [NodeIPAddress ] = GetConfig (). NodeIPAddress
93
+ headNodeRayStartParams [NodeIPAddress ] = cfg . Defaults . HeadNode . IPAddress
89
94
}
95
+
90
96
if _ , exist := headNodeRayStartParams [DashboardHost ]; ! exist {
91
- headNodeRayStartParams [DashboardHost ] = GetConfig ().DashboardHost
97
+ headNodeRayStartParams [DashboardHost ] = cfg .DashboardHost
98
+ }
99
+
100
+ if _ , exists := headNodeRayStartParams [DisableUsageStatsStartParameter ]; ! exists && ! cfg .EnableUsageStats {
101
+ headNodeRayStartParams [DisableUsageStatsStartParameter ] = "true"
92
102
}
93
103
94
104
enableIngress := true
95
105
rayClusterSpec := rayv1alpha1.RayClusterSpec {
96
106
HeadGroupSpec : rayv1alpha1.HeadGroupSpec {
97
107
Template : buildHeadPodTemplate (& container , podSpec , objectMeta , taskCtx ),
98
- ServiceType : v1 .ServiceType (GetConfig () .ServiceType ),
108
+ ServiceType : v1 .ServiceType (cfg .ServiceType ),
99
109
Replicas : & headReplicas ,
100
110
EnableIngress : & enableIngress ,
101
111
RayStartParams : headNodeRayStartParams ,
@@ -111,16 +121,24 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
111
121
if spec .MinReplicas != 0 {
112
122
minReplicas = spec .MinReplicas
113
123
}
124
+
114
125
if spec .MaxReplicas != 0 {
115
126
maxReplicas = spec .MaxReplicas
116
127
}
117
128
118
129
workerNodeRayStartParams := make (map [string ]string )
119
130
if spec .RayStartParams != nil {
120
131
workerNodeRayStartParams = spec .RayStartParams
132
+ } else if workerNode := cfg .Defaults .WorkerNode ; len (workerNode .StartParameters ) > 0 {
133
+ workerNodeRayStartParams = workerNode .StartParameters
121
134
}
135
+
122
136
if _ , exist := workerNodeRayStartParams [NodeIPAddress ]; ! exist {
123
- workerNodeRayStartParams [NodeIPAddress ] = GetConfig ().NodeIPAddress
137
+ workerNodeRayStartParams [NodeIPAddress ] = cfg .Defaults .WorkerNode .IPAddress
138
+ }
139
+
140
+ if _ , exists := workerNodeRayStartParams [DisableUsageStatsStartParameter ]; ! exists && ! cfg .EnableUsageStats {
141
+ workerNodeRayStartParams [DisableUsageStatsStartParameter ] = "true"
124
142
}
125
143
126
144
workerNodeSpec := rayv1alpha1.WorkerGroupSpec {
@@ -145,8 +163,8 @@ func (rayJobResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsC
145
163
jobSpec := rayv1alpha1.RayJobSpec {
146
164
RayClusterSpec : rayClusterSpec ,
147
165
Entrypoint : strings .Join (container .Args , " " ),
148
- ShutdownAfterJobFinishes : GetConfig () .ShutdownAfterJobFinishes ,
149
- TTLSecondsAfterFinished : & GetConfig () .TTLSecondsAfterFinished ,
166
+ ShutdownAfterJobFinishes : cfg .ShutdownAfterJobFinishes ,
167
+ TTLSecondsAfterFinished : & cfg .TTLSecondsAfterFinished ,
150
168
RuntimeEnv : rayJob .RuntimeEnv ,
151
169
}
152
170
@@ -347,12 +365,10 @@ func (rayJobResourceHandler) BuildIdentityResource(ctx context.Context, taskCtx
347
365
}, nil
348
366
}
349
367
350
- func getEventInfoForRayJob () (* pluginsCore.TaskInfo , error ) {
351
- taskLogs := make ([]* core.TaskLog , 0 , 3 )
352
- logPlugin , err := logs .InitializeLogPlugins (logs .GetLogConfig ())
353
-
368
+ func getEventInfoForRayJob (logConfig logs.LogConfig , pluginContext k8s.PluginContext , rayJob * rayv1alpha1.RayJob ) (* pluginsCore.TaskInfo , error ) {
369
+ logPlugin , err := logs .InitializeLogPlugins (& logConfig )
354
370
if err != nil {
355
- return nil , err
371
+ return nil , fmt . Errorf ( "failed to initialize log plugins. Error: %w" , err )
356
372
}
357
373
358
374
if logPlugin == nil {
@@ -362,22 +378,31 @@ func getEventInfoForRayJob() (*pluginsCore.TaskInfo, error) {
362
378
// TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs
363
379
// RayJob CRD does not include the name of the worker or head pod for now
364
380
365
- // TODO: Add ray Dashboard URI to task logs
381
+ taskID := pluginContext .TaskExecutionMetadata ().GetTaskExecutionID ().GetID ()
382
+ logOutput , err := logPlugin .GetTaskLogs (tasklog.Input {
383
+ Namespace : rayJob .Namespace ,
384
+ TaskExecutionIdentifier : & taskID ,
385
+ })
386
+
387
+ if err != nil {
388
+ return nil , fmt .Errorf ("failed to generate task logs. Error: %w" , err )
389
+ }
366
390
367
391
return & pluginsCore.TaskInfo {
368
- Logs : taskLogs ,
392
+ Logs : logOutput . TaskLogs ,
369
393
}, nil
370
394
}
371
395
372
- func (rayJobResourceHandler ) GetTaskPhase (ctx context.Context , pluginContext k8s.PluginContext , resource client.Object ) (pluginsCore.PhaseInfo , error ) {
396
+ func (plugin rayJobResourceHandler ) GetTaskPhase (ctx context.Context , pluginContext k8s.PluginContext , resource client.Object ) (pluginsCore.PhaseInfo , error ) {
373
397
rayJob := resource .(* rayv1alpha1.RayJob )
374
- info , err := getEventInfoForRayJob ()
398
+ info , err := getEventInfoForRayJob (GetConfig (). Logs , pluginContext , rayJob )
375
399
if err != nil {
376
400
return pluginsCore .PhaseInfoUndefined , err
377
401
}
402
+
378
403
switch rayJob .Status .JobStatus {
379
404
case rayv1alpha1 .JobStatusPending :
380
- return pluginsCore .PhaseInfoNotReady ( time . Now () , pluginsCore .DefaultPhaseVersion , "job is pending" ), nil
405
+ return pluginsCore .PhaseInfoInitializing ( rayJob . Status . StartTime . Time , pluginsCore .DefaultPhaseVersion , "job is pending" , info ), nil
381
406
case rayv1alpha1 .JobStatusFailed :
382
407
reason := fmt .Sprintf ("Failed to create Ray job: %s" , rayJob .Name )
383
408
return pluginsCore .PhaseInfoFailure (flyteerr .TaskFailedWithError , reason , info ), nil
@@ -386,7 +411,8 @@ func (rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s
386
411
case rayv1alpha1 .JobStatusRunning :
387
412
return pluginsCore .PhaseInfoRunning (pluginsCore .DefaultPhaseVersion , info ), nil
388
413
}
389
- return pluginsCore .PhaseInfoQueued (time .Now (), pluginsCore .DefaultPhaseVersion , "JobCreated" ), nil
414
+
415
+ return pluginsCore .PhaseInfoQueued (rayJob .CreationTimestamp .Time , pluginsCore .DefaultPhaseVersion , "JobCreated" ), nil
390
416
}
391
417
392
418
func init () {
0 commit comments